Preparing Tables

Let us prepare the tables to solve the problem.

  • Make sure the database is created.

  • Create the orders table.

  • Load data from the local path /data/retail_db/orders into the newly created orders table (a quick check of these local paths is sketched right after this list).

  • Preview the data and get the count from orders.

  • Create the order_items table.

  • Load data from the local path /data/retail_db/order_items into the newly created order_items table.

  • Preview the data and get the count from order_items.
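
Before loading, it helps to confirm that the data is actually present at the local paths mentioned above. Here is a minimal sketch, assuming /data/retail_db is available on the host where this session runs:

import sys.process._

// Sketch: list the local input folders; both should contain the delimited part files.
"ls -ltr /data/retail_db/orders".!
"ls -ltr /data/retail_db/order_items".!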

As the tables and data are ready, let us get into how to write queries against these tables to perform basic transformations.

Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.

val username = System.getProperty("user.name")
username = itversity
itversity
import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Basic Transformations").
    master("yarn").
    getOrCreate
username = itversity
spark = org.apache.spark.sql.SparkSession@48b7120c
org.apache.spark.sql.SparkSession@48b7120c
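
As a quick check (a sketch, not part of the original flow), the catalog API can confirm that the session came up with Hive support and can reach the metastore:

// Sketch: list the databases visible to this session; with enableHiveSupport this reads from the Hive metastore.
spark.catalog.listDatabases().show(false)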

If you are going to use CLIs, you can run Spark SQL using one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
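
Whichever way you launch the session, you can confirm the warehouse setting it picked up. A minimal sketch using the runtime configuration API:

// Sketch: print the warehouse directory this session is using.
spark.conf.get("spark.sql.warehouse.dir")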
%%sql

DROP DATABASE itversity_retail CASCADE
Waiting for a Spark session to start...
++
||
++
++
%%sql

CREATE DATABASE IF NOT EXISTS itversity_retail
++
||
++
++
%%sql
USE itversity_retail
++
||
++
++
%%sql
SHOW tables
+----------------+--------------+-----------+
|        database|     tableName|isTemporary|
+----------------+--------------+-----------+
|itversity_retail|        orders|      false|
|itversity_retail|orders_parquet|      false|
+----------------+--------------+-----------+
%%sql

DROP TABLE orders
Magic sql failed to execute with error: 
Table or view not found: orders;
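
The failure above is expected when the table does not exist yet. If you want the drop to be a no-op in that case, the IF EXISTS clause avoids the error; a sketch:

// Sketch: drop the table only if it exists, so the statement never fails on a fresh database.
spark.sql("DROP TABLE IF EXISTS orders")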
%%sql

CREATE TABLE orders (
    order_id INT,
    order_date STRING,
    order_customer_id INT,
    order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
++
||
++
++
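
To confirm the table was created with the intended delimiter and warehouse location, you can describe it; a sketch:

// Sketch: inspect the columns, storage format, and HDFS location of the orders table.
spark.sql("DESCRIBE FORMATTED orders").show(50, false)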
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
username = itversity
warning: there was one feature warning; re-run with -feature for details
0
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
++
||
++
++
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
%%sql

SELECT * FROM orders LIMIT 10
+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
|       6|2013-07-25 00:00:...|             7130|       COMPLETE|
|       7|2013-07-25 00:00:...|             4530|       COMPLETE|
|       8|2013-07-25 00:00:...|             2911|     PROCESSING|
|       9|2013-07-25 00:00:...|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|             5648|PENDING_PAYMENT|
+--------+--------------------+-----------------+---------------+
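
Note that order_date is declared as STRING in the DDL above; when a proper date is needed it can be converted on the fly. A sketch using to_date:

// Sketch: convert the string order_date to a DATE while querying.
spark.sql("SELECT order_id, to_date(order_date) AS order_date, order_status FROM orders LIMIT 5").show()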
%%sql

SELECT count(1) FROM orders
+--------+
|count(1)|
+--------+
|   68883|
+--------+
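
The same count can be cross-checked through the DataFrame API; a minimal sketch:

// Sketch: read the Hive table as a DataFrame and count; this should match the SQL count above (68883).
spark.table("orders").count()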
%%sql

DROP TABLE order_items
%%sql 

CREATE TABLE order_items (
    order_item_id INT,
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
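
Once order_items is created, its column definitions can be verified through the catalog API; a sketch:

// Sketch: list the columns and data types registered for order_items.
spark.catalog.listColumns("order_items").show(false)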
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
%%sql

SELECT * FROM order_items LIMIT 10
%%sql

SELECT count(1) FROM order_items
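
With both tables loaded, a small query joining them gives a taste of the basic transformations covered next. A sketch, using the column names from the DDLs above:

// Sketch: revenue per order, joining orders with order_items on the order id.
spark.sql("""
    SELECT o.order_id, o.order_status,
           round(sum(oi.order_item_subtotal), 2) AS order_revenue
    FROM orders AS o
        JOIN order_items AS oi
            ON o.order_id = oi.order_item_order_id
    GROUP BY o.order_id, o.order_status
    ORDER BY o.order_id
    LIMIT 10
""").show()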

Using Spark SQL with Python or Scala

You can also run all of the above steps programmatically by passing the same statements to spark.sql, as shown below.

spark.sql("DROP DATABASE itversity_retail CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_retail")
spark.sql("USE itversity_retail")
spark.sql("SHOW tables")
spark.sql("DROP TABLE orders")
spark.sql("""
CREATE TABLE orders (
    order_id INT,
    order_date STRING,
    order_customer_id INT,
    order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
spark.sql("SELECT * FROM orders LIMIT 10").show()
spark.sql("SELECT count(1) FROM orders").show()
spark.sql("DROP TABLE order_items")
spark.sql("""
CREATE TABLE order_items (
    order_item_id INT,
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
spark.sql("SELECT * FROM order_items LIMIT 10").show()
spark.sql("SELECT count(1) FROM order_items").show()