Using LEAD or LAGΒΆ
Let us understand LEAD and LAG functions to get column values from following or prior records.
Let us start spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state of the art cluster/labs to learn Spark SQL using our unique integrated LMS.
val username = System.getProperty("user.name")
import org.apache.spark.sql.SparkSession
val username = System.getProperty("user.name")
val spark = SparkSession.
builder.
config("spark.ui.port", "0").
config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
enableHiveSupport.
appName(s"${username} | Spark SQL - Windowing Functions").
master("yarn").
getOrCreate
If you are going to use CLIs, you can use Spark SQL using one of the 3 approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Here is the example where we can get prior or following records based on ORDER BY within OVER Clause.
%%sql
USE itversity_retail
%%sql
SELECT * FROM daily_revenue
ORDER BY order_date DESC
LIMIT 10
%%sql
SELECT t.*,
lead(order_date) OVER (ORDER BY order_date DESC) AS prior_date,
lead(revenue) OVER (ORDER BY order_date DESC) AS prior_revenue,
lag(order_date) OVER (ORDER BY order_date) AS lag_prior_date,
lag(revenue) OVER (ORDER BY order_date) AS lag_prior_revenue
FROM daily_revenue AS t
ORDER BY order_date DESC
LIMIT 10
%%sql
SELECT t.*,
lead(order_date) OVER (ORDER BY order_date DESC) AS prior_date,
lead(revenue) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue AS t
ORDER BY order_date
LIMIT 10
We can also pass number of rows as well as default values for nulls as arguments.
%%sql
USE itversity_retail
%%sql
SELECT t.*,
lead(order_date, 7) OVER (ORDER BY order_date DESC) AS prior_date,
lead(revenue, 7) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date DESC
LIMIT 10
%%sql
SELECT t.*,
lead(order_date, 7) OVER (ORDER BY order_date DESC) AS prior_date,
lead(revenue, 7) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date
LIMIT 10
%%sql
SELECT t.*,
lead(order_date, 7, 'NA') OVER (ORDER BY order_date DESC) AS prior_date,
lead(revenue, 7, 0) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date
LIMIT 10
Let us see how we can get prior or following records with in a group based on particular order.
Here is the example where we can get prior or following records based on PARTITION BY and then ORDER BY Clause.
%%sql
USE itversity_retail
%%sql
DESCRIBE daily_product_revenue
%%sql
SELECT * FROM daily_product_revenue
ORDER BY order_date, revenue DESC
LIMIT 10
%%sql
SELECT t.*,
LEAD(order_item_product_id) OVER (
PARTITION BY order_date
ORDER BY revenue DESC
) next_product_id,
LEAD(revenue) OVER (
PARTITION BY order_date
ORDER BY revenue DESC
) next_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100
spark.sql("""
SELECT t.*,
LEAD(order_item_product_id) OVER (
PARTITION BY order_date
ORDER BY revenue DESC
) next_product_id,
LEAD(revenue) OVER (
PARTITION BY order_date
ORDER BY revenue DESC
) next_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
""").
show(100, false)
We can also pass number of rows as well as default values for nulls as arguments.
%%sql
SELECT t.*,
LEAD(order_item_product_id) OVER (
PARTITION BY order_date ORDER BY revenue DESC
) next_product_id,
LEAD(revenue, 1, 0) OVER (
PARTITION BY order_date ORDER BY revenue DESC
) next_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100
spark.sql("""
SELECT t.*,
LEAD(order_item_product_id) OVER (
PARTITION BY order_date ORDER BY revenue DESC
) next_product_id,
LEAD(revenue, 1, 0) OVER (
PARTITION BY order_date ORDER BY revenue DESC
) next_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100
""").
show(100, false)