
From Spark in Action, Second Edition by Jean-Georges Perrin

This article teaches you how to perform an aggregation using Apache Spark. You first look at the definition of an aggregation. You may already know and use aggregations in your job, and this might be a reminder for you. If this is the case, you can safely skim through it: Apache Spark’s aggregations are standard. The second part of this section shows you how to transform a SQL aggregation statement to Spark.


Take 37% off Spark in Action, Second Edition. Just enter fccperrin into the discount code box at checkout at manning.com.


A quick reminder on aggregations

In this section, you’ll go through a quick lesson on what aggregations are and which types of aggregations exist. Aggregations in Spark are similar to those in any relational database.

Aggregations are a way to group data together to look at it from a higher level, as illustrated in figure 1. Aggregations can be performed on tables, joined tables, views, and so on.


Figure 1 A look at the data before you perform an aggregation. An aggregation is all about performing one or more functions on grouped columns. Columns can also be dropped in the process.


Imagine an order system. You have thousands of orders from a few hundred customers and you want to know who your best customer is.

Without using aggregations, you could go through each customer, query all the orders for this customer, sum the amount, store this sum somewhere, and go on. This means a lot of I/O and interaction with the database. Let’s see how an aggregation can help us perform this task faster.

Figure 2 puts “real data” in the table: the table now contains the first name, last name, and state of the customer, the ordered quantity, the generated revenue, and a timestamp of when the order was placed. You can see six orders from four different customers.


Figure 2 Looking at a table containing order data before aggregation. You can see four distinct customers and six orders.


In this scenario, the goal is to study your best customers, so you’ll focus on the customer rather than on the individual elements composing each order. In other words, you aggregate the data around the customer. A unique customer is defined by their first name, last name, and the state where their order was shipped. You’re going to group on these three criteria. You’ll aggregate on both the quantity and the revenue. On the revenue data, you’ll calculate both the total and the average, which gives you the average revenue per order.

You can also safely drop the timestamp column, which isn’t needed in this use case. You don’t drop it explicitly; you simply don’t include it in your query.

Figure 3 shows this process.


Figure 3 Details of the grouping operations done on first name, last name, and state columns. The quantity and revenue columns have morphed into three new columns. The timestamp column isn’t needed anymore.


The final result only shows the aggregate results, as in figure 4.


Figure 4 Final result of the aggregation, with only one row per unique customer and its related statistical data.
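To make the grouping concrete, here is a minimal plain-Java sketch of the same operation on the six orders of this example, without Spark or a database. The CustomerAggregationSketch class, its nested Order and Stats classes, and the method names are hypothetical and only serve this illustration; the "|" separator used to build the grouping key is likewise an arbitrary choice.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the book's labs.
public class CustomerAggregationSketch {

  /** One order row; the timestamp column is left out, as in the text. */
  public static final class Order {
    public final String firstName;
    public final String lastName;
    public final String state;
    public final int quantity;
    public final int revenue;

    public Order(String firstName, String lastName, String state,
        int quantity, int revenue) {
      this.firstName = firstName;
      this.lastName = lastName;
      this.state = state;
      this.quantity = quantity;
      this.revenue = revenue;
    }
  }

  /** Running totals for one unique customer. */
  public static final class Stats {
    public int orders;
    public int sumQuantity;
    public int sumRevenue;

    public double avgRevenue() {
      return (double) sumRevenue / orders;
    }
  }

  /** The six orders of the example. */
  public static List<Order> sampleOrders() {
    return Arrays.asList(
        new Order("Jean Georges", "Perrin", "NC", 1, 300),
        new Order("Jean Georges", "Perrin", "NC", 2, 120),
        new Order("Jean Georges", "Perrin", "CA", 4, 75),
        new Order("Holden", "Karau", "CA", 6, 37),
        new Order("Ginni", "Rometty", "NY", 7, 91),
        new Order("Holden", "Karau", "CA", 4, 153));
  }

  /** Groups on first name, last name, and state, then sums per group. */
  public static Map<String, Stats> aggregate(List<Order> orders) {
    Map<String, Stats> result = new LinkedHashMap<>();
    for (Order o : orders) {
      String key = o.firstName + "|" + o.lastName + "|" + o.state;
      Stats s = result.computeIfAbsent(key, k -> new Stats());
      s.orders++;
      s.sumQuantity += o.quantity;
      s.sumRevenue += o.revenue;
    }
    return result;
  }

  /** Returns the key of the customer with the highest total revenue. */
  public static String bestCustomer(Map<String, Stats> stats) {
    String best = null;
    int bestRevenue = Integer.MIN_VALUE;
    for (Map.Entry<String, Stats> e : stats.entrySet()) {
      if (e.getValue().sumRevenue > bestRevenue) {
        bestRevenue = e.getValue().sumRevenue;
        best = e.getKey();
      }
    }
    return best;
  }

  public static void main(String[] args) {
    Map<String, Stats> stats = aggregate(sampleOrders());
    for (Map.Entry<String, Stats> e : stats.entrySet()) {
      Stats s = e.getValue();
      System.out.println(e.getKey() + " -> qty=" + s.sumQuantity
          + ", revenue=" + s.sumRevenue + ", avg=" + s.avgRevenue());
    }
    System.out.println("Best customer: " + bestCustomer(stats));
  }
}
```

The loop makes a single pass over the orders and keeps one set of running totals per unique customer, which is essentially what a GROUP BY does for you. The rest of this article hands that work to the database or to Spark.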


If you turn that into SQL, you get a statement like the PostgreSQL-based SELECT in listing 1.

Listing 1 An aggregation using a SQL statement

 SELECT
     "firstName",
     "lastName",
     "state",
     SUM(quantity) AS sum_qty, #A
     SUM(revenue) AS sum_rev, #B
     AVG(revenue::numeric::float8) AS avg_rev #C
   FROM public.ch13
   GROUP BY ("firstName", "lastName", "state");

#A Sums the quantities

#B Sums the revenue

#C Converts to a float before computing the average

Now that you’ve had a refresher on the aggregation mechanisms, let’s see how you can do the same things with Apache Spark.

Performing basic aggregations with Spark

Now that you’re up to speed on aggregations, let’s do the same aggregations with Apache Spark. You’ll see two ways of doing aggregations: one using the now-familiar dataframe API and the other using Spark SQL, in a similar way as you do with an RDBMS.

The goal of those aggregations is to compute, for an individual customer:

  • The total count of items the customer bought.
  • The total revenue.
  • The average revenue per order.

A unique customer is defined, in this scenario, by their first name, last name, and state.

Let’s start by looking at the desired output. Listing 2 shows the expected dataframe. Unsurprisingly, it’s similar to figure 4.

Listing 2 Result of an aggregation with Spark

 +------------+--------+-----+-------------+------------+------------+
 |   firstName|lastName|state|sum(quantity)|sum(revenue)|avg(revenue)|
 +------------+--------+-----+-------------+------------+------------+
 |       Ginni| Rometty|   NY|            7|          91|        91.0|
 |Jean Georges|  Perrin|   NC|            3|         420|       210.0|
 |      Holden|   Karau|   CA|           10|         190|        95.0|
 |Jean Georges|  Perrin|   CA|            4|          75|        75.0|
 +------------+--------+-----+-------------+------------+------------+

Lab This lab is called OrderStatisticsApp and it’s available in the net.jgp.books.spark.ch13.lab100_orders package.

The raw data is a simple CSV file as shown in listing 3.

Listing 3 Raw order data to perform the aggregation

 firstName,lastName,state,quantity,revenue,timestamp
 Jean Georges,Perrin,NC,1,300,1551903533
 Jean Georges,Perrin,NC,2,120,1551903567
 Jean Georges,Perrin,CA,4,75,1551903599
 Holden,Karau,CA,6,37,1551904299
 Ginni,Rometty,NY,7,91,1551916792
 Holden,Karau,CA,4,153,1552876129

Figure 5 shows the data and metadata you’re working on. It’s the same dataset as you just used.


Figure 5 Structure, sample data, and grouping of the dataset used in this Spark-based aggregation.


As usual with Spark, you’ll initialize the session and load the data, as illustrated in listing 4. This code includes all the import statements, which allow you to know precisely which packages, classes, and functions you’ll use.

Listing 4 Initializing Spark and loading the dataset

 package net.jgp.books.spark.ch13.lab100_orders;
  
 import static org.apache.spark.sql.functions.avg;
 import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.sum;
  
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
  
 public class OrderStatisticsApp {
  
   public static void main(String[] args) {
     OrderStatisticsApp app = new OrderStatisticsApp();
     app.start();
   }
  
   private void start() {
     SparkSession spark = SparkSession.builder() #A
         .appName("Orders analytics")
         .master("local[*]")
         .getOrCreate();
  
     Dataset<Row> df = spark.read().format("csv") #B
         .option("header", true)
         .option("inferSchema", true)
         .load("data/orders/orders.csv");

#A Start a session

#B Ingest a CSV file with header and automatic inference of the schema

Two ways to perform this aggregation are:

  • You can use the dataframe API, as in listing 5.
  • You can use Spark SQL, as in listing 6.

Both solutions provide the same results.

Performing an aggregation using the dataframe API

Let’s have a look at the dataframe API to perform an aggregation. You’ll use the dataframe as your source and call the groupBy() method. Using method chaining, you can then directly apply an aggregate method such as agg(), avg(), count(), and so on.

The agg() method doesn’t perform an aggregation itself; it applies the aggregate functions you pass to it, such as sum() and avg(), which operate at the column level. Appendix K lists the aggregation methods.

Listing 5 Aggregation using the dataframe API

     Dataset<Row> apiDf = df
         .groupBy(col("firstName"), col("lastName"), col("state")) #A
         .agg( #B
             sum("quantity"), #C
             sum("revenue"), #D
             avg("revenue")); #E
     apiDf.show(20);

#A Columns to group by

#B Start the aggregation process

#C Sum the quantity column

#D Sum the revenue column

#E Calculate the average of the revenue column

Note that the groupBy() method has several signatures: the columns can be specified by name, as a sequence, or as Column objects. You’ll see the different usages in the code snippets. I find the one used in listing 5 rather elegant in this context.
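As one illustration of those signatures, the sketch below groups by column names instead of Column objects and also gives each aggregate an explicit name with alias(), mirroring the sum_qty, sum_rev, and avg_rev names of listing 1. The OrderStatisticsAliasSketch class and its sampleOrders() helper are hypothetical: the helper rebuilds the six orders of listing 3 in memory (without the timestamp) so that the example doesn’t depend on the CSV file. It assumes Spark SQL is on the classpath, as in the lab.

```java
import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.sum;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical illustration class; not part of the book's labs.
public class OrderStatisticsAliasSketch {

  /** Groups by column names and names each aggregate explicitly. */
  public static Dataset<Row> aggregate(Dataset<Row> df) {
    return df
        .groupBy("firstName", "lastName", "state") // columns given by name
        .agg(
            sum("quantity").alias("sum_qty"),
            sum("revenue").alias("sum_rev"),
            avg("revenue").alias("avg_rev"));
  }

  /** Builds the six orders of listing 3 in memory (timestamp omitted). */
  public static Dataset<Row> sampleOrders(SparkSession spark) {
    StructType schema = new StructType()
        .add("firstName", DataTypes.StringType)
        .add("lastName", DataTypes.StringType)
        .add("state", DataTypes.StringType)
        .add("quantity", DataTypes.IntegerType)
        .add("revenue", DataTypes.IntegerType);
    List<Row> rows = Arrays.asList(
        RowFactory.create("Jean Georges", "Perrin", "NC", 1, 300),
        RowFactory.create("Jean Georges", "Perrin", "NC", 2, 120),
        RowFactory.create("Jean Georges", "Perrin", "CA", 4, 75),
        RowFactory.create("Holden", "Karau", "CA", 6, 37),
        RowFactory.create("Ginni", "Rometty", "NY", 7, 91),
        RowFactory.create("Holden", "Karau", "CA", 4, 153));
    return spark.createDataFrame(rows, schema);
  }

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("Orders analytics with aliases")
        .master("local[*]")
        .getOrCreate();
    // Same values as listing 2, but with the aliased column names
    aggregate(sampleOrders(spark)).show(20);
    spark.stop();
  }
}
```

Naming the aggregates is optional, but it spares you from referring to columns called sum(quantity) or avg(revenue) in later transformations.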

Performing an aggregation using Spark SQL

The alternative way to perform a GROUP BY operation is to use Spark SQL directly, as you do with your RDBMS. Listing 6 uses the Spark SQL version of the SQL statement I wrote for PostgreSQL in listing 1.

As you manipulate data through SQL, you need a view. You can then build your SQL statement and execute it from the Spark session.

Listing 6 Aggregation using Spark SQL

     df.createOrReplaceTempView("orders"); #A
     String sqlStatement = "SELECT " + #B
         "    firstName, " +
         "    lastName, " +
         "    state, " +
         "    SUM(quantity), " +
         "    SUM(revenue), " +
         "    AVG(revenue) " +
         "  FROM orders " +
         "  GROUP BY firstName, lastName, state"; #C
     Dataset<Row> sqlDf = spark.sql(sqlStatement); #D
     sqlDf.show(20);
   }
 }

#A Creation of the view to allow the execution of SQL statements

#B Creation of the SQL statement

#C Spark doesn’t allow parentheses around the GROUP BY part

#D Execution of the SQL statement

Note that, unlike PostgreSQL and other RDBMSs, Spark doesn’t want the GROUP BY columns to be in parentheses.

That’s all for now. If you want to learn more about the book, check it out on liveBook here and see this slide deck.