
From Data Analysis with Python and PySpark by Jonathan Rioux

This article covers window functions and the kind of data transformation they enable.


Take 40% off Data Analysis with Python and PySpark by entering fccrioux into the discount code box at checkout at manning.com.


When performing data analysis or feature engineering (which is my favorite part of machine learning!), nothing makes me quite as happy as window functions. At first glance, they look like a watered-down version of the split-apply-combine pattern. Then you open the blinds and bam! Powerful manipulations in a short, expressive body of code.

Those who don’t know window functions are bound to reimplement their functionality, poorly. This has been my experience coaching data analysts, scientists, and engineers. If you find yourself struggling to

  1. Rank records
  2. Identify the top/bottom record according to a set of conditions
  3. Get a value from a previous observation in a table
  4. Build trended features (meaning features that summarize past observations, such as the average of the observations for the previous week)

You will find that window functions multiply your productivity and simplify your code; the sketch below previews each of these tasks.
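Here is a minimal preview of each item, using a hypothetical data frame sales with columns store, day, and amount (all names of my own choosing); every piece of syntax used here is explained over the course of this article.

 import pyspark.sql.functions as F
 from pyspark.sql.window import Window

 by_store = Window.partitionBy("store").orderBy("day")
 by_amount = Window.partitionBy("store").orderBy(F.desc("amount"))

 features = (
     sales.withColumn("amount_rank", F.rank().over(by_amount))       # 1. rank records
     .withColumn("is_top_sale", F.col("amount_rank") == 1)           # 2. flag the top record
     .withColumn("previous_amount", F.lag("amount").over(by_store))  # 3. value from the previous observation
     .withColumn(
         "avg_amount_last_week",
         F.avg("amount").over(by_store.rowsBetween(-7, -1)),         # 4. trended feature
     )
 )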

Window functions fill a niche between group aggregate (groupBy().agg()) and group map UDF (groupBy().apply()) transformations. Both rely on partitioning to split the data frame according to the values of one or more columns. A group aggregate transformation yields one record per group and a group map UDF allows for any shape of resulting data frame, while a window function always keeps the dimensions of the data frame intact. Window functions also have a secret weapon: the window frame that we define within a partition, which determines which records are included in the application of the function.
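To make that dimensional difference concrete, here is a minimal sketch, assuming a hypothetical data frame df with columns group and value:

 import pyspark.sql.functions as F
 from pyspark.sql.window import Window

 # Group aggregate: one record per distinct value of `group`.
 totals = df.groupBy("group").agg(F.sum("value").alias("total"))

 # Window function: every original record is kept, with the per-group
 # total appended as a new column.
 with_totals = df.withColumn("total", F.sum("value").over(Window.partitionBy("group")))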

Window functions are mostly for creating new columns, so they leverage some familiar methods, such as select() and withColumn(). Because we are already familiar with the syntax for adding columns, I approach this chapter differently. First, we look at how we can emulate a simple window function by relying on concepts we already know, such as groupbys and joins. Following this, we get familiar with the two components of a window function: the window spec and the function. I then apply and dissect the three main types of window functions (summarizing, ranking, and analytical). Equipped with the building blocks of window function application, we break open the window spec with ordered and bounded windows, where I introduce the concept of a window frame. Finally, we go full circle and introduce UDFs as window functions.

Window functions are themselves not complicated, but there is a lot of new terminology and their behavior may not be intuitive at first.

Growing and using a simple window function

When learning a new concept, I find that I have a much easier time when I am able to build it from first principles, using what I know so far. This is exactly what happened when I learned about window functions: I started by reproducing their behavior using a mess of SQL instructions. After I was done, it was obvious to me why window functions are so useful. Imagine my joy when I found them woven into PySpark, with a beautiful Python API to boot. Color me happy!

In this section, we follow exactly the same path: I start by reproducing a simple window function using techniques from past chapters. Once we understand the plumbing well, I introduce the syntax for window functions and how they simplify your data transformation logic. My hope is that you’ll get the same excitement that I got when window functions finally “clicked” for me.

For this section, we use a data set that contains weather observations for a series of stations, summarized by day. Window functions especially shine when working with time-series-like data (such as daily temperature observations), because you can slice the data by day, month, or year, and get useful statistics.

Listing 1. Reading the data necessary for the chapter’s example: GSOD NOAA weather data.

 
 gsod = spark.read.parquet("./data/window/gsod.parquet")
  

Now equipped with the data, let’s start asking questions! The next sections illustrate the thought process behind a window function, before diving right into the terminology and the syntax.
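One housekeeping note: the listings in this article assume a running SparkSession available under the name spark and pyspark.sql.functions imported under its usual alias F. A minimal setup would look like the following sketch (the application name is arbitrary):

 from pyspark.sql import SparkSession
 import pyspark.sql.functions as F

 # Create (or reuse) a SparkSession for the examples in this article.
 spark = SparkSession.builder.appName("window_functions").getOrCreate()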

Identifying the coldest day of each year, the long way

In this section, we emulate a simple window function through functionality we learned in previous chapters: the join() method. The idea here is to provide an intuition for window functions and remove some of the magic surrounding them. To illustrate this, we start with a simple question to ask our data frame: when and where was the lowest temperature recorded each year? In other words, we want a data frame containing three records, one for each year, showing the station, the date (year, month, day), and the temperature of the coldest day recorded for that year.

Let’s map the thought process. First, we get a data frame containing the coldest temperature for each year. This gives us two of the columns (year and temp) and their values. In listing 2, we create the coldest_temp data frame: we take our historical data, group by the year column, and extract the minimum temp through the min() aggregate function applied through agg().

Listing 2. Computing the lowest temperature for each year using groupBy().

 
 coldest_temp = gsod.groupby("year").agg(F.min("temp").alias("temp"))
 coldest_temp.orderBy("temp").show()
  
 # +----+------+
 # |year|  temp|
 # +----+------+
 # |2017|-114.7|
 # |2018|-113.5|
 # |2019|-114.7|
 # +----+------+
  

People, Earth is cold!

This provides the year and the temperature, which is about 40% of the original ask. To get the other three columns (mo, da, stn), we can use a left-semi join on the original table, using the results of coldest_temp to resolve the join. In listing 3, we join gsod to coldest_temp using a left-semi equi-join on the year and temp columns. Because coldest_temp only contains the coldest temperature for each year, the left-semi join keeps only the records from gsod corresponding to that year-temperature pair; this is equivalent to keeping only the records for each year where the temperature was coldest.

Listing 3. Using a left-semi self-join for computing the coldest station/day for each year.

 
 coldest_when = gsod.join(
     coldest_temp, how="left_semi", on=["year", "temp"]
 ).select("stn", "year", "mo", "da", "temp")
  
 coldest_when.orderBy("year", "mo", "da").show()
  
 # +------+----+---+---+------+
 # |   stn|year| mo| da|  temp|
 # +------+----+---+---+------+
 # |896250|2017| 06| 20|-114.7|
 # |896060|2018| 08| 27|-113.5|
 # |895770|2019| 06| 15|-114.7|
 # +------+----+---+---+------+
  

If you pay attention to the code in listings 2 and 3, we are performing a join between the gsod table and, well, something coming from the gsod table. A self-join, which is when you join a table with itself, is often considered an anti-pattern for data manipulation: while it’s not technically wrong, it can be slow and make the code look more complex than it needs to be. It also looks a little odd: joining tables makes sense when you want to link data contained in two or more tables. Joining a table with itself feels redundant, as we can see in figure 1: the data is already in the (one) table!


Figure 1. A self-join happens when a table ends up being joined with itself. You can replace most self-joins with window functions.


Fortunately, a window function gives you the same result, faster, with less code clutter. In the next section, we’ll reproduce the same data transformation using a window function, simplifying and speeding up our data transformation code.

Creating and using a simple window function to get the coldest days

This section introduces window functions by replacing the self-join example of the previous section. I introduce the Window object and how to parameterize it to split a data frame over column values. We then apply the window over a data frame using the traditional selector approach.

At the beginning of the article, I drew a parallel between window functions and the split-apply-combine pattern I covered when introducing Pandas group map UDFs. To stay consistent with window function terminology, which comes from SQL, I use a different vocabulary for the three stages of split-apply-combine.

  1. Instead of splitting, we’ll partition the data frame.
  2. Instead of applying, we’ll select values over the window.
  3. The combine/union operation is implicit (that is, not coded explicitly) in a window function.

Window functions apply over a window of data, split according to the values of a column. Each split, called a partition, gets the window function applied to each of its records, as if the partitions were independent data frames. The result then gets unioned back into a single data frame. In listing 4, I create the window, partitioning according to the values in the column year. The Window class is a builder class, just like SparkSession.builder: we chain the parameterization by appending methods after the Window class identifier. The result is a WindowSpec object containing the information about the parameterization.

Listing 4. Creating a WindowSpec object by using the Window builder class.

 
 from pyspark.sql.window import Window  
  
 each_year = Window.partitionBy("year")  
  
 print(each_year)
 # <pyspark.sql.window.WindowSpec object at 0x7f978fc8e6a0>
  

We import Window from pyspark.sql.window. Since it’s the only object we’ll use for window functions, there is no need to import the whole module.

To partition according to the values of one or more columns, we pass the column name (or a Column object) to the partitionBy() method.

In itself, a WindowSpec object is nothing more than a blueprint for an eventual window function. In our case, in listing 4 we created a window specification called each_year that contains the instruction “partition the data frame this WindowSpec is applied to according to the values contained in the column `year`”. The real magic happens when you apply it to your data frame. For our first window function application, I show the whole code reproducing the self-join approach seen earlier, before going through it line by line. Check the difference between the window application and the left-semi join, reproduced here in listing 5.

Listing 5. Using a left-semi self-join for computing the coldest station/day for each year (bis).

 
 coldest_when = gsod.join(
     coldest_temp, how="left_semi", on=["year", "temp"]
 ).select("stn", "year", "mo", "da", "temp")
  
 coldest_when.orderBy("year", "mo", "da").show()
  
 # +------+----+---+---+------+
 # |   stn|year| mo| da|  temp|
 # +------+----+---+---+------+
 # |896250|2017| 06| 20|-114.7|
 # |896060|2018| 08| 27|-113.5|
 # |895770|2019| 06| 15|-114.7|
 # +------+----+---+---+------+
  

Listing 6. Selecting the minimum temperature for each year — including when and where — using a window function.

 
 gsod.withColumn("min_temp", F.min("temp").over(each_year)).where(  
     "temp = min_temp"
 ).select("year", "mo", "da", "stn", "temp").orderBy(
     "year", "mo", "da"
 ).show()
  
 # +----+---+---+------+------+
 # |year| mo| da|   stn|  temp|
 # +----+---+---+------+------+
 # |2017| 06| 20|896250|-114.7|
 # |2018| 08| 27|896060|-113.5|
 # |2019| 06| 15|895770|-114.7|
 # +----+---+---+------+------+
  

We select the minimum temperature over the defined window (here: for each year).

Time for some code unpacking. First, we define through the withColumn() method a column min_temp that contains the minimum of the temp column. Rather than picking the minimum temperature of the whole data frame, min() is applied over the window specification we defined, using the over() method. For each window partition, Spark computes the minimum and then broadcasts the value to each record. This is an important distinction compared to aggregate functions or UDFs: in the case of a window function, the number of records in the data frame does not change. Although min() is an aggregate function, since it’s applied with the over() method, every record in the window gets the minimum value appended. The same applies to any other aggregate function from pyspark.sql.functions, such as sum(), avg(), max(), and count().
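For instance, here is a sketch that appends the yearly average temperature to each record, reusing the each_year window spec (avg_temp_year is just a column name I picked for illustration):

 gsod.withColumn("avg_temp_year", F.avg("temp").over(each_year)).select(
     "year", "mo", "da", "stn", "temp", "avg_temp_year"
 ).show(5)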

Window “functions” are just methods on columns (almost)

Since a window function is applied through a method on a Column object, you can also apply one in a select(). You can also apply more than one window function (over the same window or different ones) within the same select(). Spark won’t allow you to use a window directly in a groupby() or where() method: it will throw an AnalysisException. If you want to group by or filter according to the result of a window function, “materialize” the column using select() or withColumn() before using the desired operation.
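As a quick illustration (a sketch: I am only restating the failure mode described above, and the exact error message varies across Spark versions):

 # Rejected by Spark with an AnalysisException: a window expression
 # cannot be used directly inside where()/filter().
 # gsod.where(F.min("temp").over(each_year) == F.col("temp"))

 # Materializing the column first (as in listing 6) works.
 gsod.withColumn("min_temp", F.min("temp").over(each_year)).where("temp = min_temp")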

As an example, listing 6 could be rewritten as in listing 7, moving the window application into the select(). Because the window applies on a column-by-column basis, you can have multiple window applications within a single select() statement.

Listing 7. Using a window function within a select() method. It works just the same as with any column operation.

 
 gsod.select(
     "year",
     "mo",
     "da",
     "stn",
     "temp",
     F.min("temp").over(each_year).alias("min_temp"),
 ).where(
     "temp = min_temp"
 ).drop(  
     "min_temp"
 ).orderBy(
     "year", "mo", "da"
 ).show()
  

We drop min_temp as it served its purpose during the where clause and is not needed (it’ll always be equal to temp in the resulting data frame).

Check the end-of-chapter exercises to experiment with multiple window applications.

Under the hood, PySpark realizes the window spec when it is applied to a column. I defined a rather simple window spec here: just like with the split-apply-combine pattern, we partition the data frame according to the values of the year column.

You can partitionBy() more than one column! Just add more column names to the partitionBy() method.
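For example, here is a sketch of a window spec partitioned by both year and month (each_month and min_temp_month are names I’m introducing for illustration):

 each_month = Window.partitionBy("year", "mo")

 # Appends the coldest temperature of each year/month combination to every record.
 gsod.withColumn("min_temp_month", F.min("temp").over(each_month))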

For each window partition (see the But data frames already have partitions! side-bar at the end of the section), we compute the aggregate function (here, min()), before broadcasting the result on each record. In plain English, we compute the minimum temperature for each year and append it as a column for each record of this year. I creatively name the new column min_temp.


Figure 2. We partition the gsod data frame according to the year column and compute the minimum temperature for each partition. Each record belonging to the partition gets the minimum temperature appended. The resulting data frame contains the same number of records, but with a new column min_temp that contains the coldest temperature for the year.


Next, we need to only keep the records where the temperature is actually the minimum for the year. For this, we simply need to filter() (or where()) to keep only the records where temp = min_temp. Because the window function application gives each record a min_temp field corresponding to the minimum temperature for that year, we are back to our regular arsenal of data manipulation tricks.

That’s all, folks! Our very first window function. This was a purposefully simple example to nail the concepts of window spec, window function, and window partition. In the next section, I compare the application and speed of both approaches and explain why window functions are easier, friendlier, and faster.

But data frames already have partitions!

Once again, we’re having a vocabulary problem. Most think of a partition as a physical split of the data on each executor node. With window functions, we also use partition to mean a logical split of the data, which may or may not line up with Spark’s physical partitions.

Unfortunately, most of the literature online won’t tell you which kind of partition it refers to. Fortunately, once you’ve internalized both the Spark and window function meanings, it’ll be easy to know which one is which. For this article, I use window partitions when talking about the logical partitions made by the application of a window spec.
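A quick sketch shows both meanings side by side (the physical partition count depends on how the data was read, so your number will vary):

 # Physical partitions: how Spark laid the data out across executors.
 print(gsod.rdd.getNumPartitions())

 # Window partitions for each_year: one per distinct value of `year` (three here).
 print(gsod.select("year").distinct().count())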

Comparing both approaches

In this section, I compare both the self-join and window function approaches from a code readability perspective. We also touch upon the performance implications of windows versus joins. When performing data transformation and analysis, code clarity and performance are the two most important considerations for a working body of code; since we have two approaches that perform the same work, it makes sense to compare them from a clarity and performance perspective.

Compared to the self-join approach, using a window function makes your intention clear. With a window named each_year, the code snippet F.min("temp").over(each_year) almost reads like a plain English sentence. The self-join approach accomplishes the same thing, but at the expense of slightly more cryptic code: why am I joining this table to… itself?

Performance-wise, window functions avoid potentially costly self-joins. When working with large data sets, window functions only require splitting the data frame into window partitions before performing a function over the (smaller) partitions. When you consider that Spark’s operating model is splitting large data sets across multiple nodes, this makes tremendous sense.

Which approach works fastest will depend on the size of the data, the memory available, and how complex the join/window operation is. I overwhelmingly prefer window functions, as they are clearer and express my intent more faithfully. As I repeat to myself when I code: make it work, make it clear, then make it fast!
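If you want to check this on your own data, you can compare the physical plans of both approaches (a sketch; the exact plans depend on your Spark version and data layout):

 # The self-join plan contains a join operator (and the exchanges it requires).
 coldest_when.explain()

 # The window version shows a Window operator over the `year` partitions instead.
 gsod.withColumn("min_temp", F.min("temp").over(each_year)).where(
     "temp = min_temp"
 ).explain()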

Finally, and this will be the content of the next sections, window functions are much more flexible than merely computing aggregated measurements over a given window. Next, I introduce ranking and analytical functions, which provide a new window (get it?) over your data. The “summarize-and-join” approach will quickly fall short!

That’s all for this article. If you want to learn more, check out the book on Manning’s liveBook platform here.