Conclusion - Final Solution
Let us review the Final Solution for our problem statement daily_product_revenue.
Let us start the Spark context for this notebook so that we can execute the code provided.
import org.apache.spark.sql.SparkSession
val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Basic Transformations").
    master("yarn").
    getOrCreate
If you are going to use CLIs, you can run Spark SQL using one of the following 3 approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Prepare tables
Create tables
Load the data into tables
We need to project the fields which we are interested in.
order_date
order_item_product_id
product_revenue
As these fields come from multiple tables, we need to join orders and order_items on order_id = order_item_order_id, and then filter for COMPLETE or CLOSED orders.
We then group the data by order_date and order_item_product_id and sum order_item_subtotal to get product_revenue.
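The join, filter, group, and aggregate steps can be sketched in plain Python on a handful of made-up rows, without a Spark cluster. This is only an illustration of the logic: the sample rows and the function name `daily_product_revenue` are assumptions, and the actual solution runs as Spark SQL against the orders and order_items tables.

```python
from collections import defaultdict

# Hypothetical sample rows: (order_id, order_date, order_customer_id, order_status)
orders = [
    (1, "2013-07-25 00:00:00.0", 11599, "CLOSED"),
    (2, "2013-07-25 00:00:00.0", 256, "PENDING_PAYMENT"),
    (3, "2013-07-26 00:00:00.0", 12111, "COMPLETE"),
]

# Hypothetical sample rows: (order_item_id, order_item_order_id,
# order_item_product_id, order_item_quantity, order_item_subtotal,
# order_item_product_price)
order_items = [
    (1, 1, 957, 1, 299.98, 299.98),
    (2, 1, 957, 1, 299.98, 299.98),
    (3, 2, 1073, 1, 199.99, 199.99),
    (4, 3, 502, 2, 100.0, 50.0),
]


def daily_product_revenue(orders, order_items):
    # Filter: keep only COMPLETE or CLOSED orders, keyed by order_id for the join.
    valid_orders = {
        order_id: order_date
        for order_id, order_date, _, status in orders
        if status in ("COMPLETE", "CLOSED")
    }

    # Join on order_id = order_item_order_id, then group by
    # (order_date, order_item_product_id) and sum order_item_subtotal.
    revenue = defaultdict(float)
    for item in order_items:
        order_date = valid_orders.get(item[1])
        if order_date is None:
            continue  # no matching COMPLETE/CLOSED order: dropped by the inner join
        revenue[(order_date, item[2])] += item[4]

    # Round the aggregate to 2 decimals, as round(sum(...), 2) does in the query.
    return {key: round(total, 2) for key, total in revenue.items()}


result = daily_product_revenue(orders, order_items)
for (order_date, product_id), product_revenue in result.items():
    print(order_date, product_id, product_revenue)
```

The item belonging to the PENDING_PAYMENT order does not appear in the result, which mirrors the effect of the WHERE clause applied after the join.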
%%sql
DROP DATABASE IF EXISTS itversity_retail CASCADE
%%sql
CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql
USE itversity_retail
%%sql
SHOW tables
%%sql
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
%%sql
CREATE TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
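To see how a comma-delimited line lines up with the columns declared in the order_items table, here is a small plain-Python sketch. The sample lines, the `ORDER_ITEMS_COLUMNS` list, and the `parse_order_items` helper are hypothetical and only mirror the table definition; they are not part of Spark or Hive.

```python
# Column names and Python types mirroring the order_items table definition.
ORDER_ITEMS_COLUMNS = [
    ("order_item_id", int),
    ("order_item_order_id", int),
    ("order_item_product_id", int),
    ("order_item_quantity", int),
    ("order_item_subtotal", float),
    ("order_item_product_price", float),
]

# Hypothetical sample lines in the comma-delimited layout the table expects.
sample_lines = [
    "1,1,957,1,299.98,299.98",
    "2,2,1073,1,199.99,199.99",
]


def parse_order_items(lines):
    rows = []
    for line in lines:
        # Fields are split on ',' with no quoting, matching
        # FIELDS TERMINATED BY ',' in the table definition.
        fields = line.split(",")
        rows.append(
            {name: cast(value) for (name, cast), value in zip(ORDER_ITEMS_COLUMNS, fields)}
        )
    return rows


parsed = parse_order_items(sample_lines)
print(parsed[0])
```

The fields are assigned to columns by position, so the order of the columns in the CREATE TABLE statement has to match the order of the fields in the files.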
%%sql
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
%%sql
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
ORDER BY o.order_date,
product_revenue DESC
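The ORDER BY clause sorts by order_date in ascending order and, within each date, by product_revenue in descending order. A minimal plain-Python sketch of the same ordering, using made-up aggregated rows:

```python
# Hypothetical aggregated rows: (order_date, order_item_product_id, product_revenue)
rows = [
    ("2013-07-26 00:00:00.0", 502, 100.0),
    ("2013-07-25 00:00:00.0", 1004, 399.98),
    ("2013-07-25 00:00:00.0", 957, 599.96),
]

# Ascending by order_date, then descending by product_revenue within each date.
# Negating the revenue reverses the order for that part of the key only.
sorted_rows = sorted(rows, key=lambda row: (row[0], -row[2]))

for row in sorted_rows:
    print(row)
```

For each date, the products with the highest revenue come first.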
Using Spark SQL with Python or Scala
spark.sql("DROP DATABASE IF EXISTS itversity_retail CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_retail")
spark.sql("USE itversity_retail")
spark.sql("SHOW tables").show()
spark.sql("""
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
INTO TABLE orders
""")
spark.sql("""
CREATE TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/order_items'
INTO TABLE order_items
""")
spark.sql("""
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
""").show()
spark.sql("""
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
ORDER BY o.order_date,
product_revenue DESC
""").show()
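Since the two queries differ only in the ORDER BY clause, one possible way to avoid repeating the query text in Python is to build it from a shared base. This is a sketch, not part of the original solution; the names `BASE_QUERY`, `ORDER_BY_CLAUSE`, and `daily_product_revenue_query` are hypothetical.

```python
# Shared part of both queries: join, filter, group, and aggregate.
BASE_QUERY = """
SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
         oi.order_item_product_id
"""

# Optional sorting: by date ascending, then by revenue descending.
ORDER_BY_CLAUSE = """ORDER BY o.order_date,
         product_revenue DESC
"""


def daily_product_revenue_query(sort=True):
    # Return the query text, with or without the ORDER BY clause.
    return BASE_QUERY + (ORDER_BY_CLAUSE if sort else "")


unsorted_query = daily_product_revenue_query(sort=False)
sorted_query = daily_product_revenue_query()
print(sorted_query)

# In a pyspark session where `spark` is available, the text could be run with:
# spark.sql(sorted_query).show()
```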