Filtering - Window Function Results

Let us understand how to filter on the results of Window Functions.

Let us start spark context for this Notebook so that we can execute the code provided.

import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Windowing Functions").
    master("yarn").
    getOrCreate

If you are going to use CLIs, you can launch Spark SQL using one of the following 3 approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

  • We cannot use Window Functions in the WHERE Clause. The WHERE Clause is evaluated before the Window Functions are computed, so their results are not yet available for filtering.

  • If we have to filter based on Window Function results, then we need to use Sub Queries.

  • Once the query with Window Functions is defined as a Sub Query, we can filter in the outer query using the aliases given to the Window Functions.
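The points above can be checked on a few rows. The following is a minimal sketch, not the course's own code: it assumes a local Spark session and a hypothetical temporary view named dpr_demo with made-up rows (the view is not part of itversity_retail). Placing the Window Function directly in WHERE is expected to be rejected while the query is analyzed, whereas the Sub Query form filters on the alias drnk.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Assumption: a local session for experimentation; getOrCreate reuses an existing session if there is one
val spark = SparkSession.builder.master("local[1]").appName("window filter sketch").getOrCreate()
import spark.implicits._

// Hypothetical rows registered as a temporary view; dpr_demo is not part of itversity_retail
Seq(
  ("2013-07-25", 1004, 5599.72),
  ("2013-07-25", 191, 5099.49),
  ("2013-07-25", 957, 4499.70),
  ("2013-07-26", 1004, 10799.46),
  ("2013-07-26", 365, 7978.67)
).toDF("order_date", "order_item_product_id", "revenue").
  createOrReplaceTempView("dpr_demo")

// Window Function used directly in WHERE: Spark is expected to fail this during analysis
val direct = Try(spark.sql("""
  SELECT * FROM dpr_demo
  WHERE dense_rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) <= 2
"""))
val directFailed = direct.isFailure

// Same logic as a Sub Query: compute the rank first, then filter on its alias
val top2 = spark.sql("""
  SELECT * FROM (
    SELECT t.*,
      dense_rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS drnk
    FROM dpr_demo t
  ) q
  WHERE q.drnk <= 2
  ORDER BY q.order_date, q.revenue DESC
""")
top2.show(false)
```

Here directFailed should be true, and top2 keeps the two highest revenue rows for each order_date.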

Here is an example that filters data based on the result of a Window Function.

%%sql

USE itversity_retail

%%sql

SELECT * FROM (
  SELECT t.*,
    dense_rank() OVER (
      PARTITION BY order_date
      ORDER BY revenue DESC
    ) AS drnk
  FROM daily_product_revenue t
) q
WHERE q.drnk <= 5
ORDER BY q.order_date, q.revenue DESC
LIMIT 100

spark.sql("""SELECT * FROM (
  SELECT t.*,
    dense_rank() OVER (
      PARTITION BY order_date
      ORDER BY revenue DESC
    ) AS drnk
  FROM daily_product_revenue t
) q
WHERE q.drnk <= 5
ORDER BY q.order_date, q.revenue DESC
""").
    show(100, false)
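The same filter can also be written with the DataFrame API instead of a Sub Query. This is a minimal sketch under stated assumptions: the rows in dprSample are made up for illustration, a local session is used, and the threshold is 2 rather than 5 to keep the sample small. Because withColumn materializes the rank as a regular column, the following filter plays the role of the outer WHERE.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

// Assumption: a local session for experimentation; getOrCreate reuses an existing session if there is one
val spark = SparkSession.builder.master("local[1]").appName("window filter sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample with the same columns as daily_product_revenue
val dprSample = Seq(
  ("2013-07-25", 1004, 5599.72),
  ("2013-07-25", 191, 5099.49),
  ("2013-07-25", 957, 4499.70),
  ("2013-07-26", 1004, 10799.46),
  ("2013-07-26", 365, 7978.67)
).toDF("order_date", "order_item_product_id", "revenue")

// Equivalent of: PARTITION BY order_date ORDER BY revenue DESC
val byDate = Window.partitionBy("order_date").orderBy(col("revenue").desc)

// Add the rank as a column, then filter on it like the outer WHERE clause
val topPerDay = dprSample.
  withColumn("drnk", dense_rank().over(byDate)).
  filter(col("drnk") <= 2).
  orderBy(col("order_date"), col("revenue").desc)

topPerDay.show(false)
```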

Ranking and Filtering - Recap

Let us recap the procedure to get the top 5 products by revenue for each day.

  • We have our original data in orders and order_items.

  • We can pre-compute the daily product revenue and store it in a table, or create a view with the logic to generate it.

  • Then, we have to use the table, the view or even a Sub Query to compute the rank.

  • We can use the query with ranks as a Sub Query and filter on the rank to get the top 5 products by revenue for each day.

  • Let us see the overall process in action.
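The recap steps can be sketched end to end on a handful of rows. This is only an illustration under assumptions: orders_demo, order_items_demo and daily_product_revenue_demo are hypothetical temporary views with made-up rows (they reuse the column names of orders and order_items but are not part of itversity_retail), and the top 2 products are kept instead of the top 5 so that the filter has a visible effect.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; getOrCreate reuses an existing session if there is one
val spark = SparkSession.builder.master("local[1]").appName("ranking recap sketch").getOrCreate()
import spark.implicits._

// Step 1: hypothetical stand-ins for orders and order_items
Seq(
  (1, "2013-07-25", "COMPLETE"),
  (2, "2013-07-25", "CLOSED"),
  (3, "2013-07-25", "PENDING"),
  (4, "2013-07-26", "COMPLETE")
).toDF("order_id", "order_date", "order_status").
  createOrReplaceTempView("orders_demo")

Seq(
  (1, 1004, 399.98),
  (1, 191, 99.99),
  (2, 1004, 399.98),
  (2, 957, 299.98),
  (3, 365, 59.99),
  (4, 191, 199.98)
).toDF("order_item_order_id", "order_item_product_id", "order_item_subtotal").
  createOrReplaceTempView("order_items_demo")

// Step 2: a view that holds the logic to generate daily product revenue
spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW daily_product_revenue_demo AS
  SELECT o.order_date,
         oi.order_item_product_id,
         round(sum(oi.order_item_subtotal), 2) AS revenue
  FROM orders_demo o JOIN order_items_demo oi
    ON o.order_id = oi.order_item_order_id
  WHERE o.order_status IN ('COMPLETE', 'CLOSED')
  GROUP BY o.order_date, oi.order_item_product_id
""")

// Steps 3 and 4: compute the rank in a Sub Query, then filter on its alias
val topProducts = spark.sql("""
  SELECT * FROM (
    SELECT dpr.*,
      dense_rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS drnk
    FROM daily_product_revenue_demo dpr
  ) q
  WHERE q.drnk <= 2
  ORDER BY q.order_date, q.revenue DESC
""")
topProducts.show(false)
```

With these rows, the PENDING order is excluded, product 1004 leads 2013-07-25 with 799.96, and product 191 is dropped for that date because its rank is 3.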

Let us come up with the query to compute daily product revenue.

%%sql

USE itversity_retail

%%sql

DESCRIBE orders

%%sql

DESCRIBE order_items

%%sql

SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id
ORDER BY o.order_date, revenue DESC
LIMIT 100

Let us compute the rank for each product within each date using revenue as the criterion.

%%sql

SELECT q.*,
  rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS rnk,
  dense_rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS drnk
FROM (SELECT o.order_date,
        oi.order_item_product_id,
        round(sum(oi.order_item_subtotal), 2) AS revenue
      FROM orders o JOIN order_items oi
      ON o.order_id = oi.order_item_order_id
      WHERE o.order_status IN ('COMPLETE', 'CLOSED')
      GROUP BY o.order_date, oi.order_item_product_id) q
ORDER BY order_date, revenue DESC
LIMIT 35
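The difference between rank and dense_rank only shows up when revenues tie. The following is a minimal sketch on a hypothetical temporary view named tie_demo with made-up rows, assuming a local session. For revenues 100, 100, 80 and 50, rank gives 1, 1, 3, 4 (it skips a position after the tie), while dense_rank gives 1, 1, 2, 3.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; getOrCreate reuses an existing session if there is one
val spark = SparkSession.builder.master("local[1]").appName("rank vs dense_rank sketch").getOrCreate()
import spark.implicits._

// Hypothetical rows with a tie on revenue; tie_demo is not part of itversity_retail
Seq(
  ("2013-07-25", 1, 100.0),
  ("2013-07-25", 2, 100.0),
  ("2013-07-25", 3, 80.0),
  ("2013-07-25", 4, 50.0)
).toDF("order_date", "order_item_product_id", "revenue").
  createOrReplaceTempView("tie_demo")

// Both functions share the same window; only the handling of ties differs
val ranked = spark.sql("""
  SELECT t.*,
    rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS rnk,
    dense_rank() OVER (PARTITION BY order_date ORDER BY revenue DESC) AS drnk
  FROM tie_demo t
  ORDER BY revenue DESC, order_item_product_id
""")
ranked.show(false)
```

This matters when filtering: with ties, a condition such as rnk <= 5 and a condition such as drnk <= 5 can return different numbers of rows per date.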

Now let us see how we can filter the data.

%%sql

SELECT * FROM (SELECT q.*,
  dense_rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS drnk
FROM (SELECT o.order_date,
        oi.order_item_product_id,
        round(sum(oi.order_item_subtotal), 2) AS revenue
      FROM orders o JOIN order_items oi
      ON o.order_id = oi.order_item_order_id
      WHERE o.order_status IN ('COMPLETE', 'CLOSED')
      GROUP BY o.order_date, oi.order_item_product_id) q) q1
WHERE drnk <= 5
ORDER BY order_date, revenue DESC
LIMIT 35

spark.sql("DESCRIBE daily_product_revenue").show(false)

%%sql

SELECT * FROM (SELECT dpr.*,
  dense_rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS drnk
FROM daily_product_revenue AS dpr) q
WHERE drnk <= 5
ORDER BY order_date, revenue DESC
LIMIT 35