Aggregations using Windowing Functions

Let us see how we can perform aggregations within a partition or group using Windowing/Analytics Functions.

Let us start the Spark session for this notebook so that we can execute the code provided.

import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Windowing Functions").
    master("yarn").
    getOrCreate

If you are going to use CLIs, you can launch Spark SQL using one of the following 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
  • For simple aggregations where we only need the grouping key and the aggregated results, we can use GROUP BY.

  • If we want the raw data along with the aggregated results, GROUP BY alone is not enough: we have to join the aggregated results back to the raw data, which complicates the query.

  • Using aggregate functions with the OVER clause not only simplifies the query, but typically also performs better, as the table does not have to be joined to its own aggregate.

  • Let us take an example of getting each employee's salary as a percentage of the department salary expense.

%%sql

USE itversity_hr
%%sql

SELECT employee_id, department_id, salary 
FROM employees 
ORDER BY department_id, salary
LIMIT 10

Let us write the query using the GROUP BY approach. First we get the salary expense for each department, then we join it back to the employees.

%%sql

SELECT department_id,
       sum(salary) AS department_salary_expense
FROM employees
GROUP BY department_id
ORDER BY department_id
%%sql

SELECT e.employee_id, e.department_id, e.salary,
       ae.department_salary_expense,
       ae.avg_salary_expense
FROM employees e JOIN (
     SELECT department_id, 
            sum(salary) AS department_salary_expense,
            avg(salary) AS avg_salary_expense
     FROM employees
     GROUP BY department_id
) ae
ON e.department_id = ae.department_id
ORDER BY e.department_id, e.salary

Let us see how we can get it using Analytics/Windowing Functions.

  • We can use all the standard aggregate functions, such as count, sum, min, max and avg, with the OVER clause.

%%sql

SELECT e.employee_id, e.department_id, e.salary,
       sum(e.salary) 
         OVER (PARTITION BY e.department_id)
         AS department_salary_expense
FROM employees e
ORDER BY e.department_id
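To check that the GROUP BY plus JOIN approach and the windowing approach return the same rows, here is a minimal sketch that is not part of the original notebook. It uses Python's built-in sqlite3 module as a stand-in for Spark SQL (assuming SQLite 3.25 or later, the release that added window functions) and a handful of hypothetical employee rows instead of the real itversity_hr data.

```python
import sqlite3

# Hypothetical sample data; only the three columns used by the queries are modelled.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (employee_id INT, department_id INT, salary REAL)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?, ?)",
    [(100, 10, 4000.0), (101, 10, 6000.0), (102, 20, 5000.0), (103, 20, 7000.0)],
)

# GROUP BY approach: aggregate per department, then join back to the raw rows.
join_query = """
SELECT e.employee_id, e.department_id, e.salary,
       ae.department_salary_expense
FROM employees e JOIN (
     SELECT department_id, sum(salary) AS department_salary_expense
     FROM employees
     GROUP BY department_id
) ae
ON e.department_id = ae.department_id
ORDER BY e.employee_id
"""

# Windowing approach: the same aggregate computed per partition, without a join.
window_query = """
SELECT e.employee_id, e.department_id, e.salary,
       sum(e.salary) OVER (PARTITION BY e.department_id) AS department_salary_expense
FROM employees e
ORDER BY e.employee_id
"""

join_rows = conn.execute(join_query).fetchall()
window_rows = conn.execute(window_query).fetchall()

# Both approaches should produce identical rows for this sample.
print(join_rows == window_rows)
```

Both result sets are sorted by employee_id here only to make them directly comparable; the queries in the notebook sort by department instead.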
%%sql

SELECT e.employee_id, e.department_id, e.salary,
    sum(e.salary) OVER (PARTITION BY e.department_id) AS sum_sal_expense,
    avg(e.salary) OVER (PARTITION BY e.department_id) AS avg_sal_expense,
    min(e.salary) OVER (PARTITION BY e.department_id) AS min_sal_expense,
    max(e.salary) OVER (PARTITION BY e.department_id) AS max_sal_expense,
    count(e.salary) OVER (PARTITION BY e.department_id) AS cnt_sal_expense
FROM employees e
ORDER BY e.department_id
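The salary percentage mentioned at the start of this section can be derived by dividing each salary by the windowed department total. The following is a minimal sketch under stated assumptions rather than a query from the original notebook: it runs on Python's built-in sqlite3 module (SQLite 3.25 or later) instead of Spark SQL, the rows are hypothetical, and the column name pct_salary is made up for illustration.

```python
import sqlite3

# Hypothetical sample rows; the real employees table in itversity_hr has more columns.
rows = [
    (100, 10, 4000.0),
    (101, 10, 6000.0),
    (102, 20, 5000.0),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (employee_id INT, department_id INT, salary REAL)")
conn.executemany("INSERT INTO employees VALUES (?, ?, ?)", rows)

# The windowed sum is repeated on every row of a department,
# so each salary can be divided by it directly.
query = """
SELECT employee_id, department_id, salary,
       sum(salary) OVER (PARTITION BY department_id) AS department_salary_expense,
       round(salary * 100.0 / sum(salary) OVER (PARTITION BY department_id), 2) AS pct_salary
FROM employees
ORDER BY department_id, salary
"""

salary_pct = conn.execute(query).fetchall()
for row in salary_pct:
    print(row)
```

The SELECT statement itself uses only sum, round and the OVER clause, so it should carry over to a %%sql cell against the employees table largely unchanged.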

Create tables to get daily revenue

Let us create a couple of tables which will be used for the demonstrations of Windowing and Ranking functions.

  • We have ORDERS and ORDER_ITEMS tables.

  • Let us take care of computing daily revenue as well as daily product revenue.

  • As we will be using the same data set several times, let us create tables that hold the precomputed data.

  • daily_revenue will have order_date and revenue, where the data is aggregated using order_date as the grouping key.

  • daily_product_revenue will have order_date, order_item_product_id and revenue. In this case the data is aggregated using order_date and order_item_product_id as the grouping keys.

Let us create a table to compute daily revenue.

%%sql

USE itversity_retail
%%sql

DROP TABLE IF EXISTS daily_revenue
%%sql

CREATE TABLE daily_revenue
AS
SELECT o.order_date,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
%%sql

SELECT * 
FROM daily_revenue
ORDER BY order_date
LIMIT 10

Let us create a table to compute daily product revenue.

%%sql

USE itversity_retail
%%sql

DROP TABLE IF EXISTS daily_product_revenue
%%sql

CREATE TABLE daily_product_revenue
AS
SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id
%%sql

SELECT * 
FROM daily_product_revenue
ORDER BY order_date, order_item_product_id
LIMIT 10
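To see what the two CREATE TABLE AS statements produce, here is a small sketch on toy data. It is an illustration only: it uses Python's built-in sqlite3 module rather than Spark SQL, the orders and order_items rows are hypothetical, and the tables are reduced to the columns the queries touch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (order_id INT, order_date TEXT, order_status TEXT);
CREATE TABLE order_items (
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_subtotal REAL
);
""")

# Hypothetical rows: order 3 is PENDING, so the status filter should exclude it.
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [
    (1, "2013-07-25", "COMPLETE"),
    (2, "2013-07-25", "CLOSED"),
    (3, "2013-07-25", "PENDING"),
    (4, "2013-07-26", "COMPLETE"),
])
conn.executemany("INSERT INTO order_items VALUES (?, ?, ?)", [
    (1, 957, 299.5),
    (1, 1073, 199.75),
    (2, 957, 299.5),
    (3, 502, 250.0),
    (4, 1073, 199.75),
])

# Revenue per day, same shape as the daily_revenue query above.
conn.execute("""
CREATE TABLE daily_revenue AS
SELECT o.order_date,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
""")

# Revenue per day and product, same shape as the daily_product_revenue query above.
conn.execute("""
CREATE TABLE daily_product_revenue AS
SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id
""")

daily_revenue = conn.execute(
    "SELECT * FROM daily_revenue ORDER BY order_date"
).fetchall()
daily_product_revenue = conn.execute(
    "SELECT * FROM daily_product_revenue ORDER BY order_date, order_item_product_id"
).fetchall()

print(daily_revenue)
print(daily_product_revenue)
```

With this sample, daily_revenue has one row per order date and daily_product_revenue has one row per order date and product, which is the granularity the windowing and ranking examples rely on.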