Ranking using Windowing Functions
Let us see how we can assign ranks using different rank functions.
Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our 10-node, state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
val username = System.getProperty("user.name")
username = itversity
itversity
import org.apache.spark.sql.SparkSession
val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Windowing Functions").
    master("yarn").
    getOrCreate
username = itversity
spark = org.apache.spark.sql.SparkSession@32571fa2
org.apache.spark.sql.SparkSession@32571fa2
If you are going to use CLIs, you can use Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
If we have to assign ranks globally, we just need to specify ORDER BY.
If we have to assign ranks within a key, then we need to specify PARTITION BY and then ORDER BY.
By default, ORDER BY sorts the data in ascending order. We can reverse the order by passing DESC after the ORDER BY column.
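As a plain-Python analogy (not Spark SQL), PARTITION BY amounts to grouping rows by a key and ordering each group independently before assigning positions. The data below is hypothetical and only illustrates the mechanics:

```python
from itertools import groupby

# Hypothetical (order_date, revenue) rows
rows = [
    ("2014-01-01", 100), ("2014-01-01", 300), ("2014-01-01", 200),
    ("2014-01-02", 250), ("2014-01-02", 400),
]

# PARTITION BY order_date, ORDER BY revenue DESC, then number within each group
ranked = []
for date, group in groupby(sorted(rows), key=lambda r: r[0]):
    ordered = sorted(group, key=lambda r: r[1], reverse=True)  # ORDER BY revenue DESC
    for pos, (d, revenue) in enumerate(ordered, start=1):
        ranked.append((d, revenue, pos))                       # position within the partition

print(ranked)
# [('2014-01-01', 300, 1), ('2014-01-01', 200, 2), ('2014-01-01', 100, 3),
#  ('2014-01-02', 400, 1), ('2014-01-02', 250, 2)]
```

Dropping the grouping step and sorting all rows together corresponds to a global rank with only ORDER BY.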
We have three main functions to assign ranks - rank, dense_rank and row_number. We will see the difference between the three in a moment.
Here is an example to assign sparse ranks using the table daily_product_revenue, within each day based on revenue. We can use the rank function to assign sparse ranks.
%%sql
USE itversity_retail
%%sql
SELECT t.*,
    rank() OVER (
        PARTITION BY order_date
        ORDER BY revenue DESC
    ) AS rnk
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100
Note
Here is another example to assign ranks using the employees data set, within each department. We can also use other functions such as dense_rank and row_number to assign ranks.
%%sql
USE itversity_hr
%%sql
SELECT
    employee_id,
    department_id,
    salary,
    rank() OVER (
        PARTITION BY department_id
        ORDER BY salary DESC
    ) rnk,
    dense_rank() OVER (
        PARTITION BY department_id
        ORDER BY salary DESC
    ) drnk,
    row_number() OVER (
        PARTITION BY department_id
        ORDER BY salary DESC
    ) rn
FROM employees
ORDER BY department_id, salary DESC
spark.sql("""
    SELECT
        employee_id,
        department_id,
        salary,
        rank() OVER (
            PARTITION BY department_id
            ORDER BY salary DESC
        ) rnk,
        dense_rank() OVER (
            PARTITION BY department_id
            ORDER BY salary DESC
        ) drnk,
        row_number() OVER (
            PARTITION BY department_id
            ORDER BY salary DESC, employee_id
        ) rn
    FROM employees
    ORDER BY department_id, salary DESC
""").
    show(100, false)
%%sql
SELECT * FROM employees ORDER BY salary LIMIT 10
Note
Here is an example of a global rank, without the PARTITION BY clause.
%%sql
SELECT employee_id, salary,
dense_rank() OVER (ORDER BY salary DESC) AS drnk
FROM employees
Let us understand the difference between rank, dense_rank and row_number.
We can use any of these functions to generate ranks when there are no duplicates in the column based on which ranks are assigned.
When the column based on which ranks are assigned has duplicates, row_number should not be used, as it generates a unique number for each record within the partition. For those duplicate values, the row numbers need not be the same across multiple runs.
rank will skip ranks after a tie (sparse ranks), while dense_rank will not skip any ranks, regardless of how many times a value is repeated.
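The difference can be shown with a plain-Python sketch (not Spark) on a single partition that contains a tie; the salary values are hypothetical:

```python
# Salaries within one partition, already sorted in descending order.
# Note the tie on 4000.
salaries = [5000, 4000, 4000, 3000]

rows = []
for i, salary in enumerate(salaries):
    row_number = i + 1                       # always unique: 1, 2, 3, 4
    if i > 0 and salary == salaries[i - 1]:
        rank = rows[-1][1]                   # ties share the same rank
        dense_rank = rows[-1][2]
    else:
        rank = i + 1                         # resumes after a gap: 1, 2, 2, 4
        dense_rank = (rows[-1][2] + 1) if rows else 1  # no gaps: 1, 2, 2, 3
    rows.append((salary, rank, dense_rank, row_number))

for r in rows:
    print(r)
# (5000, 1, 1, 1)
# (4000, 2, 2, 2)
# (4000, 2, 2, 3)
# (3000, 4, 3, 4)
```

Note how rank jumps from 2 to 4 after the tie, dense_rank continues with 3, and row_number stays unique even for the two records with salary 4000.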