
From Data Analysis with Python and PySpark by Jonathan Rioux

This article covers using transformers and estimators to prepare data into ML features.


Take 40% off Data Analysis with Python and PySpark by entering fccrioux into the discount code box at checkout at manning.com.


This article refers to data and code that have been covered in chapter 12, as seen below:

  1. Read a CSV file containing dish names and multiple columns as feature candidates.
  2. Sanitized the column names (lowered the case; fixed the punctuation, spacing, and non-printable characters).
  3. Removed illogical and irrelevant records.
  4. Filled the null values of binary columns with 0.0.
  5. Capped the amounts for calories, protein, fat, and sodium at the 99th percentile.
  6. Created ratio features (the number of calories from a macronutrient over the number of calories for the dish).
  7. Imputed missing values in continuous features with the mean.
  8. Scaled continuous features between 0.0 and 1.0.

You can use the code leading to food_features in the book’s repository under ./code/Ch12/end_of_chapter.py
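
As a rough sketch (not the book’s exact code), steps 4 and 5 could look like the following, assuming a food data frame with a binary dessert column and a continuous calories column:

 
 import pyspark.sql.functions as F
  
 # Step 4: fill the null values of a binary column with 0.0.
 food = food.fillna(0.0, subset=["dessert"])
  
 # Step 5: cap calories at its 99th percentile.
 cap = food.approxQuantile("calories", [0.99], relativeError=0.0)[0]
 food = food.withColumn(
     "calories_i",
     F.when(F.col("calories") > cap, cap).otherwise(F.col("calories")),
 )
 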

In this excerpt, we continue our journey to a robust machine learning training program. To help us, we delve deeper into transformers and estimators, this time in the context of an ML pipeline.

ML pipelines are how PySpark implements machine learning capabilities. They provide better code organization and flexibility, at the expense of a little preparation up front. This article starts by explaining what an ML pipeline is, using the dessert prediction data set. We review just enough theory about transformers, estimators, and ML pipelines to get us started.

Transformers and estimators: the building blocks of ML in Spark

We’re going to cover the two main components of ML pipelines: transformers and estimators. We take a second look at transformers and estimators as reusable and parameterizable building blocks. From a 36,000-foot view, an ML pipeline is an ordered list of transformers and estimators. Still, to use ML pipelines with optimal efficiency, it is crucial that we first understand not only how to create those building blocks, but also how to modify them.

Transformers and estimators are very useful classes for ML modelling. When we train an ML model, we get back a fitted model, which is akin to a new program that we did not code explicitly. This new data-driven program has a sole purpose: taking a properly formatted data set and transforming it by appending a prediction column. Next, we see that transformers and estimators not only provide a useful abstraction for ML modelling, they also provide portability through serialization and deserialization. This means that you can train and save your ML model and deploy it in another environment.
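
For instance, here is a minimal save/load sketch (the path and column names are illustrative, not taken from the book): a transformer can be serialized to disk and deserialized back in another environment, ready to use.

 
 from pyspark.ml.feature import VectorAssembler
  
 assembler = VectorAssembler(inputCols=["a", "b"], outputCol="features")
  
 # Serialize the transformer (its class and Params) to disk...
 assembler.write().overwrite().save("/tmp/my_assembler")
  
 # ... and deserialize it back.
 reloaded = VectorAssembler.load("/tmp/my_assembler")
 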

To illustrate how a transformer and an estimator are parameterized, we will use the transformer and estimator defined and used in chapter 12:

  • continuous_assembler, a VectorAssembler transformer that takes five columns and creates a Vector column to be used for model training.
  • continuous_scaler, a MinMaxScaler estimator that scales the values contained in a Vector column, returning values between 0 and 1 for each element in the vectors.

For convenience, I include in listing 1 the relevant code. We start with the transformer and then build on it to introduce the estimator.

Listing 1. The VectorAssembler and MinMaxScaler example we’ll explore in this section

 
 from pyspark.ml.feature import MinMaxScaler, VectorAssembler
  
 # The five continuous columns we assemble into a single Vector column.
 CONTINUOUS_NB = ["rating", "calories_i", "protein_i", "fat_i", "sodium_i"]
  
 continuous_assembler = VectorAssembler(
     inputCols=CONTINUOUS_NB, outputCol="continuous"
 )
  
 continuous_scaler = MinMaxScaler(
     inputCol="continuous",
     outputCol="continuous_scaled",
 )
  

Data comes in, data comes out: the Transformer

This section formally introduces the transformer as the first building block of an ML pipeline. We introduce the general transformer blueprint and explain how to access and modify its parameterization. This added context on the transformer plays a crucial role when we want to run experiments with our ML code or optimize our ML models.

In our VectorAssembler transformer example, we provide two arguments to the constructor: inputCols and outputCol. Those arguments provide the necessary information to create a fully functional VectorAssembler transformer. This transformer has a sole purpose: through its transform() method, it takes the values of the columns listed in inputCols and returns a single column, named outputCol, that contains a vector of all the assembled values.


Figure 1. The continuous_assembler transformer, along with its Params. The transformer uses the transform() method to apply a predefined transformation to the data frame passed as input.
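
As a quick usage sketch, assuming the food_features data frame from chapter 12 (which contains the five columns listed in CONTINUOUS_NB):

 
 assembled = continuous_assembler.transform(food_features)
  
 # The "continuous" column now contains a vector of the five assembled values.
 assembled.select("continuous").show(3, truncate=False)
 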


The parameterization of a transformer is called Params (capital P). When instantiating a transformer class, just like with any Python class, we pass the parameters we want as arguments, making sure to explicitly specify each keyword. Once the transformer has been instantiated, PySpark provides us with a set of methods to extract and modify Params. The next sections cover retrieving and modifying Params after the transformer’s instantiation.

Peeking at the signature of VectorAssembler: keyword-only arguments

If you look at the signature for VectorAssembler (and pretty much every transformer and estimator in the pyspark.ml modules), you’ll see an asterisk at the beginning of the parameters list.

 
 class pyspark.ml.feature.VectorAssembler(*, inputCols=None, outputCol=None, handleInvalid='error')
  

In Python, every parameter after the asterisk * is called a keyword-only argument, meaning that we need to mention the keyword. For instance, we couldn’t do VectorAssembler("input_column", "output_column"). For more information, refer to PEP (Python Enhancement Proposal) 3102 at https://www.python.org/dev/peps/pep-3102/.

As a fun add-on, Python also supports positional-only parameters with the slash / character. See PEP 570 (https://www.python.org/dev/peps/pep-0570/).
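
Here is a small, non-PySpark illustration of both markers (the scale() function is made up for this example):

 
 def scale(x, /, y=1.0, *, z=0.0):
     # x is positional-only, y can be passed either way, z is keyword-only.
     return x * y + z
  
 scale(2.0, 3.0, z=1.0)   # OK
 # scale(2.0, 3.0, 1.0)   # TypeError: z must be passed as a keyword
 # scale(x=2.0)           # TypeError: x is positional-only
 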

Peeking under the hood: getting and explaining the Params

Looking back at figure 1, the instantiation of VectorAssembler accepts three arguments: inputCols, outputCol, and handleInvalid. We also hinted that the configuration of a transformer (and, by the same token, estimator) class instance relies on Params, which drive the behavior of the transformer. In this section, we explore Params, highlight their similarities and differences compared to regular class attributes, and explain why those differences matter. You might think “well, I know how to get attributes out of a Python class, and transformers are Python classes”. While that is correct, transformers (and estimators) follow a more Java/Scala-like design, and I recommend not skipping over this section. It’s short, useful, and will help you avoid headaches when working with ML pipelines.

First, let’s do what any Python developer would do and access one of the attributes of the transformer directly. In listing 2, we see that accessing the outputCol attribute of continuous_assembler does not yield continuous, the value we passed to the constructor. Instead, we get a reference to an object called a Param (class pyspark.ml.param.Param), which wraps each of our transformer’s attributes.

Listing 2. Accessing a transformer’s parameters directly yields an object (called a Param).

 
 print(continuous_assembler.outputCol)
 # VectorAssembler_e18a6589d2d5__outputCol 
  

Rather than returning the continuous value passed as an argument to outputCol, we get an object called a Param.

To access the value of a specific Param directly, we use a getter method, which is simply the word get followed by the name of our Param in CamelCase. In the case of outputCol, shown in listing 3, the getter method is called getOutputCol() (note the capital O).

Listing 3. Accessing the value of the outputCol Param through getOutputCol()

 
 print(continuous_assembler.getOutputCol())  # => continuous
  

So far, Params seem like they add boilerplate with little benefit. explainParam() changes this. This method provides documentation about the Param as well as its value. This is best explained by an example: listing 4 shows the output of explaining the outputCol Param.

If you want to see all the Params at once, you can also use the pluralized version, explainParams(). This method takes no argument and will return a newline-delimited string of all the Params.

The string output contains

  1. the name of the Param: outputCol;
  2. a short description of the Param: output column name.;
  3. the default value of the Param: VectorAssembler_e18a6589d2d5__output, used if we don’t explicitly pass a value ourselves;
  4. and the current value of the Param: continuous.

Listing 4. Explaining the outputCol Param with explainParam

 
 print(continuous_assembler.explainParam("outputCol"))
 # outputCol: output column name.
 # (default: VectorAssembler_e18a6589d2d5__output, current: continuous)  
  

The name and a short description of the outputCol Param.

Even though we defined a value for outputCol, PySpark keeps track of the default value and displays both.

In this section, we worked out how to get the relevant information out of our transformer’s Params. This section applies verbatim to estimators as well. In the next section, we stop merely looking at Params and start changing them. Afterwards, transformers will have no more secrets!

What about the plain getParam() method?

Transformers (and estimators) provide the plain getParam() method. It simply returns the Param object, just like accessing the outputCol attribute did at the beginning of the section. I believe this is done so that PySpark transformers can have a consistent API with their Java/Scala equivalents.
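
Here is a minimal sketch (the unique ID in the output will differ on your machine):

 
 out_param = continuous_assembler.getParam("outputCol")
  
 print(out_param)
 # VectorAssembler_e18a6589d2d5__outputCol
  
 # To get the value behind a Param object, use getOrDefault().
 print(continuous_assembler.getOrDefault(out_param))  # => continuous
 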

Setting params of an instantiated transformer using getters and setters

Just like getting Params in the previous section, setting Params works the same way for estimators.

In this section, we modify the Params of a transformer! Simple as that! This is mainly useful in two scenarios:

  1. You are building your transformer in the REPL and want to experiment with different Param-eterizations;
  2. You are optimizing your ML pipeline Params, a topic we get to later on.

How do we change the Params of a transformer? For every getter, there is a setter, which is simply the word set followed by the name of our Param in CamelCase. Unlike getters, setters take the new value as their sole argument. In listing 5, we change the outputCol Param to more_continuous using the relevant setter method. This operation returns the transformer, but it also makes the modification in place, which means that you do not have to assign the result of a setter to a variable.

Listing 5. Setting the outputCol Param to more_continuous. The modification is done in-place.

 
 continuous_assembler.setOutputCol("more_continuous")  
  
 print(continuous_assembler.getOutputCol())  # => more_continuous
  

While the setOutputCol() method returns the transformer object, it also makes the modification in place, so we don’t have to assign the result to a variable.

If you need to change multiple Params at once (for instance, you want to change the input and output columns in one fell swoop while experimenting with different scenarios), you can use the setParams() method. setParams() has the exact same signature as the constructor: you just pass the new values as keywords, as shown in listing 6.

Listing 6. Changing multiple Params at once using setParams()

 
 continuous_assembler.setParams(
     inputCols=["one", "two", "three"], handleInvalid="skip"
 )
 print(continuous_assembler.explainParams())
 # handleInvalid: How to handle invalid data (NULL and NaN values). [...]
 #     (default: error, current: skip)
 # inputCols: input column names. (current: ['one', 'two', 'three'])  
 # outputCol: output column name.
 #     (default: VectorAssembler_e18a6589d2d5__output, current: more_continuous)
  

Params not passed to setParams keep their previous value (set in listing 5).

Finally, if you want to return a Param to its default value, you can use the clear() method. This time, you need to pass the Param object: for instance, in listing 7, we reset the handleInvalid Param by using clear. We pass the actual Param as an argument, accessed via the attribute slot seen at the beginning of the section, continuous_assembler.handleInvalid. This will prove useful if you have a transformer that has both inputCol/outputCol and inputCols/outputCols as possible Params. PySpark only allows one set to be active at once, so if you want to move between one column and multiple columns, you need to clear() the ones not being used.

Listing 7. Clearing the current value of the handleInvalid Param with clear()

 
 continuous_assembler.clear(continuous_assembler.handleInvalid)
  
 print(continuous_assembler.getHandleInvalid())  # => error 
  

handleInvalid returned to its default value, error.

This is it, folks! In this section, we learned in greater detail the how and why of a transformer, as well as how to get, set, and clear its Params. In the next section, we apply this knowledge to speed through the second building block of an ML pipeline: the estimator.

Transformers and estimators are passed by reference: the copy() method

So far in our PySpark journey, we have been using a fluent API, where each data frame transformation generates a new data frame. This enables method chaining, which makes our data transformation code very readable.

When working with transformers (and estimators), remember that they are passed by reference and that setters modify the object in-place. If you assign your transformer to a new variable name and then use a setter on either of those variables, it’ll modify the Param for both references.

 
 new_continuous_assembler = continuous_assembler
  
 new_continuous_assembler.setOutputCol("new_output")
  
 print(new_continuous_assembler.getOutputCol())  # => new_output
 print(continuous_assembler.getOutputCol())  # => new_output 
  

Both the outputCol of continuous_assembler and new_continuous_assembler were modified by the setter.

The solution to this is to copy() the transformer and then assign the copy to the new variable.

 
 copy_continuous_assembler = continuous_assembler.copy()
  
 copy_continuous_assembler.setOutputCol("copy_output")
  
 print(copy_continuous_assembler.getOutputCol())  # => copy_output
 print(continuous_assembler.getOutputCol())  # => new_output 
  

When making a copy, modifications to the Params of copy_continuous_assembler don’t impact continuous_assembler.

Data comes in, transformer comes out: the Estimator

This section covers the estimator, the second half of the ML pipeline. Just like with transformers, understanding how to operate and configure an estimator is an invaluable step in creating an efficient ML pipeline. Where a transformer transforms an input data frame into an output data frame, an estimator is fitted on an input data frame and returns an output transformer. In this section, we see that this relationship between transformers and estimators means that they are Param-eterized the same way, as explained in the previous section. We focus on estimator usage through the fit() method (vs. transform() for the transformer), which is really the only notable difference for the end user.

Where a transformer uses a transform() method, applied to a data frame, to return a transformed data frame, an estimator uses a fit() method, applied to a data frame, to return a fully parameterized transformer called a Model. This distinction enables estimators to configure transformers based on the input data.

As an example, the MinMaxScaler estimator in figure 2 takes four Params, two of which we leave at their default values.

  1. min and max are the minimum and maximum values our scaled column will take. We keep both at their defaults of 0.0 and 1.0, respectively.
  2. inputCol and outputCol are the input and output column, respectively. They follow the same conventions as with the transformer.

In order to scale the values between min and max, we need to extract from the input column its minimum value (which I call E_min) as well as its maximum value (E_max). E_min is mapped to min (0.0), E_max is mapped to max (1.0), and any value in between is mapped to a value between min and max using the following formula (see the exercises at the end of the section for a corner, or edge, case where E_max and E_min are the same).

scaled_value = (value - E_min) / (E_max - E_min) * (max - min) + min
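
As a quick sanity check with made-up numbers: with E_min = 10.0, E_max = 20.0, and the default min = 0.0 and max = 1.0, a value of 15.0 lands exactly halfway.

 
 E_min, E_max = 10.0, 20.0
 scale_min, scale_max = 0.0, 1.0  # the min and max Params
  
 value = 15.0
 scaled = (value - E_min) / (E_max - E_min) * (scale_max - scale_min) + scale_min
 print(scaled)  # => 0.5
 
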
Because the transformation relies on actual values from the data, we can’t use a plain transformer, which expects to “know” everything (through its Param-eterization) before it can apply the transform() method. In the case of the MinMaxScaler, we can translate E_min and E_max into simple operations (min and max come from pyspark.sql.functions).

  1. E_min = min(inputCol)
  2. E_max = max(inputCol)

Once those values are computed (during the fit() method), PySpark creates, Param-eterizes, and returns a transformer/model.


Figure 2. The MinMaxScaler estimator, along with its Params. The estimator uses the fit() method to create and parameterize a Model (a sub-type of transformer) using the data frame passed as an argument.


This fit()/transform() approach applies to estimators far more complex than MinMaxScaler. Case in point: ML models are actually implemented as estimators in Spark.
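
As a minimal end-to-end sketch, reusing continuous_assembler and continuous_scaler as originally defined in listing 1 (before the Param changes we made along the way) and assuming the food_features data frame from chapter 12:

 
 # The transformer needs no data to be configured: transform() applies directly.
 assembled = continuous_assembler.transform(food_features)
  
 # fit() computes E_min and E_max for each element of the vector column and
 # returns a fully parameterized MinMaxScalerModel (a transformer).
 scaler_model = continuous_scaler.fit(assembled)
  
 scaled = scaler_model.transform(assembled)
 scaled.select("continuous", "continuous_scaled").show(3, truncate=False)
 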

That’s all for this article. If you want to learn more, check out the book on Manning’s liveBook platform here.