Aggregations using Windowing Functions

Let us see how we can perform aggregations within a partition or group using Windowing/Analytics Functions.

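// Use the OS user name to build a per-user warehouse path and app name
val username = System.getProperty("user.name")
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder().
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport().
    appName(s"${username} | Spark SQL - Windowing Functions").
    master("yarn").
    getOrCreate()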

If you are going to use CLIs, you can use Spark SQL with one of the following 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using PySpark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

  • For simple aggregations where we only need the grouping key and the aggregated results, we can use GROUP BY.

  • If we want the raw data along with the aggregated results, GROUP BY alone is not enough; we have to join the aggregated results back to the raw data, which is complicated.

  • Using aggregate functions with the OVER clause not only simplifies writing the query, but can also perform better, as the table does not have to be aggregated separately and then joined back to itself.

  • Let us take the example of computing each employee's salary as a percentage of the total salary expense of their department.
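
The difference between the two shapes of result can be sketched in plain Python, without Spark. The employees rows below are hypothetical sample data and the two helper functions are illustrative only: the GROUP BY style collapses each department to a single entry, while the windowed style keeps every employee row and attaches the department total to it, which is what the salary percentage needs.

```python
from collections import defaultdict

# Hypothetical sample rows: (employee_id, department_id, salary)
employees = [
    (1, 10, 4000), (2, 10, 6000),
    (3, 20, 3000), (4, 20, 3000), (5, 20, 4000),
]


def group_by_sum(rows):
    """Like GROUP BY department_id: one department_id -> total entry per department."""
    totals = defaultdict(int)
    for _, department_id, salary in rows:
        totals[department_id] += salary
    return dict(totals)


def sum_over_department(rows):
    """Like sum(salary) OVER (PARTITION BY department_id): one output row per input row."""
    totals = group_by_sum(rows)  # one aggregate per partition ...
    return [
        # ... copied onto every row of that partition, plus the salary percentage
        (employee_id, department_id, salary, totals[department_id],
         round(salary * 100 / totals[department_id], 2))
        for employee_id, department_id, salary in rows
    ]


print(group_by_sum(employees))
for row in sum_over_department(employees):
    print(row)
```

The first call returns only two entries (one per department), while the second returns all five employee rows, each carrying its department total.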


USE itversity_hr

SELECT employee_id, department_id, salary 
FROM employees 
ORDER BY department_id, salary

Let us first write the query using the GROUP BY approach.


SELECT department_id,
       sum(salary) AS department_salary_expense
FROM employees
GROUP BY department_id
ORDER BY department_id
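
To try the GROUP BY query without a cluster, here is a minimal sketch using Python's built-in sqlite3 module with an in-memory database. SQLite is only a stand-in for Spark SQL here, and the employees table and its rows are hypothetical sample data; the query text is the same as above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (employee_id INT, department_id INT, salary INT)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?, ?)",
    [(1, 10, 4000), (2, 10, 6000), (3, 20, 3000), (4, 20, 3000), (5, 20, 4000)],
)

# One row per department: the individual employee rows are collapsed.
rows = conn.execute("""
    SELECT department_id,
           sum(salary) AS department_salary_expense
    FROM employees
    GROUP BY department_id
    ORDER BY department_id
""").fetchall()
print(rows)  # [(10, 10000), (20, 10000)]
```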

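To get the raw employee data along with the department level aggregates, we have to join the results of the GROUP BY query back to employees.

SELECT e.employee_id, e.department_id, e.salary,
       ae.department_salary_expense,
       ae.avg_salary_expense,
       round(e.salary / ae.department_salary_expense * 100, 2) AS pct_salary
FROM employees e JOIN (
     SELECT department_id, 
            sum(salary) AS department_salary_expense,
            avg(salary) AS avg_salary_expense
     FROM employees
     GROUP BY department_id
) ae
ON e.department_id = ae.department_id
ORDER BY e.department_id, e.salary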
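
The join-back approach can be checked locally with a sketch like the one below, again using sqlite3 as a stand-in for Spark SQL and hypothetical sample rows. One SQLite-specific detail: dividing two integers in SQLite truncates, so the sketch multiplies by 100.0 first (Spark SQL's `/` already returns a fractional result).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (employee_id INT, department_id INT, salary INT)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?, ?)",
    [(1, 10, 4000), (2, 10, 6000), (3, 20, 3000), (4, 20, 3000), (5, 20, 4000)],
)

# Aggregate per department, then join the aggregates back to every employee row.
rows = conn.execute("""
    SELECT e.employee_id, e.department_id, e.salary,
           ae.department_salary_expense,
           -- multiply by 100.0 so SQLite does not use integer division
           round(e.salary * 100.0 / ae.department_salary_expense, 2) AS pct_salary
    FROM employees e JOIN (
         SELECT department_id,
                sum(salary) AS department_salary_expense
         FROM employees
         GROUP BY department_id
    ) ae
    ON e.department_id = ae.department_id
    ORDER BY e.department_id, e.salary, e.employee_id
""").fetchall()
for row in rows:
    print(row)
```

Note that the employees table is read twice here: once for the aggregation and once for the join.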

Let us see how we can get it using Analytics/Windowing Functions.

  • We can use all standard aggregate functions such as count, sum, min, max, avg, etc. with the OVER clause.


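SELECT e.employee_id, e.department_id, e.salary,
       sum(e.salary)
         OVER (PARTITION BY e.department_id)
         AS department_salary_expense,
       round(e.salary / sum(e.salary) OVER (PARTITION BY e.department_id) * 100, 2)
         AS pct_salary
FROM employees e
ORDER BY e.department_id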
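
The same result can be produced with a window function and no join. This sketch uses sqlite3 with hypothetical sample rows and assumes the SQLite library bundled with Python is 3.25 or newer (the first release with window functions); it mirrors the Spark SQL query rather than running it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (employee_id INT, department_id INT, salary INT)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?, ?)",
    [(1, 10, 4000), (2, 10, 6000), (3, 20, 3000), (4, 20, 3000), (5, 20, 4000)],
)

# No GROUP BY and no join: the window attaches the department total to each row.
rows = conn.execute("""
    SELECT e.employee_id, e.department_id, e.salary,
           sum(e.salary) OVER (PARTITION BY e.department_id)
             AS department_salary_expense,
           -- 100.0 avoids SQLite integer division
           round(e.salary * 100.0
                 / sum(e.salary) OVER (PARTITION BY e.department_id), 2) AS pct_salary
    FROM employees e
    ORDER BY e.department_id, e.salary, e.employee_id
""").fetchall()
for row in rows:
    print(row)
```

The output rows match the join-back version, but the table is scanned only once.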

SELECT e.employee_id, e.department_id, e.salary,
    sum(e.salary) OVER (PARTITION BY e.department_id) AS sum_sal_expense,
    avg(e.salary) OVER (PARTITION BY e.department_id) AS avg_sal_expense,
    min(e.salary) OVER (PARTITION BY e.department_id) AS min_sal_expense,
    max(e.salary) OVER (PARTITION BY e.department_id) AS max_sal_expense,
    count(e.salary) OVER (PARTITION BY e.department_id) AS cnt_sal_expense
FROM employees e
ORDER BY e.department_id
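
A quick way to see what each OVER clause produces is to run several of them over the same partition locally. The sketch below uses sqlite3 (3.25 or newer assumed) with hypothetical sample rows and checks that every row within a department carries identical aggregate values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (employee_id INT, department_id INT, salary INT)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?, ?)",
    [(1, 10, 4000), (2, 10, 6000), (3, 20, 3000), (4, 20, 3000), (5, 20, 4000)],
)

rows = conn.execute("""
    SELECT e.employee_id, e.department_id, e.salary,
        sum(e.salary) OVER (PARTITION BY e.department_id) AS sum_sal_expense,
        avg(e.salary) OVER (PARTITION BY e.department_id) AS avg_sal_expense,
        min(e.salary) OVER (PARTITION BY e.department_id) AS min_sal_expense,
        max(e.salary) OVER (PARTITION BY e.department_id) AS max_sal_expense,
        count(e.salary) OVER (PARTITION BY e.department_id) AS cnt_sal_expense
    FROM employees e
    ORDER BY e.department_id, e.employee_id
""").fetchall()

# Map each department to its (sum, avg, min, max, count) tuple.
per_department = {row[1]: row[3:] for row in rows}
print(per_department)

# Every row in a partition carries the same aggregate values.
assert all(row[3:] == per_department[row[1]] for row in rows)
```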

Create tables to get daily revenue

Let us create a couple of tables which will be used to demonstrate Windowing and Ranking functions.

  • We have ORDERS and ORDER_ITEMS tables.

  • We will compute daily revenue as well as daily product revenue.

  • As we will be using the same data sets several times, let us create tables with the precomputed data.

  • daily_revenue will have order_date and revenue, where the data is aggregated using order_date as the grouping key.

  • daily_product_revenue will have order_date, order_item_product_id and revenue. In this case the data is aggregated using order_date and order_item_product_id as the grouping keys.
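
The two grouping keys can be illustrated with a plain-Python sketch. The orders and order_items rows below are hypothetical sample data shaped like the retail tables; the loop applies the same status filter and join condition as the queries, then accumulates revenue once per order_date and once per (order_date, order_item_product_id).

```python
from collections import defaultdict

# Hypothetical sample rows: (order_id, order_date, order_status)
orders = [
    (1, "2013-07-25", "COMPLETE"),
    (2, "2013-07-25", "CLOSED"),
    (3, "2013-07-25", "PENDING"),
    (4, "2013-07-26", "COMPLETE"),
]
# Hypothetical sample rows:
# (order_item_id, order_item_order_id, order_item_product_id, order_item_subtotal)
order_items = [
    (1, 1, 957, 299.98),
    (2, 1, 1073, 199.99),
    (3, 2, 957, 299.98),
    (4, 3, 957, 299.98),
    (5, 4, 1073, 199.99),
]

# WHERE order_status IN ('COMPLETE', 'CLOSED'): keep order_id -> order_date
order_dates = {oid: d for oid, d, status in orders if status in ("COMPLETE", "CLOSED")}

daily_revenue = defaultdict(float)          # grouping key: order_date
daily_product_revenue = defaultdict(float)  # grouping key: (order_date, product_id)
for _, order_id, product_id, subtotal in order_items:
    if order_id in order_dates:  # inner join on order_id
        order_date = order_dates[order_id]
        daily_revenue[order_date] += subtotal
        daily_product_revenue[(order_date, product_id)] += subtotal

# round(..., 2) as in the queries
daily_revenue = {k: round(v, 2) for k, v in sorted(daily_revenue.items())}
daily_product_revenue = {k: round(v, 2) for k, v in sorted(daily_product_revenue.items())}
print(daily_revenue)
print(daily_product_revenue)
```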

Let us create a table to compute daily revenue.


USE itversity_retail

DROP TABLE IF EXISTS daily_revenue

CREATE TABLE daily_revenue
AS
SELECT o.order_date,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date

SELECT *
FROM daily_revenue
ORDER BY order_date
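
The CREATE TABLE ... AS SELECT pattern can be tried locally with the sketch below. It uses sqlite3 as a stand-in for Spark SQL (in Spark the table would land in the configured warehouse directory), and all orders and order_items rows are hypothetical sample data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (order_id INT, order_date TEXT, order_status TEXT);
    CREATE TABLE order_items (order_item_id INT, order_item_order_id INT,
                              order_item_product_id INT, order_item_subtotal REAL);
    INSERT INTO orders VALUES
        (1, '2013-07-25', 'COMPLETE'), (2, '2013-07-25', 'CLOSED'),
        (3, '2013-07-25', 'PENDING'),  (4, '2013-07-26', 'COMPLETE');
    INSERT INTO order_items VALUES
        (1, 1, 957, 299.98), (2, 1, 1073, 199.99), (3, 2, 957, 299.98),
        (4, 3, 957, 299.98), (5, 4, 1073, 199.99);

    -- Materialize the aggregated result once so later queries can reuse it.
    CREATE TABLE daily_revenue AS
    SELECT o.order_date,
           round(sum(oi.order_item_subtotal), 2) AS revenue
    FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
    WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    GROUP BY o.order_date;
""")

rows = conn.execute("SELECT * FROM daily_revenue ORDER BY order_date").fetchall()
print(rows)
```

The PENDING order is excluded by the WHERE clause, so only COMPLETE and CLOSED orders contribute to each day's revenue.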

Let us create a table to compute daily product revenue.


USE itversity_retail

DROP TABLE IF EXISTS daily_product_revenue

CREATE TABLE daily_product_revenue
AS
SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id

SELECT *
FROM daily_product_revenue
ORDER BY order_date, order_item_product_id
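
As an example of how this precomputed table can feed a windowed aggregation, the sketch below builds daily_product_revenue in sqlite3 (3.25 or newer assumed, as a stand-in for Spark SQL) from hypothetical sample rows, then uses sum(revenue) OVER (PARTITION BY order_date) to put each day's total revenue next to every product row of that day.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (order_id INT, order_date TEXT, order_status TEXT);
    CREATE TABLE order_items (order_item_id INT, order_item_order_id INT,
                              order_item_product_id INT, order_item_subtotal REAL);
    INSERT INTO orders VALUES
        (1, '2013-07-25', 'COMPLETE'), (2, '2013-07-25', 'CLOSED'),
        (3, '2013-07-25', 'PENDING'),  (4, '2013-07-26', 'COMPLETE');
    INSERT INTO order_items VALUES
        (1, 1, 957, 299.98), (2, 1, 1073, 199.99), (3, 2, 957, 299.98),
        (4, 3, 957, 299.98), (5, 4, 1073, 199.99);

    CREATE TABLE daily_product_revenue AS
    SELECT o.order_date,
           oi.order_item_product_id,
           round(sum(oi.order_item_subtotal), 2) AS revenue
    FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
    WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    GROUP BY o.order_date, oi.order_item_product_id;
""")

rows = conn.execute("""
    SELECT order_date, order_item_product_id, revenue,
           -- total revenue of the same day, repeated on every product row
           round(sum(revenue) OVER (PARTITION BY order_date), 2) AS daily_revenue
    FROM daily_product_revenue
    ORDER BY order_date, order_item_product_id
""").fetchall()
for row in rows:
    print(row)
```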