Ranking using Windowing Functions

Let us see how we can assign ranks using different rank functions.

Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.

import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Windowing Functions").
    master("yarn").
    getOrCreate
username = itversity
spark = org.apache.spark.sql.SparkSession@32571fa2
org.apache.spark.sql.SparkSession@32571fa2

If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
  • If we have to assign ranks globally, we just need to specify ORDER BY.

  • If we have to assign ranks within a key, then we need to specify PARTITION BY and then ORDER BY.

  • By default ORDER BY sorts the data in ascending order. We can reverse the order by adding DESC after the ORDER BY column.

  • We have 3 main functions to assign ranks - rank, dense_rank and row_number. We will see the difference between the 3 in a moment.
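As a plain-Scala sketch (no Spark required; the data below is hypothetical), assigning ranks within a key amounts to grouping the rows by that key, sorting each group, and numbering its rows:

```scala
// Hypothetical (order_date, revenue) pairs
val revenue = Seq(
  ("2014-01-01", 100.0),
  ("2014-01-01", 250.0),
  ("2014-01-02", 180.0),
  ("2014-01-02", 90.0)
)

// PARTITION BY order_date ORDER BY revenue DESC, then number each row
val ranked = revenue
  .groupBy(_._1)                     // PARTITION BY order_date
  .toSeq
  .sortBy(_._1)
  .flatMap { case (date, rows) =>
    rows
      .sortBy(-_._2)                 // ORDER BY revenue DESC
      .zipWithIndex
      .map { case ((_, rev), i) => (date, rev, i + 1) }
  }

ranked.foreach(println)
// (2014-01-01,250.0,1)
// (2014-01-01,100.0,2)
// (2014-01-02,180.0,1)
// (2014-01-02,90.0,2)
```

Spark evaluates window functions in essentially this way, except that the partitions are distributed across executors.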

Here is an example to assign sparse ranks within each day based on revenue, using the table daily_product_revenue. We can use the rank function to assign sparse ranks.

%%sql

USE itversity_retail
%%sql

SELECT t.*,
  rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS rnk
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100

Note

Here is another example to assign ranks within each department using the employees data set. We can also use other functions such as dense_rank and row_number to assign ranks.

%%sql

USE itversity_hr
%%sql

SELECT
  employee_id,
  department_id,
  salary,
  rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) rnk,
  dense_rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) drnk,
  row_number() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) rn
FROM employees
ORDER BY department_id, salary DESC
spark.sql("""
SELECT
  employee_id,
  department_id,
  salary,
  rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) rnk,
  dense_rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) drnk,
  row_number() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC, employee_id
  ) rn
FROM employees
ORDER BY department_id, salary DESC
""").
    show(100, false)
%%sql

SELECT * FROM employees ORDER BY salary LIMIT 10

Note

Here is an example of a global rank without the PARTITION BY clause.

%%sql

SELECT employee_id, salary,
    dense_rank() OVER (ORDER BY salary DESC) AS drnk
FROM employees

Let us understand the difference between rank, dense_rank and row_number.

  • We can use any of these functions to generate ranks when there are no duplicates in the column based on which ranks are assigned.

  • When the column based on which ranks are assigned has duplicates, row_number should not be used, as it generates a unique number for each record within the partition. For the duplicate values, the row number need not be the same across multiple runs.

  • rank skips positions when multiple records share the same rank, while dense_rank does not skip any positions, no matter how many times a value is repeated.
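The difference is easiest to see on a small column with ties. Here is a plain-Scala sketch (no Spark required; the salaries are hypothetical sample data for one department, already sorted descending) that mimics what the three functions produce:

```scala
// Hypothetical salaries within one department, sorted descending
val salaries = Seq(13000, 12000, 12000, 9000)

// row_number: unique sequential numbers, even for the tied values
val rowNumbers = salaries.indices.map(_ + 1)

// rank: ties share the earliest position; a gap follows the repeated value
val ranks = salaries.map(s => salaries.indexOf(s) + 1)

// dense_rank: ties share a rank and no positions are skipped
val denseRanks = salaries.map(s => salaries.distinct.indexOf(s) + 1)

println(s"row_number: ${rowNumbers.mkString(", ")}") // 1, 2, 3, 4
println(s"rank:       ${ranks.mkString(", ")}")      // 1, 2, 2, 4
println(s"dense_rank: ${denseRanks.mkString(", ")}") // 1, 2, 2, 3
```

Note how the two employees with salary 12000 both get rank 2 under rank and dense_rank, but rank then jumps to 4 while dense_rank continues with 3; row_number arbitrarily breaks the tie, which is why its output is not stable across runs unless the ORDER BY is made unique (for example, by adding employee_id as a tiebreaker).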