Inserting Data using Stage Table
Let us understand how to insert data into order_items, which is created with the Parquet file format.
Let us start the Spark context for this notebook so that we can execute the code provided. You can sign up for our 10-node state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
import org.apache.spark.sql.SparkSession
val username = System.getProperty("user.name")
val spark = SparkSession.
builder.
config("spark.ui.port", "0").
config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
enableHiveSupport.
appName(s"${username} | Spark SQL - Managing Tables - DML and Partitioning").
master("yarn").
getOrCreate
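The %%sql cells in this notebook run against this SparkSession. The equivalent programmatic form, which is also used later in this notebook, is spark.sql; a quick sketch, assuming spark is the session created above:

```scala
// Run a SQL statement through the SparkSession and print the result.
// Equivalent to a %%sql cell in this notebook.
spark.sql("SHOW tables").show(false)
```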
If you are going to use CLIs, you can use Spark SQL via one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
As the data is in text file format and our table is created with the Parquet file format, we will not be able to use the LOAD command to load the data.
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/order_items'
OVERWRITE INTO TABLE order_items
The above LOAD command will be successful; however, queries against the table will fail, as they expect the data to be in Parquet file format.
%%sql
SELECT * FROM order_items LIMIT 10
%%sql
TRUNCATE TABLE order_items
Following are the steps to get data into a table that is created with a different file format or delimiter than our source data.

We need to create a stage table with text file format and comma as the delimiter (order_items_stage).
Load data from our files in the local file system into the stage table.
Using the stage table, run an INSERT command to copy the data into our target table (order_items).
Let us see an example of inserting data into the target table from the stage table.
%%sql
USE itversity_retail
%%sql
SHOW TABLES
%%sql
CREATE TABLE order_items_stage (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
spark.sql("DESCRIBE FORMATTED order_items_stage").show(200, false)
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items_stage
%%sql
SELECT * FROM order_items_stage LIMIT 10
%%sql
TRUNCATE TABLE order_items
%%sql
INSERT INTO TABLE order_items
SELECT * FROM order_items_stage
%%sql
SELECT * FROM order_items LIMIT 10
%%sql
SELECT count(1) FROM order_items
INSERT INTO will append data to the target table by adding new files.
%%sql
INSERT INTO TABLE order_items
SELECT * FROM order_items_stage
%%sql
SELECT * FROM order_items LIMIT 10
%%sql
SELECT count(1) FROM order_items
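To confirm at the file level that INSERT INTO appends rather than replaces, we can list the table's directory; each run should add new part files. This is a sketch assuming the same warehouse layout used elsewhere in this notebook:

```scala
import sys.process._

// List the files under the order_items table directory.
// After running INSERT INTO twice, files from both runs should be present.
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items" !
```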
INSERT OVERWRITE will overwrite the data in the target table by deleting the files related to the old data from the directory pointed to by the Spark Metastore table.
%%sql
INSERT OVERWRITE TABLE order_items
SELECT * FROM order_items_stage
%%sql
SELECT * FROM order_items
%%sql
SELECT count(1) FROM order_items
import sys.process._

// Review the files under the order_items table's warehouse directory
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items" !