Inserting Data using Stage Table

Let us understand how to insert data into order_items, which is created with the Parquet file format.

Let us start the Spark context for this Notebook so that we can execute the code provided.

import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Managing Tables - DML and Partitioning").
    master("yarn").
    getOrCreate

If you are going to use CLIs, you can use Spark SQL with one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

As the data is in text file format while our table is created with the Parquet file format, we cannot use the LOAD command to get the data in correctly. LOAD only copies the files as-is; it does not convert them to the table's file format.

%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/order_items'
    OVERWRITE INTO TABLE order_items
  • The above LOAD command will be successful; however, when we try to query the table, the query will fail, as it expects the data to be in Parquet file format.

%%sql

SELECT * FROM order_items LIMIT 10
%%sql

TRUNCATE TABLE order_items

Following are the steps to get data into a table that is created with a different file format or delimiter than our source data. The steps are consolidated in the sketch after this list.

  • Create a stage table with text file format and comma as the delimiter (order_items_stage).

  • Load the data from our files in the local file system into the stage table.

  • Run an INSERT command using the stage table to get the data into our target table (order_items).
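
If you prefer to drive these steps from spark-shell rather than SQL cells, the same flow can be expressed with spark.sql. This is a minimal sketch, assuming the itversity_retail database used below already exists; the individual statements are walked through cell by cell afterwards.

// Step 1: stage table with text file format and comma as the delimiter
spark.sql("USE itversity_retail")
spark.sql("""
  CREATE TABLE IF NOT EXISTS order_items_stage (
    order_item_id INT,
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_product_price FLOAT
  ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")

// Step 2: load the delimited text files into the stage table
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items_stage")

// Step 3: insert from the stage table into the Parquet target table
spark.sql("INSERT INTO TABLE order_items SELECT * FROM order_items_stage")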

Let us see an example of inserting data into the target table from the staging table.

%%sql

USE itversity_retail
%%sql

SHOW tables
%%sql

CREATE TABLE order_items_stage (
  order_item_id INT,
  order_item_order_id INT,
  order_item_product_id INT,
  order_item_quantity INT,
  order_item_subtotal FLOAT,
  order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
spark.sql("DESCRIBE FORMATTED order_items_stage").show(200, false)
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items_stage
%%sql

SELECT * FROM order_items_stage LIMIT 10
%%sql

TRUNCATE TABLE order_items
%%sql

INSERT INTO TABLE order_items
SELECT * FROM order_items_stage
%%sql

SELECT * FROM order_items LIMIT 10
%%sql

SELECT count(1) FROM order_items
  • INSERT INTO will append data to the target table by adding new files to its directory; running the same insert again will therefore double the row count.

%%sql

INSERT INTO TABLE order_items
SELECT * FROM order_items_stage
%%sql

SELECT * FROM order_items LIMIT 10
%%sql

SELECT count(1) FROM order_items
  • INSERT OVERWRITE will overwrite the data in the target table by deleting the files related to the old data from the directory pointed to by the Spark Metastore table.

%%sql

INSERT OVERWRITE TABLE order_items
SELECT * FROM order_items_stage
%%sql

SELECT * FROM order_items
%%sql

SELECT count(1) FROM order_items
import sys.process._

// List the files backing the order_items table in the warehouse directory
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items" !