From Data Analysis with Python and PySpark by Jonathan Rioux
This article covers
· Using pandas Series UDF to accelerate column transformation compared to Python UDF.
· Addressing the cold start of some UDF using Iterator of Series UDF.
Take 37% off Data Analysis with Python and PySpark by entering fccrioux into the discount code box at checkout at manning.com.
This article approaches the distributed nature of PySpark a little differently. If we take a few seconds to think about it, we read data into a data frame and Spark distributes the data across partitions on the nodes. What if we could operate directly on those partitions, as if they were single-node data frames? More interestingly, what if we could control how those single-node partitions were created and used, using a tool we know? What about pandas?
PySpark’s interoperability with Pandas (also colloquially called Pandas UDF) is a huge selling point when performing data analysis at scale. Pandas is the dominant in-memory Python data manipulation library, while PySpark is the dominant distributed one. Combining the two unlocks additional possibilities: we start by scaling some basic Pandas data manipulation functionality. We then look into operations on GroupedData and how PySpark + Pandas implement the split-apply-combine pattern common to data analysis. We finish with the ultimate interaction between Pandas and PySpark: treating a PySpark data frame like a small collection of Pandas data frames.
This article obviously makes great use of the Pandas (http://pandas.pydata.org) library. Extensive Pandas knowledge is a nice-to-have but is in no way expected. This chapter will cover the necessary Pandas skills to use within a basic Pandas UDF. If you wish to level up your Pandas skills to become a Pandas UDF ninja, I warmly recommend the Pandas in Action book, by Boris Paskhaver (Manning, 2021).
The tale of two versions
PySpark 3.0 completely changed how we interact with the Pandas UDF API and added a lot of functionality and performance improvements. Because of this, I’ve structured this article with PySpark 3.0 in mind. For the convenience of those using PySpark 2.3 or 2.4, I’ve added sidebars where applicable with the appropriate syntax.
Pandas UDF were introduced in PySpark 2.3. If you are using Spark 2.2 or below, you’re out of luck!
For the examples in the chapter, we will need three previously unused libraries: pandas, scikit-learn, and PyArrow. If you have installed Anaconda (see appendix B), you can use conda to install the libraries; otherwise, you can use pip.
If you are using Spark 2.3 or 2.4, you additionally need to set a flag in the conf/spark-env.sh file of your Spark root directory to account for a change in Arrow serialization format. In the conf/ directory, you should find a spark-env.sh.template file. Make a copy, name it spark-env.sh, and add this line to the file.
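As a sketch of what that line looks like (based on the SPARK-29367 migration notes; double-check against the documentation for your Spark version):

```shell
# conf/spark-env.sh
# Tell PyArrow 0.15+ to emit the legacy Arrow IPC format
# that Spark 2.3/2.4 expects.
ARROW_PRE_0_15_IPC_FORMAT=1
```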
This will tell PyArrow to use a serialization format compatible with Spark 2.X, instead of the newer one only compatible with Spark 3.0. The Spark JIRA ticket contains more information about this (https://issues.apache.org/jira/browse/SPARK-29367). You can also use PyArrow version 0.14 and avoid the problem altogether.
# Conda installation
conda install pandas scikit-learn pyarrow

# Pip installation
pip install pandas scikit-learn pyarrow
Column transformations with Pandas: using Series UDF
In this section, we cover the simplest family of Pandas UDF: the Series UDF. This family shares a column-first focus with regular PySpark data transformation functions. All of our UDF in this section will take a Column object (or objects) as input and return a Column object as output. In practice, they serve as the most common types of UDF and fill the use case where you want to take functionality already implemented in Pandas (or a library that plays well with Pandas) and promote it to the distributed world of PySpark.
PySpark provides three types of Series UDF. Here is a summary of them; we will explore them further in the rest of the section.
- The Series to Series is the simplest: it takes Column objects as inputs, converts them to Pandas Series objects (giving it its name), and returns a Series object that gets promoted back to a PySpark Column object.
- The Iterator of Series to Iterator of Series differs in that the Column object gets batched and then fed as an Iterator object. It takes a single Column object as input and returns a single Column. It provides performance improvements, especially when the UDF needs to initialize an expensive state before working on the data (for instance, local ML models created in scikit-learn).
- The Iterator of multiple Series to Iterator of Series is a combination of #1 and #2: it can take multiple Columns as input, like #1, yet preserves the iterator pattern from #2.
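As a quick orientation, the three signatures can be sketched as Python type stubs (the function names here are mine, not PySpark’s):

```python
import pandas as pd
from typing import Iterator, Tuple

# Type 1: one Series in, one Series out.
def series_to_series(degrees: pd.Series) -> pd.Series: ...

# Type 2: batches of a single column, consumed and produced lazily.
def iter_to_iter(batches: Iterator[pd.Series]) -> Iterator[pd.Series]: ...

# Type 3: batches of multiple columns, bundled in a Tuple.
def multi_iter_to_iter(
    batches: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]: ...
```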
Before we start exploring Series UDF, let’s grab a data set to experiment with.
Series to Series UDF: column functions, but with Pandas
In this section, we cover the most common type of Scalar UDF: the Series to Series UDF. Series to Series UDF are akin to most of the functions in the pyspark.sql.functions module. For the most part, they work just like Python UDF, with one key difference: Python UDF work on one record at a time and you express your logic through regular Python code, while Scalar UDF work on one Series at a time and you express your logic through Pandas code. The difference is subtle, and it’s easier to explain visually.

In a Python UDF, when you pass Column objects to your UDF, PySpark will unpack each value, perform the computation, and then return the value for each record in a Column object. In a Scalar UDF, depicted in figure 1, PySpark will serialize (through a library called PyArrow, which we installed at the beginning of the chapter) each partitioned column into a pandas Series object (https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html). You then perform the operations on the Series object directly, returning a Series of the same dimension from your UDF. From an end-user perspective, the two are functionally the same. Because Pandas is optimized for rapid data manipulation, it is preferable to use a Series to Series UDF instead of a regular Python UDF when you can, as it’ll be much faster.
Figure 1. Comparing a Python UDF to a Pandas scalar UDF. The former splits a column into individual records, where the latter breaks it into Series.
Now armed with the “how it works” of Series to Series UDF, let’s create one ourselves. I chose to create a simple function that transforms Fahrenheit degrees to Celsius. In Canada, we use both scales depending on the usage: F for cooking, C for body or outside temperature. As a true Canadian, I cook my dinner at 350F yet know that 10C is sweater weather. The function is depicted in listing 1. The building blocks are eerily similar, but we can pick out two main differences.
- Instead of udf(), I use pandas_udf(), again from the pyspark.sql.functions module. Optionally (but recommended), we pass the return type of the UDF as an argument to the pandas_udf() decorator.
- Our function signature is also different: rather than using scalar values (such as str), the UDF takes a pd.Series and returns a pd.Series.

The code within the function could be used as-is for a regular Python UDF. I am (ab)using the fact that you can do arithmetic operations with pandas Series.
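To see this (ab)use in isolation, here is a pandas-only sketch, with no Spark involved; the sample values are mine:

```python
import pandas as pd

def f_to_c_body(degrees: pd.Series) -> pd.Series:
    """The same body as the UDF: arithmetic is vectorized over the Series."""
    return (degrees - 32) * 5 / 9

temps = pd.Series([32.0, 50.0, 212.0])
print(f_to_c_body(temps).tolist())  # [0.0, 10.0, 100.0]
```

The whole Series is converted in one vectorized operation, which is exactly why the Pandas version outruns a record-at-a-time Python UDF.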
Listing 1. Creating a pandas scalar UDF that transforms Fahrenheit into Celsius. I use the pandas_udf decorator with a return type of T.DoubleType().
import pandas as pd
import pyspark.sql.types as T

@F.pandas_udf(T.DoubleType())  # ❶
def f_to_c(degrees: pd.Series) -> pd.Series:  # ❷
    """Transforms Fahrenheit to Celsius."""
    return (degrees - 32) * 5 / 9
❶ For scalar UDF, the biggest change happens in the decorator used. I could use the pandas_udf function directly too.

❷ The signature for a Series to Series UDF is a function that takes one or multiple pd.Series and returns a pd.Series.
For Spark 2.X users, you need to add another parameter to the decorator, as only Spark 3.0 and above recognize function signatures for Pandas UDF: the decorator in listing 1 would also take F.PandasUDFType.SCALAR as a second argument.
In listing 2, we apply our newly created Series to Series UDF to the temp column of the gsod data frame, which contains the temperature (in Fahrenheit) of each station-day combination. Just like regular Python UDF, Series to Series UDF (and all Scalar UDF) are used like any data manipulation function: here, I create a new column with withColumn() and apply the f_to_c function to the temp column.
Listing 2. Using a Series to Series UDF like any other column manipulation function.
gsod = gsod.withColumn("temp_c", f_to_c(F.col("temp")))
gsod.select("temp", "temp_c").distinct().show(5)

# +-----+-------------------+
# | temp|             temp_c|
# +-----+-------------------+
# | 37.2| 2.8888888888888906|
# | 85.9| 29.944444444444443|
# | 53.5| 11.944444444444445|
# | 71.6| 21.999999999999996|
# |-27.6|-33.111111111111114|
# +-----+-------------------+
# only showing top 5 rows
Series to Series UDF, just like regular Python UDF, are very convenient when the record-wise transformation (or “mapping”) you want to apply to your data frame is not available within the stock PySpark functions (pyspark.sql.functions). Creating a “Fahrenheit to Celsius” converter as part of core Spark would be a little intense, so using a Python UDF or a Pandas Series to Series UDF is a way to extend the core functionality with a minimum of fuss. Next, we see how to gain more control over the split and use the split-apply-combine pattern in PySpark.
Working with complex types in Pandas UDF
PySpark has a richer data type system than Pandas, which clubs strings and complex types into a catchall object type. Since you are dropping from PySpark into Pandas during the execution of the UDF, you are solely responsible for aligning the types accordingly. This is where the return type attribute of the pandas_udf decorator comes in handy, as it’ll help diagnose bugs early.

What if you want to accept or return complex types, such as an array or a struct? Pandas will accept as values within a Series a list of items, which will be promoted to an ArrayType column. For StructType columns, you will need to replace the relevant pd.Series by a pd.DataFrame. In chapter 6, we saw that struct columns are like mini data frames: the equivalence continues here!
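As an illustrative sketch (the function and field names are mine), the pandas side of a struct-returning UDF builds a pd.DataFrame whose columns line up with the StructType fields you would declare as the pandas_udf return type:

```python
import pandas as pd

def split_temp(temp: pd.Series) -> pd.DataFrame:
    """Pandas side of a hypothetical Series -> struct UDF: each DataFrame
    column maps to one field of the declared StructType."""
    whole = temp.astype(int)  # integer part of the temperature
    return pd.DataFrame({"whole": whole, "fraction": temp - whole})

out = split_temp(pd.Series([37.2, 85.9]))
print(out["whole"].tolist())  # [37, 85]
```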
Scalar UDF + cold start = Iterator of Series UDF
This is only available with PySpark 3.0+.
This section covers the two other types of Scalar UDF: the Iterator of Series to Iterator of Series UDF and the Iterator of multiple Series to Iterator of Series UDF. Because they are so similar to the Series to Series UDF in their application, I will focus on the Iterator portion that gives them their power. Iterator of Series UDF are very useful when you have a cold-start operation you need to perform, such as deserializing a local ML model (fitted with scikit-learn or another Python modeling library). In our case here, I’ll reuse our f_to_c function but will add a cold start to demonstrate the usage.
Our UDF in listing 3 is really similar to the Series to Series UDF from listing 1. A few differences pop up:
- The signature goes from (pd.Series) → pd.Series to (Iterator[pd.Series]) → Iterator[pd.Series]. This is a consequence of using an Iterator of Series UDF.
- When working with the Series to Series UDF, we assumed that PySpark would give us one batch at a time. Here, since we are working with an Iterator of Series, we are explicitly iterating over each batch one by one. PySpark will take care of distributing the work for us.
- Rather than using a return statement, we use yield so that our function returns an iterator.
Listing 3. Using an Iterator of Series to Iterator of Series UDF to convert the temperature. sleep(5) will only be called once.
from time import sleep
from typing import Iterator

@F.pandas_udf(T.DoubleType())
def f_to_c2(degrees: Iterator[pd.Series]) -> Iterator[pd.Series]:  # ❶
    """Transforms Fahrenheit to Celsius."""
    sleep(5)  # ❷
    for batch in degrees:  # ❸
        yield (batch - 32) * 5 / 9  # ❸

gsod.select(
    "temp", f_to_c2(F.col("temp")).alias("temp_c")
).distinct().show(5)

# +-----+-------------------+
# | temp|             temp_c|
# +-----+-------------------+
# | 37.2| 2.8888888888888906|
# | 85.9| 29.944444444444443|
# | 53.5| 11.944444444444445|
# | 71.6| 21.999999999999996|
# |-27.6|-33.111111111111114|
# +-----+-------------------+
# only showing top 5 rows
❶ The signature is now (Iterator[pd.Series]) → Iterator[pd.Series]. Notice the addition of the Iterator keyword (from the typing module).

❷ We simulate a cold start using sleep() for five seconds. The cold start will happen on each worker once, rather than for every batch.

❸ Since we are working with an iterator here, we iterate over each batch, using yield (instead of return).
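The sleep(5) stand-in maps to the real use case mentioned earlier: loading a model once, then scoring every batch with it. Here is a pandas-only sketch of that pattern (load_model and the doubling “model” are placeholders I made up):

```python
import pandas as pd
from typing import Callable, Iterator

def load_model() -> Callable[[pd.Series], pd.Series]:
    """Stand-in for an expensive step, e.g. deserializing a scikit-learn model."""
    return lambda batch: batch * 2.0  # placeholder "model"

def predict_batches(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Body of an Iterator of Series UDF: the cold start runs a single time."""
    model = load_model()  # paid once per worker, not once per batch
    for batch in batches:
        yield model(batch)

results = list(predict_batches(iter([pd.Series([1.0, 2.0]), pd.Series([3.0])])))
print([r.tolist() for r in results])  # [[2.0, 4.0], [6.0]]
```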
We have covered here the Iterator of Series to Iterator of Series case. What about the Iterator of multiple Series to Iterator of Series? This special case wraps multiple columns in a single iterator. For this example, I’ll assemble the year, mo, and da columns (representing the year, month, and day) into a single date column. This example requires more data transformation than when using an Iterator of a single Series; I illustrate the process of data transformation in figure 2.
Figure 2. The transformation of three Series of values into a single date column. We iterate over each batch using a for loop, use multiple assignment to get the individual columns from the Tuple, and pack them in a dictionary that feeds into a data frame where we can apply our pd.to_datetime function.
- year_mo_da is an Iterator of a Tuple of Series, representing all the batches of values contained in the three columns.
- To access each batch, we use a for loop over the Iterator, the same principle as for the Iterator of Series UDF.
- To extract each individual Series from the Tuple, we use multiple assignment. In this case, year will map to the first Series of the Tuple, mo to the second, and da to the third.
- Since pd.to_datetime requests a data frame containing the year, month, and day columns, we create the data frame via a dictionary, giving the keys the relevant column names. pd.to_datetime returns a Series.
- Finally, we yield the answer to build the Iterator of Series, fulfilling our contract.
Listing 4. Assembling the date from three columns using an Iterator of multiple Series UDF.
from typing import Tuple

@F.pandas_udf(T.DateType())
def create_date(
    year_mo_da: Iterator[Tuple[pd.Series, pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    """Merges three cols (representing Y-M-D of a date) into a Date col."""
    for year, mo, da in year_mo_da:
        yield pd.to_datetime(
            pd.DataFrame(dict(year=year, month=mo, day=da))
        )

gsod.select(
    "year", "mo", "da",
    create_date(F.col("year"), F.col("mo"), F.col("da")).alias("date"),
).distinct().show(5)
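The pd.to_datetime trick at the heart of create_date works on a plain pandas data frame too, which makes it easy to try outside Spark (the sample values are mine):

```python
import pandas as pd

# A data frame with year/month/day columns is all pd.to_datetime needs.
frame = pd.DataFrame(dict(year=[2019, 2020], month=[1, 12], day=[2, 31]))
dates = pd.to_datetime(frame)  # returns a Series of timestamps
print([str(d.date()) for d in dates])  # ['2019-01-02', '2020-12-31']
```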
This concludes our overview of how to use Scalar UDF. Scalar UDF are very useful when you make column-level transformations, just like the functions in pyspark.sql.functions. When using any Scalar UDF, you need to remember that PySpark will not guarantee the order or the composition of the batches when applying the UDF. If you follow the same “Columns in, Columns out” principle we use when working with PySpark Column functions, you’ll do great.
By default, Spark will aim for 10,000 records per batch; you can customize the maximum size of each batch using the spark.sql.execution.arrow.maxRecordsPerBatch config when creating the SparkSession.
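For instance, a session capped at 5,000 records per batch (the cap value here is an arbitrary example) could be sketched as:

```python
from pyspark.sql import SparkSession

# Cap Arrow batches at 5,000 records instead of the default 10,000.
spark = (
    SparkSession.builder
    .appName("pandas-udf-batch-size")
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
    .getOrCreate()
)
```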
Should you need to worry about batch composition based on one or more Columns, the next section addresses how to apply UDF on a GroupedData object to have a finer level of control over the records. We will not only create aggregate functions (an example of this is sum()) but also apply functions while controlling the batch composition.
That’s all for now. If you want to see more, you can check out the book on Manning’s liveBook platform here.