## Aggregations using Windowing Functions

Let us see how we can perform aggregations with in a partition or group using Windowing/Analytics Functions.

In [None]:
%%HTML
<iframe width="560" height="315" src="https://www.youtube.com/embed/peDMzBredoU?rel=0&amp;controls=1&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>

Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our [10 node state of the art cluster/labs](https://labs.itversity.com/plans) to learn Spark SQL using our unique integrated LMS.

In [None]:
val username = System.getProperty("user.name")

In [None]:
import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Windowing Functions").
    master("yarn").
    getOrCreate

If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.

**Using Spark SQL**

```
spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Scala**

```
spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

**Using Pyspark**

```
pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
```

* For simple aggregations where we have to get grouping key and aggregated results we can use **GROUP BY**.
* If we want to get the raw data along with aggregated results, then using **GROUP BY** is not possible or overly complicated.
* Using aggregate functions with **OVER** Clause not only simplifies the process of writing query, but also better with respect to performance.
* Let us take an example of getting employee salary percentage when compared to department salary expense.

In [None]:
%%sql

USE itversity_hr

In [None]:
%%sql

SELECT employee_id, department_id, salary 
FROM employees 
ORDER BY department_id, salary
LIMIT 10

> Let us write the query using `GROUP BY` approach.

In [None]:
%%sql

SELECT department_id,
       sum(salary) AS department_salary_expense
FROM employees
GROUP BY department_id
ORDER BY department_id

In [None]:
%%sql

SELECT e.employee_id, e.department_id, e.salary,
       ae.department_salary_expense,
       ae.avg_salary_expense
FROM employees e JOIN (
     SELECT department_id, 
            sum(salary) AS department_salary_expense,
            avg(salary) AS avg_salary_expense
     FROM employees
     GROUP BY department_id
) ae
ON e.department_id = ae.department_id
ORDER BY department_id, salary

> Let us see how we can get it using Analytics/Windowing Functions. 

* We can use all standard aggregate functions such as `count`, `sum`, `min`, `max`, `avg` etc.

In [None]:
%%sql

SELECT e.employee_id, e.department_id, e.salary,
       sum(e.salary) 
         OVER (PARTITION BY e.department_id)
         AS department_salary_expense
FROM employees e
ORDER BY e.department_id

In [None]:
%%sql

SELECT e.employee_id, e.department_id, e.salary,
    sum(e.salary) OVER (PARTITION BY e.department_id) AS sum_sal_expense,
    avg(e.salary) OVER (PARTITION BY e.department_id) AS avg_sal_expense,
    min(e.salary) OVER (PARTITION BY e.department_id) AS min_sal_expense,
    max(e.salary) OVER (PARTITION BY e.department_id) AS max_sal_expense,
    count(e.salary) OVER (PARTITION BY e.department_id) AS cnt_sal_expense
FROM employees e
ORDER BY e.department_id

### Create tables to get daily revenue

Let us create couple of tables which will be used for the demonstrations of Windowing and Ranking functions.

* We have **ORDERS** and **ORDER_ITEMS** tables.
* Let us take care of computing daily revenue as well as daily product revenue.
* As we will be using same data set several times, let us create the tables to pre compute the data.
* **daily_revenue** will have the **order_date** and **revenue**, where data is aggregated using **order_date** as partition key.
* **daily_product_revenue** will have **order_date**, **order_item_product_id** and **revenue**. In this case data is aggregated using **order_date** and **order_item_product_id** as partition keys.

Let us create table to compute daily revenue.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

DROP TABLE IF EXISTS daily_revenue

In [None]:
%%sql

CREATE TABLE daily_revenue
AS
SELECT o.order_date,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date

In [None]:
%%sql

SELECT * 
FROM daily_revenue
ORDER BY order_date
LIMIT 10

Let us create table to compute daily product revenue.

In [None]:
%%sql

USE itversity_retail

In [None]:
%%sql

DROP TABLE IF EXISTS daily_product_revenue

In [None]:
%%sql

CREATE TABLE daily_product_revenue
AS
SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id

In [None]:
%%sql

SELECT * 
FROM daily_product_revenue
ORDER BY order_date, order_item_product_id
LIMIT 10