Preparing Tables
Let us prepare the tables needed for the problems that follow.
Make sure the database is created.
Create the orders table.
Load data from the local path /data/retail_db/orders into the newly created orders table.
Preview the data and get the count from orders.
Create the order_items table.
Load data from the local path /data/retail_db/order_items into the newly created order_items table.
Preview the data and get the count from order_items.
Once the tables and data are ready, we can look at how to write queries against them to perform basic transformations.
Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
val username = System.getProperty("user.name")
username = itversity
itversity
import org.apache.spark.sql.SparkSession
val username = System.getProperty("user.name")
val spark = SparkSession.
  builder.
  config("spark.ui.port", "0").
  config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
  enableHiveSupport.
  appName(s"${username} | Spark SQL - Basic Transformations").
  master("yarn").
  getOrCreate
username = itversity
spark = org.apache.spark.sql.SparkSession@48b7120c
org.apache.spark.sql.SparkSession@48b7120c
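The spark.sql.warehouse.dir setting above decides where managed tables are stored on HDFS. As a minimal sketch, assuming the usual Hive layout of <warehouse>/<database>.db/<table> for a non-default database, the directory that the later hdfs dfs -ls commands inspect can be derived from the username. WarehousePaths and tableLocation are hypothetical names, not Spark APIs.

```scala
object WarehousePaths {
  // Mirrors the builder setting spark.sql.warehouse.dir = /user/<username>/warehouse
  def warehouseDir(username: String): String = s"/user/$username/warehouse"

  // Hypothetical helper; assumes managed tables of a non-default database `db`
  // are stored under <warehouse>/<db>.db/<table> (the usual Hive layout)
  def tableLocation(username: String, db: String, table: String): String =
    s"${warehouseDir(username)}/${db}.db/$table"
}

// Directory listed for the orders table when the username is itversity
println(WarehousePaths.tableLocation("itversity", "itversity_retail", "orders"))
// prints /user/itversity/warehouse/itversity_retail.db/orders
```

Deriving the path from the username, instead of hard-coding /user/itversity, keeps the listing commands valid for any lab user.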
If you prefer the command-line interfaces, you can use Spark SQL through one of these 3 approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using PySpark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
%%sql
DROP DATABASE IF EXISTS itversity_retail CASCADE
%%sql
CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql
USE itversity_retail
%%sql
SHOW tables
+----------------+--------------+-----------+
| database| tableName|isTemporary|
+----------------+--------------+-----------+
|itversity_retail| orders| false|
|itversity_retail|orders_parquet| false|
+----------------+--------------+-----------+
%%sql
DROP TABLE IF EXISTS orders
%%sql
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
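ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' means each line of the underlying text file is one row, and the comma-separated fields map by position onto the declared columns. The plain-Scala sketch below mimics that mapping for the orders schema. Order and OrderParser are hypothetical names, the sample line is illustrative rather than taken from the data set, and Option stands in for NULL on the assumption that, as with Hive's text SerDe, a field that cannot be converted to INT (or is missing) reads as NULL.

```scala
import scala.util.Try

// Hypothetical case class mirroring the orders columns; None plays the role of NULL
case class Order(
  orderId: Option[Int],
  orderDate: Option[String],
  orderCustomerId: Option[Int],
  orderStatus: Option[String]
)

object OrderParser {
  def parseOrder(line: String): Order = {
    // limit -1 keeps trailing empty fields instead of dropping them
    val fields = line.split(",", -1)
    // A missing trailing field becomes None; extra fields are ignored
    def field(i: Int): Option[String] = if (i < fields.length) Some(fields(i)) else None
    // A field that is not a valid integer becomes None
    def intField(i: Int): Option[Int] = field(i).flatMap(s => Try(s.toInt).toOption)
    Order(intField(0), field(1), intField(2), field(3))
  }
}

// Illustrative line in the same shape as the orders files (values are made up)
println(OrderParser.parseOrder("100,2014-01-01 00:00:00.0,42,COMPLETE"))
```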
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders".!
username = itversity
0
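Calling ! on a command string through scala.sys.process runs it and returns its exit code, which is the 0 shown above. Writing ! in postfix position (without the dot) is what produces the feature warning; .! avoids it. A slightly more robust sketch, assuming the hdfs client is on the PATH, builds the command as a Seq so the path is passed as a single argument. HdfsCommands, hdfsLsCommand and hdfsLs are hypothetical names. If you need the listing as a String instead of the exit code, .!! captures stdout and throws an exception on a non-zero exit code.

```scala
import scala.sys.process._

object HdfsCommands {
  // Each argument is a separate element, so no shell word-splitting takes place
  def hdfsLsCommand(path: String): Seq[String] = Seq("hdfs", "dfs", "-ls", path)

  // .! runs the command, streams its output to stdout and returns the exit code.
  // Assumes the hdfs client is installed and on the PATH.
  def hdfsLs(path: String): Int = hdfsLsCommand(path).!
}

// Usage in the notebook (needs an HDFS client):
// val username = System.getProperty("user.name")
// HdfsCommands.hdfsLs(s"/user/${username}/warehouse/${username}_retail.db/orders")
```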
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders".!
%%sql
SELECT * FROM orders LIMIT 10
+--------+--------------------+-----------------+---------------+
|order_id| order_date|order_customer_id| order_status|
+--------+--------------------+-----------------+---------------+
| 1|2013-07-25 00:00:...| 11599| CLOSED|
| 2|2013-07-25 00:00:...| 256|PENDING_PAYMENT|
| 3|2013-07-25 00:00:...| 12111| COMPLETE|
| 4|2013-07-25 00:00:...| 8827| CLOSED|
| 5|2013-07-25 00:00:...| 11318| COMPLETE|
| 6|2013-07-25 00:00:...| 7130| COMPLETE|
| 7|2013-07-25 00:00:...| 4530| COMPLETE|
| 8|2013-07-25 00:00:...| 2911| PROCESSING|
| 9|2013-07-25 00:00:...| 5657|PENDING_PAYMENT|
| 10|2013-07-25 00:00:...| 5648|PENDING_PAYMENT|
+--------+--------------------+-----------------+---------------+
%%sql
SELECT count(1) FROM orders
+--------+
|count(1)|
+--------+
| 68883|
+--------+
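Because each line of a delimited text file becomes one row, count(1) should match the number of lines in the files that were loaded. As a cross-check sketch, assuming /data/retail_db/orders is a local file or a directory of plain-text part files with no header line, the lines can be counted with the Scala standard library. LineCounter and countLines are hypothetical names.

```scala
import java.io.File
import scala.io.Source

object LineCounter {
  // Counts the lines of one file and always closes it
  private def linesIn(file: File): Long = {
    val src = Source.fromFile(file, "UTF-8")
    try src.getLines().size.toLong finally src.close()
  }

  // Accepts a single file or a directory of files (not recursive).
  // Skips names starting with "." or "_" (e.g. .crc files, _SUCCESS markers),
  // which Hadoop input formats also skip when reading a table directory.
  def countLines(path: String): Long = {
    val f = new File(path)
    if (f.isDirectory) {
      val files = Option(f.listFiles()).getOrElse(Array.empty[File])
      files
        .filter(x => x.isFile && !x.getName.startsWith(".") && !x.getName.startsWith("_"))
        .map(linesIn)
        .sum
    } else linesIn(f)
  }
}

// On the node that holds the local copy of the data:
// println(LineCounter.countLines("/data/retail_db/orders"))
```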
%%sql
DROP TABLE IF EXISTS order_items
%%sql
CREATE TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
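order_item_subtotal and order_item_product_price are declared FLOAT, which Spark maps to a 4-byte single-precision FloatType, so a price such as 299.98 is stored only approximately. The sketch below parses an order_items line by position and widens the subtotal to Double to expose that rounding. OrderItem and OrderItemParser are hypothetical names, and the sample line is illustrative rather than taken from the data set; a DECIMAL column would avoid the approximation if exact monetary values matter.

```scala
// Hypothetical case class mirroring the order_items columns (FLOAT -> Float)
case class OrderItem(
  orderItemId: Int,
  orderItemOrderId: Int,
  orderItemProductId: Int,
  orderItemQuantity: Int,
  orderItemSubtotal: Float,
  orderItemProductPrice: Float
)

object OrderItemParser {
  // Assumes a well-formed line with exactly six comma-separated fields
  def parseOrderItem(line: String): OrderItem = {
    val f = line.split(",", -1)
    require(f.length == 6, s"expected 6 fields, got ${f.length}")
    OrderItem(f(0).toInt, f(1).toInt, f(2).toInt, f(3).toInt, f(4).toFloat, f(5).toFloat)
  }
}

// Illustrative line in the same shape as the order_items files
val item = OrderItemParser.parseOrderItem("1,1,957,1,299.98,299.98")
// Widening the single-precision value to Double shows it is not exactly 299.98
println(item.orderItemSubtotal.toDouble)
```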
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items".!
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items".!
%%sql
SELECT * FROM order_items LIMIT 10
%%sql
SELECT count(1) FROM order_items
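Using Spark SQL with Python or Scala
The spark.sql calls below work the same way in Python and Scala; the HDFS listings that use sys.process are Scala only.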
spark.sql("DROP DATABASE IF EXISTS itversity_retail CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_retail")
spark.sql("USE itversity_retail")
spark.sql("SHOW tables").show()
spark.sql("DROP TABLE IF EXISTS orders")
spark.sql("""
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders".!
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders".!
spark.sql("SELECT * FROM orders LIMIT 10").show()
spark.sql("SELECT count(1) FROM orders").show()
spark.sql("DROP TABLE IF EXISTS order_items")
spark.sql("""
CREATE TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items".!
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items".!
spark.sql("SELECT * FROM order_items LIMIT 10").show()
spark.sql("SELECT count(1) FROM order_items").show()
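The two CREATE TABLE statements differ only in the table name and the column list. When you drive Spark SQL from Scala, you can therefore generate the DDL text once and pass it to spark.sql. This is a minimal sketch: DdlBuilder and createTableDdl are hypothetical names that only build the string, and the commented lines show where it would be handed to the spark session created earlier.

```scala
object DdlBuilder {
  // Builds a delimited text-table DDL, like the ones above, from (column, type) pairs
  def createTableDdl(table: String, columns: Seq[(String, String)], delimiter: String = ","): String = {
    val cols = columns.map { case (name, tpe) => s"  $name $tpe" }.mkString(",\n")
    s"CREATE TABLE $table (\n$cols\n) ROW FORMAT DELIMITED FIELDS TERMINATED BY '$delimiter'"
  }
}

val ordersDdl = DdlBuilder.createTableDdl(
  "orders",
  Seq(
    "order_id" -> "INT",
    "order_date" -> "STRING",
    "order_customer_id" -> "INT",
    "order_status" -> "STRING"
  )
)
println(ordersDdl)

// In the notebook, with the session created earlier:
// spark.sql("DROP TABLE IF EXISTS orders")
// spark.sql(ordersDdl)
```

The same helper would produce the order_items DDL by passing its six columns, with FLOAT for the two amount columns.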