Conclusion - Final Solution

Let us review the final solution for our problem statement, daily_product_revenue.

Let us start the Spark session for this notebook so that we can execute the code provided.

import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Basic Transformations").
    master("yarn").
    getOrCreate

If you prefer to work from a CLI, you can run Spark SQL using one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

The solution involves the following steps.

  • Prepare tables

    • Create tables

    • Load the data into tables

  • We need to project the fields which we are interested in.

    • order_date

    • order_item_product_id

    • product_revenue

  • As the fields come from multiple tables, we need to join orders and order_items, and then filter for COMPLETE or CLOSED orders.

  • We have to group the data by order_date and order_item_product_id, then sum order_item_subtotal to get product_revenue.
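
The steps above can be sketched in plain Python, without Spark, to make the join, filter, group and sum explicit. This is only an illustrative sketch: the sample rows are made up rather than taken from /data/retail_db, and the function name daily_product_revenue_sketch is a hypothetical helper, not part of the course code.

```python
from collections import defaultdict

# Hypothetical sample rows, NOT taken from /data/retail_db.
# orders: (order_id, order_date, order_customer_id, order_status)
orders = [
    (1, "2013-07-25 00:00:00.0", 11599, "CLOSED"),
    (2, "2013-07-25 00:00:00.0", 256, "PENDING_PAYMENT"),
    (3, "2013-07-26 00:00:00.0", 12111, "COMPLETE"),
]
# order_items: (order_item_id, order_item_order_id, order_item_product_id,
#               order_item_quantity, order_item_subtotal, order_item_product_price)
order_items = [
    (1, 1, 957, 1, 299.98, 299.98),
    (2, 2, 1073, 1, 199.99, 199.99),
    (3, 3, 957, 2, 599.96, 299.98),
    (4, 3, 957, 1, 299.98, 299.98),
]


def daily_product_revenue_sketch(orders, order_items):
    # Filter: keep only COMPLETE or CLOSED orders, keyed by order_id for the join.
    order_dates = {
        order_id: order_date
        for order_id, order_date, _, status in orders
        if status in ("COMPLETE", "CLOSED")
    }
    revenue = defaultdict(float)
    for _, order_id, product_id, _, subtotal, _ in order_items:
        # Inner join: skip items whose order was filtered out or does not exist.
        if order_id in order_dates:
            # Group by (order_date, order_item_product_id) and sum the subtotal.
            revenue[(order_dates[order_id], product_id)] += subtotal
    return {key: round(total, 2) for key, total in revenue.items()}


result = daily_product_revenue_sketch(orders, order_items)
for (order_date, product_id), product_revenue in result.items():
    print(order_date, product_id, product_revenue)
```

The item belonging to the PENDING_PAYMENT order is dropped, and the two items for product 957 on the same date collapse into a single row, which is the behaviour the SQL query is expected to show on the real tables.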

%%sql

DROP DATABASE IF EXISTS itversity_retail CASCADE
%%sql

CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql

USE itversity_retail
%%sql

SHOW tables
%%sql

CREATE TABLE orders (
    order_id INT,
    order_date STRING,
    order_customer_id INT,
    order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
%%sql 

CREATE TABLE order_items (
    order_item_id INT,
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
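
To make the table definition more concrete, here is a minimal plain-Python sketch of how one comma-delimited line maps onto the order_items columns. It assumes every line has exactly six comma-separated fields with no quoting, and it raises on malformed input, which is a simplification for illustration and not a description of how the table load behaves. The names ORDER_ITEMS_SCHEMA and parse_order_item are hypothetical, and the sample line is made up.

```python
# Column names and Python types mirroring the CREATE TABLE order_items statement.
ORDER_ITEMS_SCHEMA = [
    ("order_item_id", int),
    ("order_item_order_id", int),
    ("order_item_product_id", int),
    ("order_item_quantity", int),
    ("order_item_subtotal", float),
    ("order_item_product_price", float),
]


def parse_order_item(line):
    # Split on the same delimiter declared in FIELDS TERMINATED BY ','.
    values = line.rstrip("\n").split(",")
    if len(values) != len(ORDER_ITEMS_SCHEMA):
        raise ValueError(f"expected {len(ORDER_ITEMS_SCHEMA)} fields, got {len(values)}")
    # Cast each field to the type of its column, in declaration order.
    return {
        name: cast(value)
        for (name, cast), value in zip(ORDER_ITEMS_SCHEMA, values)
    }


# Hypothetical sample line, not read from /data/retail_db/order_items.
record = parse_order_item("1,1,957,1,299.98,299.98\n")
print(record)
```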
%%sql

SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
%%sql

SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
ORDER BY o.order_date,
    product_revenue DESC
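
The ORDER BY clause in the last query sorts by order_date in ascending order and, within each date, by product_revenue in descending order. A minimal plain-Python sketch of the same ordering follows, assuming made-up aggregated rows and a hypothetical helper named sort_daily_revenue. Sorting the dates as strings works here only because they are in year-month-day form.

```python
# Hypothetical aggregated rows: (order_date, order_item_product_id, product_revenue).
rows = [
    ("2013-07-26 00:00:00.0", 957, 899.94),
    ("2013-07-25 00:00:00.0", 1073, 199.99),
    ("2013-07-25 00:00:00.0", 957, 299.98),
    ("2013-07-26 00:00:00.0", 1014, 1499.7),
]


def sort_daily_revenue(rows):
    # order_date ascending, then product_revenue descending
    # (the revenue is negated so that larger values sort first).
    return sorted(rows, key=lambda row: (row[0], -row[2]))


sorted_rows = sort_daily_revenue(rows)
for row in sorted_rows:
    print(row)
```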
  • Using Spark SQL with Python or Scala

spark.sql("DROP DATABASE IF EXISTS itversity_retail CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_retail")
spark.sql("USE itversity_retail")
spark.sql("SHOW tables").show()
spark.sql("""
CREATE TABLE orders (
    order_id INT,
    order_date STRING,
    order_customer_id INT,
    order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/orders' 
INTO TABLE orders
""")
spark.sql("""
CREATE TABLE order_items (
    order_item_id INT,
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/order_items' 
INTO TABLE order_items
""")
spark.sql("""
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
""").show()
spark.sql("""
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
ORDER BY o.order_date,
    product_revenue DESC
""").show()