Exercise - Partitioned Tables

Let us work through this exercise related to partitioning to self-evaluate our comfort level in working with partitioned tables.

Let us start the Spark context for this Notebook so that we can execute the code provided. You can sign up for our 10 node state of the art cluster/labs to learn Spark SQL using our unique integrated LMS.

import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Managing Tables - DML and Partitioning").
    master("yarn").
    getOrCreate

If you are going to use CLIs, you can run Spark SQL using one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using PySpark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Here are the details of the exercise.

  • Duration: 30 Minutes

  • Use data from /data/nyse_all/nyse_data

  • Use database YOUR_OS_USER_NAME_nyse

  • Create partitioned table nyse_eod_part

  • Field Names: stockticker, tradedate, openprice, highprice, lowprice, closeprice, volume

  • Determine correct data types based on the values

  • Create a managed table with “,” as the delimiter.

  • The partition field should be tradeyear of type INT (one partition for each year).

  • Insert data into partitioned table using dynamic partition mode.

  • Here are the steps to come up with the solution (a consolidated SQL sketch follows this list).

    • Review the files under /data/nyse_all/nyse_data and determine the data types (for example, tradedate should be INT and volume should be BIGINT)

    • Create database YOUR_OS_USER_NAME_nyse (if it does not exist)

    • Create non-partitioned stage table

    • Load data into non-partitioned stage table

    • Validate the count and also confirm that the data is as expected by running a simple SELECT query.

    • Create partitioned table

    • Set the properties required to use dynamic partition mode

    • Insert data into the partitioned table. Here is how you can compute tradeyear from tradedate of type INT: year(to_date(cast(tradedate AS STRING), 'yyyyMMdd')) AS tradeyear

    • Run the commands in the Validation section below to validate the results.
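
Here is a minimal SQL sketch of the above steps. It is only a sketch, not the reference solution: the FLOAT type for the price columns and the use of LOAD DATA LOCAL INPATH (assuming the files sit on the local file system of the gateway node, as the pandas snippet in the validation section suggests) are assumptions you should verify. Replace YOUR_OS_USER_NAME with your OS user name and adjust the data types based on what you see in the data.

-- create the database (if it does not exist) and switch to it
CREATE DATABASE IF NOT EXISTS YOUR_OS_USER_NAME_nyse;
USE YOUR_OS_USER_NAME_nyse;

-- non-partitioned stage table
CREATE TABLE nyse_eod_stage (
    stockticker STRING,
    tradedate INT,
    openprice FLOAT,
    highprice FLOAT,
    lowprice FLOAT,
    closeprice FLOAT,
    volume BIGINT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- load the comma separated files into the stage table
LOAD DATA LOCAL INPATH '/data/nyse_all/nyse_data' INTO TABLE nyse_eod_stage;

-- managed table partitioned by tradeyear
CREATE TABLE nyse_eod_part (
    stockticker STRING,
    tradedate INT,
    openprice FLOAT,
    highprice FLOAT,
    lowprice FLOAT,
    closeprice FLOAT,
    volume BIGINT
) PARTITIONED BY (tradeyear INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- properties typically required for dynamic partition inserts
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- the partition column tradeyear has to be the last column in the SELECT
INSERT INTO TABLE nyse_eod_part PARTITION (tradeyear)
SELECT nse.*, year(to_date(cast(tradedate AS STRING), 'yyyyMMdd')) AS tradeyear
FROM nyse_eod_stage AS nse;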

Validation

Here are the instructions to validate the results.

  • Run hdfs dfs -ls /user/YOUR_OS_USER_NAME/warehouse/YOUR_OS_USER_NAME_nyse.db/nyse_eod_part

  • Run SHOW PARTITIONS YOUR_OS_USER_NAME_nyse.nyse_eod_part. You should see one partition for each of the years for which data has been loaded.

  • Run SELECT count(1) FROM YOUR_OS_USER_NAME_nyse.nyse_eod_part. The count should match the number of records in our dataset.

  • You can compare the count with the output generated by this simple Python code, which has been validated in our labs.

import pandas as pd
import glob

path = r'/data/nyse_all/nyse_data' # use your path
all_files = glob.glob(path + "/*.txt.gz")

li = []

# read each compressed file into a data frame and collect the data frames in a list
for filename in all_files:
    df = pd.read_csv(filename, index_col=None, header=None)
    li.append(df)

# combine all the data frames; shape is (number of records, number of columns)
frame = pd.concat(li, axis=0, ignore_index=True)
frame.shape
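
The first element of frame.shape is the total number of records across all the files; it should match the count returned by the SELECT count(1) query above.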