Sorting Data

Let us understand how to sort the data using Spark SQL.

Let us start the Spark session for this Notebook so that we can execute the code provided.

import org.apache.spark.sql.SparkSession

val username = System.getProperty("user.name")
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", s"/user/${username}/warehouse").
    enableHiveSupport.
    appName(s"${username} | Spark SQL - Basic Transformations").
    master("yarn").
    getOrCreate

If you are going to use CLIs, you can run Spark SQL using one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse
  • We can perform global aggregations as well as aggregations by key.

  • Global Aggregations

    • Get total number of orders.

    • Get revenue for a given order id.

    • Get the number of records with order_status either COMPLETE or CLOSED.

  • Aggregations by key - using GROUP BY

    • Get number of orders by date or status.

    • Get revenue for each order_id.

    • Get daily product revenue (using order date and product id as keys).

  • We can also use HAVING clause to apply filtering on top of aggregated data.

    • Get daily product revenue where revenue is greater than $500 (using order date and product id as keys) - see the sketch after this list.

  • Rules while using GROUP BY.

    • The SELECT clause can contain the columns that are specified as part of GROUP BY.

    • On top of those, we can have derived columns using aggregate functions.

    • We cannot have any other columns in the SELECT clause - every column must either be part of GROUP BY or be derived using aggregate functions.

    • We will not be able to use aggregate functions or aliases defined in the SELECT clause as part of the WHERE clause.

    • If we want to filter based on aggregated results, then we can leverage HAVING on top of GROUP BY (specifying WHERE is not an option).

  • Typical order of query execution - FROM -> WHERE -> GROUP BY -> HAVING -> SELECT -> ORDER BY

  • We typically perform sorting as the final step.

  • Sorting can be done either on a single field or on multiple fields.

  • We can sort the data in either ascending or descending order using a column or expression.

  • By default, the sort order is ascending; we can change it to descending by using DESC.
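
As noted in the list above, here is a minimal sketch of filtering aggregated data with HAVING - daily product revenue greater than $500. It assumes the same orders and order_items tables used in the examples that follow.

%%sql

SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
HAVING round(sum(oi.order_item_subtotal), 2) > 500
LIMIT 10

Now let us look at sorting examples. Sort orders by a single column, order_customer_id: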

%%sql

SELECT * FROM orders
ORDER BY order_customer_id
LIMIT 10
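
Sort orders by multiple columns - by order_customer_id and then by order_date:
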
%%sql

SELECT * FROM orders
ORDER BY order_customer_id,
    order_date
LIMIT 10
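
Sort orders by order_customer_id in ascending order and then by order_date in descending order:
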
%%sql

SELECT * FROM orders
ORDER BY order_customer_id,
    order_date DESC
LIMIT 10
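
Get daily product revenue and sort the result by order_date in ascending order and revenue in descending order:
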
%%sql

SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
ORDER BY o.order_date,
    revenue DESC
LIMIT 10

  • Using Spark SQL with Python or Scala

spark.sql("""
SELECT * FROM orders
ORDER BY order_customer_id
""").show()
spark.sql("""
SELECT * FROM orders
ORDER BY order_customer_id,
    order_date
""").show()
spark.sql("""
SELECT * FROM orders
ORDER BY order_customer_id,
    order_date DESC
""").show()
spark.sql("""
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
ORDER BY o.order_date,
    revenue DESC
""").show()