Exercise - Partitioned Tables
Let us work through this exercise on partitioning to self-evaluate our comfort level with partitioned tables.
Let us start the Spark session for this notebook so that we can execute the code provided. You can sign up for our 10-node, state-of-the-art cluster/labs to learn Spark SQL using our unique integrated LMS.
import org.apache.spark.sql.SparkSession
val username = System.getProperty("user.name")
val spark = SparkSession.
  builder.
  config("spark.ui.port", "0").
  config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
  enableHiveSupport.
  appName(s"${username} | Spark SQL - Managing Tables - DML and Partitioning").
  master("yarn").
  getOrCreate
If you are going to use the CLIs, you can use Spark SQL with one of the following 3 approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using PySpark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Duration: 30 Minutes
Use data from /data/nyse_all/nyse_data
Use database YOUR_OS_USER_NAME_nyse
Create partitioned table nyse_eod_part
Field Names: stockticker, tradedate, openprice, highprice, lowprice, closeprice, volume
Determine correct data types based on the values
Create a managed table with “,” as the delimiter.
The partition field should be tradeyear of type INT (one partition per year).
Insert data into the partitioned table using dynamic partition mode.
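One way to approach "determine correct data types based on the values" is to check whether every sampled value in a column parses as an integer (and whether it fits the 32-bit INT range), as a floating-point number, or only as a string. The plain-Python sketch below does exactly that. `suggest_type`, `infer_schema` and the choice of FLOAT for prices are assumptions for illustration, not the official solution. A sample only tells you about the values you looked at, so widen types where the data can grow, as the exercise does by recommending BIGINT for volume.

```python
# Hypothetical helpers: suggest Hive/Spark SQL column types from sample values.
FIELD_NAMES = ["stockticker", "tradedate", "openprice", "highprice",
               "lowprice", "closeprice", "volume"]
INT_MIN, INT_MAX = -2**31, 2**31 - 1  # range of a 32-bit INT


def _parse_all(values, parser):
    """Return the parsed values, or None if any value fails to parse."""
    try:
        return [parser(v) for v in values]
    except ValueError:
        return None


def suggest_type(values):
    """Suggest INT, BIGINT, FLOAT or STRING for a column of sample strings."""
    ints = _parse_all(values, int)
    if ints is not None:
        # All integers: use INT only if every value fits in 32 bits.
        return "INT" if all(INT_MIN <= i <= INT_MAX for i in ints) else "BIGINT"
    if _parse_all(values, float) is not None:
        return "FLOAT"
    return "STRING"


def infer_schema(lines, field_names=FIELD_NAMES, delimiter=","):
    """Transpose delimited lines into columns and suggest a type per field."""
    rows = (line.rstrip("\n").split(delimiter) for line in lines)
    columns = list(zip(*rows))
    return {name: suggest_type(col) for name, col in zip(field_names, columns)}
```

You could feed `infer_schema` the first few lines of one of the files (for example, read with `gzip.open(filename, "rt")`) and compare the suggestions with your own judgment. DECIMAL is a reasonable alternative to FLOAT for prices if you need exact values.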
Here are the steps to come up with the solution.
Review the files under /data/nyse_all/nyse_data and determine the data types (for example, tradedate should be INT and volume should be BIGINT).
Create the database YOUR_OS_USER_NAME_nyse (if it does not exist).
Create a non-partitioned stage table.
Load data into the non-partitioned stage table.
Validate the count and check that the data is as expected by running a simple SELECT query.
Create the partitioned table.
Set the required properties to use dynamic partitioning.
Insert data into the partitioned table. Here is how you can compute the year from tradedate of type INT:
year(to_date(cast(tradedate AS STRING), 'yyyyMMdd')) AS tradeyear
Finally, run the commands in the Validation section to validate the results.
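To see what the year expression computes and how dynamic partitioning uses it, here is a plain-Python sketch (not Spark code). `trade_year` mirrors `year(to_date(cast(tradedate AS STRING), 'yyyyMMdd'))`, and the hypothetical `route_to_partitions` groups rows the way a dynamic-partition insert assigns them: each distinct tradeyear value ends up in its own `tradeyear=<value>` directory under the table location, following the Hive naming convention. In Hive-compatible setups, the properties to set beforehand are typically `hive.exec.dynamic.partition=true` and `hive.exec.dynamic.partition.mode=nonstrict`, and the partition column must come last in the SELECT list. The row tuples used with it are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime


def trade_year(tradedate: int) -> int:
    """Python equivalent of year(to_date(cast(tradedate AS STRING), 'yyyyMMdd'))."""
    return datetime.strptime(str(tradedate), "%Y%m%d").year


def route_to_partitions(rows):
    """Group rows (stockticker, tradedate, ...) by the partition directory
    name a dynamic-partition insert would write them to, e.g. 'tradeyear=1997'."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[f"tradeyear={trade_year(row[1])}"].append(row)
    return dict(partitions)
```

For a valid yyyyMMdd integer, `tradedate // 10000` gives the same year without any parsing; going through a date, as the SQL expression does, additionally rejects malformed values.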
Validation
Here are the instructions to validate the results.
Run
hdfs dfs -ls /user/YOUR_OS_USER_NAME/warehouse/YOUR_OS_USER_NAME_nyse.db/nyse_eod_part
Run
SHOW PARTITIONS YOUR_OS_USER_NAME_nyse.nyse_eod_part
You should see a partition for every year present in the data you loaded.
Run
SELECT count(1) FROM YOUR_OS_USER_NAME_nyse.nyse_eod_part
The count should match the number of records in our dataset. You can compare it with the output of this simple Python code, which has been validated in our labs.
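import glob
import pandas as pd
path = r'/data/nyse_all/nyse_data'  # use your path
all_files = glob.glob(path + "/*.txt.gz")
# Read every header-less CSV file; pandas infers gzip compression from the .gz extension
li = []
for filename in all_files:
    df = pd.read_csv(filename, index_col=None, header=None)
    li.append(df)
# Stack all files into one DataFrame; the row count should match SELECT count(1)
frame = pd.concat(li, axis=0, ignore_index=True)
print(frame.shape)  # (number of records, number of fields)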
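If pandas is not available, or you also want per-year counts to compare against the output of SHOW PARTITIONS, here is a standard-library sketch. It assumes, as the exercise describes, that the files are comma-delimited and that tradedate is the second field in yyyyMMdd form; `count_records_by_year` is a hypothetical helper name.

```python
import csv
import glob
import gzip
import os
from collections import Counter


def count_records_by_year(path, pattern="*.txt.gz"):
    """Count records per trade year across gzip-compressed CSV files.

    Assumes tradedate is the second field, stored as a yyyyMMdd integer."""
    counts = Counter()
    for filename in glob.glob(os.path.join(path, pattern)):
        with gzip.open(filename, mode="rt", newline="") as f:
            for row in csv.reader(f):
                if not row:  # skip blank lines, as pandas does by default
                    continue
                counts[int(row[1]) // 10000] += 1  # yyyyMMdd -> yyyy
    return counts


if __name__ == "__main__":
    by_year = count_records_by_year("/data/nyse_all/nyse_data")  # use your path
    print("total records:", sum(by_year.values()))
    for year in sorted(by_year):
        # Printed in the same form as the partition names, e.g. tradeyear=1997
        print(f"tradeyear={year}", by_year[year])
```

The total should agree with `SELECT count(1)`, and the printed `tradeyear=<year>` names should line up with the partitions reported by SHOW PARTITIONS.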