Aggregations using Windowing Functions
Let us see how we can perform aggregations within a partition or group using windowing (analytic) functions.
Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our 10-node, state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
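import org.apache.spark.sql.SparkSession

// Use the OS user name to build a per-user warehouse path and app name.
val username = System.getProperty("user.name")

// spark.ui.port=0 lets Spark bind the web UI to any free port (useful on a shared cluster);
// enableHiveSupport lets us work with Hive databases and tables.
val spark = SparkSession.
  builder.
  config("spark.ui.port", "0").
  config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
  enableHiveSupport.
  appName(s"${username} | Spark SQL - Windowing Functions").
  master("yarn").
  getOrCreate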
If you are going to use the CLIs, you can run Spark SQL using one of these 3 approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using PySpark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
For simple aggregations, where we only need the grouping key and the aggregated results, we can use GROUP BY.
If we want the raw data along with the aggregated results, GROUP BY alone is not enough. We have to aggregate in a subquery and join the result back to the original table, which is overly complicated.
Using aggregate functions with the OVER clause simplifies writing the query. It can also perform better, because it avoids aggregating the data separately and then joining it back.
Let us take the example of computing each employee's salary as a percentage of the total salary expense of their department.
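The queries in this section attach department totals to each row, but none of them computes the percentage itself. In Spark SQL, the percentage could be written as something like `round(salary / sum(salary) OVER (PARTITION BY department_id) * 100, 2)`.

Below is a plain Scala sketch, with no Spark required, that mimics what such an expression computes. The `Emp` class, the `round2` helper and the salary figures are hypothetical. They are not taken from the `employees` table.

```scala
// Hypothetical sample rows, NOT the real itversity_hr employees data.
case class Emp(id: Int, dept: Int, salary: Double)

val emps = List(
  Emp(1, 10, 4000.0),
  Emp(2, 10, 6000.0),
  Emp(3, 20, 3000.0),
  Emp(4, 20, 5000.0),
  Emp(5, 20, 7000.0)
)

// Rows of each partition, keyed by the PARTITION BY column (dept).
val byDept: Map[Int, List[Emp]] = emps.groupBy(_.dept)

// Hypothetical helper: round half-up to 2 decimals, similar in spirit to SQL round(x, 2).
def round2(x: Double): Double =
  BigDecimal(x).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble

// Every employee row is kept and gets salary / department total * 100.
val pct: List[(Int, Double)] = emps.map { e =>
  val deptTotal = byDept(e.dept).map(_.salary).sum
  (e.id, round2(e.salary / deptTotal * 100))
}
```

Within each department the rounded percentages add up to roughly 100. Because each row keeps its identity, no join back to the source rows is needed.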
%%sql
USE itversity_hr
%%sql
SELECT employee_id, department_id, salary
FROM employees
ORDER BY department_id, salary
LIMIT 10
Let us write the query using the GROUP BY approach.
%%sql
SELECT department_id,
sum(salary) AS department_salary_expense
FROM employees
GROUP BY department_id
ORDER BY department_id
%%sql
SELECT e.employee_id, e.department_id, e.salary,
ae.department_salary_expense,
ae.avg_salary_expense
FROM employees e JOIN (
SELECT department_id,
sum(salary) AS department_salary_expense,
avg(salary) AS avg_salary_expense
FROM employees
GROUP BY department_id
) ae
ON e.department_id = ae.department_id
ORDER BY department_id, salary
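The JOIN-based query above differs in one subtle way from the windowing versions that follow it. An inner join on `e.department_id = ae.department_id` never matches a NULL department_id, because NULL = NULL is not true. Any employee without a department is therefore dropped. PARTITION BY, by contrast, places all NULL keys in one partition and keeps those rows.

The plain Scala sketch below shows both behaviours. It uses a hypothetical `Emp` class, with `Option[Int]` standing in for a nullable column, and made-up salaries.

```scala
// Hypothetical rows (NOT the real employees table); None plays the role of NULL.
case class Emp(id: Int, dept: Option[Int], salary: Double)

val emps = List(
  Emp(1, Some(10), 4000.0),
  Emp(2, Some(10), 6000.0),
  Emp(3, Some(20), 3000.0),
  Emp(4, None, 5000.0) // employee without a department
)

// Subquery: GROUP BY department_id with sum and avg.
// As in SQL, all NULL keys end up in a single group.
val agg: Map[Option[Int], (Double, Double)] =
  emps.groupBy(_.dept).map { case (d, rows) =>
    val total = rows.map(_.salary).sum
    (d, (total, total / rows.size))
  }

// Inner JOIN ON e.department_id = ae.department_id: a NULL key never
// equals anything, so only rows with a defined department survive.
val joined: List[(Int, Int, Double, Double)] = for {
  e <- emps
  d <- e.dept.toList
  (deptSum, deptAvg) <- agg.get(Some(d)).toList
} yield (e.id, d, deptSum, deptAvg)

// sum(salary) OVER (PARTITION BY department_id): every row is kept,
// and the NULL key forms a partition of its own.
val windowed: List[(Int, Option[Int], Double)] =
  emps.map(e => (e.id, e.dept, agg(e.dept)._1))
```

Here `joined` has 3 rows and `windowed` has 4. Keep this in mind when comparing the row counts of the two SQL approaches.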
Let us see how we can get the same results using analytic (windowing) functions.
We can use all the standard aggregate functions, such as count, sum, min, max and avg, with the OVER clause.
%%sql
SELECT e.employee_id, e.department_id, e.salary,
sum(e.salary)
OVER (PARTITION BY e.department_id)
AS department_salary_expense
FROM employees e
ORDER BY e.department_id
%%sql
SELECT e.employee_id, e.department_id, e.salary,
sum(e.salary) OVER (PARTITION BY e.department_id) AS sum_sal_expense,
avg(e.salary) OVER (PARTITION BY e.department_id) AS avg_sal_expense,
min(e.salary) OVER (PARTITION BY e.department_id) AS min_sal_expense,
max(e.salary) OVER (PARTITION BY e.department_id) AS max_sal_expense,
count(e.salary) OVER (PARTITION BY e.department_id) AS cnt_sal_expense
FROM employees e
ORDER BY e.department_id
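Conceptually, the query above computes one set of statistics per department and then copies that set onto every employee row in the department. Here is a plain Scala sketch of that idea. The `Emp` and `DeptStats` classes and the sample salaries are hypothetical.

Salaries are assumed to be non-null here, so the count equals the partition size. SQL's `count(e.salary)` would skip NULL salaries.

```scala
// Hypothetical sample rows, NOT the itversity_hr data.
case class Emp(id: Int, dept: Int, salary: Double)

// Aggregates of one partition, mirroring the five OVER (...) columns.
case class DeptStats(sum: Double, avg: Double, min: Double, max: Double, cnt: Long)

val emps = List(
  Emp(1, 10, 4000.0),
  Emp(2, 10, 6000.0),
  Emp(3, 20, 3000.0),
  Emp(4, 20, 5000.0),
  Emp(5, 20, 7000.0)
)

// Compute the statistics once per partition (PARTITION BY dept)...
val stats: Map[Int, DeptStats] = emps.groupBy(_.dept).map { case (d, rows) =>
  val s = rows.map(_.salary)
  (d, DeptStats(s.sum, s.sum / s.size, s.min, s.max, s.size.toLong))
}

// ...then attach them to every row, as the windowed query does.
val result: List[(Emp, DeptStats)] = emps.map(e => (e, stats(e.dept)))
```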
Create tables to get daily revenue
Let us create a couple of tables, which will be used to demonstrate windowing and ranking functions.
We have the ORDERS and ORDER_ITEMS tables.
From them we will compute daily revenue as well as daily product revenue.
As we will use the same data sets several times, let us create tables that store the precomputed results.
daily_revenue will have order_date and revenue, where the data is aggregated using order_date as the grouping key.
daily_product_revenue will have order_date, order_item_product_id and revenue. In this case the data is aggregated using order_date and order_item_product_id as the grouping keys.
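To make the two grouping keys concrete, here is a plain Scala sketch, with no Spark, of what the CREATE TABLE ... AS SELECT statements that follow compute. It joins orders to order items and keeps only COMPLETE and CLOSED orders. It then sums subtotals by order_date alone, and again by (order_date, order_item_product_id).

The case classes, sample rows and `round2` helper are hypothetical. Dates are simplified to plain `yyyy-MM-dd` strings.

```scala
// Hypothetical, simplified rows of the orders and order_items tables.
case class Order(orderId: Int, orderDate: String, status: String)
case class OrderItem(orderId: Int, productId: Int, subtotal: Double)

val orders = List(
  Order(1, "2013-07-25", "COMPLETE"),
  Order(2, "2013-07-25", "CLOSED"),
  Order(3, "2013-07-25", "PENDING"), // filtered out by the status check
  Order(4, "2013-07-26", "COMPLETE")
)
val orderItems = List(
  OrderItem(1, 100, 199.99),
  OrderItem(1, 200, 50.0),
  OrderItem(2, 100, 199.99),
  OrderItem(3, 100, 199.99),
  OrderItem(4, 200, 25.5)
)

// Hypothetical helper: round half-up to 2 decimals, similar in spirit to SQL round(x, 2).
def round2(x: Double): Double =
  BigDecimal(x).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble

// JOIN orders with order_items, keeping only COMPLETE / CLOSED orders.
val statusOk = Set("COMPLETE", "CLOSED")
val joined: List[(Order, OrderItem)] = for {
  o  <- orders if statusOk(o.status)
  oi <- orderItems if oi.orderId == o.orderId
} yield (o, oi)

// daily_revenue: the grouping key is order_date only.
val dailyRevenue: Map[String, Double] =
  joined.groupBy(_._1.orderDate).map { case (date, rows) =>
    (date, round2(rows.map(_._2.subtotal).sum))
  }

// daily_product_revenue: the grouping key is (order_date, order_item_product_id).
val dailyProductRevenue: Map[(String, Int), Double] =
  joined.groupBy { case (o, oi) => (o.orderDate, oi.productId) }.map {
    case (key, rows) => (key, round2(rows.map(_._2.subtotal).sum))
  }
```

The finer key produces more rows: one per date and product, rather than one per date.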
Let us create the table to compute daily revenue.
%%sql
USE itversity_retail
%%sql
DROP TABLE IF EXISTS daily_revenue
%%sql
CREATE TABLE daily_revenue
AS
SELECT o.order_date,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
%%sql
SELECT *
FROM daily_revenue
ORDER BY order_date
LIMIT 10
Let us create the table to compute daily product revenue.
%%sql
USE itversity_retail
%%sql
DROP TABLE IF EXISTS daily_product_revenue
%%sql
CREATE TABLE daily_product_revenue
AS
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id
%%sql
SELECT *
FROM daily_product_revenue
ORDER BY order_date, order_item_product_id
LIMIT 10