From Deep Learning Patterns and Practices by Andrew Ferlitsch
This article covers:
● Feeding models training data in a production environment.
● Scheduling for continuous retraining.
● Using version control and evaluating models before and after deployment.
● Deploying models for large scale on-demand and batch requests, in both monolithic and distributed deployments.
To remind you with a visual, here, in figure 1, is the whole pipeline. I’ve circled the part of the system we’ll address in this article.
Fig. 1 Production e2e pipeline with emphasis on the training and deployment.
You may ask, what exactly is a pipeline and why do we use one, whether for ML production or any programmatic production operation which is managed by orchestration? You typically use pipelines when the job, such as training or other operation handled by orchestration, has multiple steps that occur in sequential order: do step A, do step B, and like this. Putting the steps into a ML production pipeline has multiple benefits. First, the pipeline is reusable for subsequent training and deployment jobs. Second, the pipeline can be containerized and as such ran as an asynchronous batch job. Third, pipelines can be distributed across multiple compute instances, where different tasks within the pipeline are executed on different compute instances, or portions of the same task are executed in parallel on different compute instances. Finally, all the tasks associated with the execution of the pipeline can be tracked and status/outcome preserved as history.
This article starts with the procedures for feeding models for training in a production environment, including both sequential and distributed systems, and example implementations using tf.data and Tensorflow Extended (TFX). We then learn how to schedule training and provision compute resources. We start by covering concepts of reusable pipelines, how metadata is used for integrating pipelines into a production environment, along with history and versioning for tracking and auditing.
Next, we’ll see how models are evaluated for release into a production environment. These days, we don’t only compare the metrics from the test, i.e. holdout, data against the test metrics of the previous version of the model. Instead, we identify different subpopulations, i.e. serving skew, and distributions, i.e., data drift which is seen in the production environment and construct additional evaluation data, and are commonly referred to as evaluation slices. Then the model is evaluated in a simulated production environment, commonly referred to as a sandbox, to see how well it performs for response times and scaling. I include example TFX implementations for model and evaluating a candidate model in a sandbox environment.
Then we’ll move to the process of deploying models into production and serving predictions for both on-demand and batch predictions. You can find methods for scaling and load balancing for current traffic demand. You’ll also see how serving platforms are provisioned. Lastly, we discuss how models are further evaluated against a previous version after they are deployed to production using A/B testing methods, and subsequent retraining from insights gained during production using continuous evaluation methods.
Figure 2 is a conceptual overview of the model feeding process within a training pipeline. On the front-end is the data pipeline which performs the tasks for extracting and preparing the training data (step 1 in the figure). Because today we work with large amounts of data in a production environment, we assume that the data is being drawn from disk on an on-demand basis. As such, the model feeder acts as a generator, where it:
- Makes requests to the data pipeline for examples (step 2).
- Receives those examples from the data pipeline (step 3).
- Assembles the received examples into a batch format for training (step 4).
The model feeder hands off each batch to the training method, which sequentially forward feeds each batch (step 5) to the model, calculates the loss at the end of the forward feed (step 6), and updates the weights by backward propagation (step 7).
Fig. 2 Interaction of model feeding process between the data pipeline and the train method.
Positioned between the data pipeline and the train function, the model feeder can potentially be an I/O bottleneck in the training process, and it’s important to consider the implementation that allows the feeder to generate batches as fast as the training method can consume them. For example, if the model feeder is running as a single CPU thread, and the data pipeline is a multi-CPU or GPU and the training process is a multi-GPU, it likely results in the feeder incapable of either processing examples as fast as they are received or generating batches as fast as the training GPUs can consume them.
Given its relationship to the train method, the model feeder must have the next batch ready in memory at or before the train method has consumed the current batch. A model feeder in production is typically a multi-threaded process operating on more than one CPU core. You can feed training examples to models during training in two ways: sequentially and distributed.
Model Feeder for Sequential Training
Figure 3 shows a sequential model feeder, where we start with an area of shared memory, then go through four steps, as follows:
- An area of shared memory reserved for the model feeder for holding two or more batches in-memory (step 1).
- A FIFO queue gets implemented in the shared memory (step 1).
- A first asynchronous process posts ready batches into the queue (step 2 and 3).
- A second asynchronous process pulls the next batch out of the queue, when requested by the train method (step 3 and 4).
Generally, a sequential approach is the most cost efficient for compute resources, and it’s used when the time period to complete the training is within your time to train requirements. The benefits are straightforward; there is no compute overhead, as in a distributed system, and the CPU/GPUs can be ran at full capacity.
Fig. 3 A model feeder for sequential training.
Model Feeder for Distributed Training
In distributed training, such as on multiple GPUs, the impact of I/O bottleneck at the model feeder can become more severe. As you can see in figure 4, it differs from the single instance, non-distributed – sequential approach, in that there are multiple asynchronous submit processes pulling batches from the queue, to feed the training of multiple instances of the model in parallel.
Although a distributed approach introduces some compute inefficiencies, this approach is used when the sequential approach to complete the training isn’t in your time requirements. Usually, the time requirement is based on a business requirement, and not meeting the business requirement has a higher cost than the compute inefficiencies.
Fig. 4 A model feeder for distributed training.
In distributed training, the first asynchronous process must submit multiple batches into the queue (step 1) at a speed equal or greater than the other plural asynchronous processes are pulling batches (step 2). Each of the distributed training nodes has an asynchronous process for pulling batches from the queue. Finally, there is a third asynchronous process for coordinating pulling batches from the queue and waiting for completion (step 3). In this form of distributed training,
A second asynchronous process is available for each distributed training node (step 2), where a node can be:
- Separate compute instances networked together.
- Separate hardware accelerators (e.g., GPU) on the same compute instance.
- Separate threads on a multi-core compute (e.g., CPU) instance.
You might ask, how does the model get trained when each instance only sees a subset of the batches? Good question. In this distributed method, we use batch smoothing of the weights. Think of it this way, each model instance learns from a sub-sampling distribution of the training data, and we need some method of merging the learned weights from each sub-sampling distribution. Each of the nodes, upon completion of a batch, sends its weight updates to the other nodes. When the recipient nodes receive the weight updates, it averages them with the weight updates from its own batch – hence the batch smoothing of weights. Two common network approaches are used to send the weights; one is to broadcast the weights on the subnet which all the nodes are connected to. The other is to use a ring network, where each next sends its weight updates to the next connected node.
Two consequences result from this form of distributed training, whether broadcast or ring. First, there is all the network activity. Second, you don’t know when the message with the weight updates will show up. It’s totally uncoordinated – or ad-hoc. As a result, the batch smoothing of the weights has some inherent inefficiencies and results in more epochs needed to train the model versus the sequential approach.
Model Feeder with a Parameter Server
Another version of distributed training is to use a parameter server. The parameter server typically runs on another node, which is usually a CPU. As an example, in Google’s TPU pods, each group of four TPUs has a CPU based parameter server. The purpose of the parameter server is to overcome the inefficiencies of the asynchronous updating for batch smoothing of the weights.
In this form of distributed training, the batch smoothing of weight updates happens synchronously. The parameter server, as depicted in figure 5, dispatches different batches to each of the training nodes, and then waits for each of the training nodes to finish consuming their corresponding batch (step 1), and send the loss calculation back to the parameter server (step 2). Upon receiving the loss calculation from each training node, the parameter server averages the loss and updates the weights on a master copy maintained by the parameter server, and then sends the updated weights to each training node (step 3). The parameter server then signals the model feeder to dispatch the next set of parallel batches (step 4).
Fig. 5 parameter server in distributed training.
The advantage of this synchronous method is that it doesn’t require as many epochs to be trained than the aforementioned asynchronous method, but the drawback is that each training node must weight on the parameter server to signal receiving the next batch, and the training nodes may run below GPU, or other compute, capacity.
A couple of more things we can point out; for each round, each distributed training node receives a different batch from each other because there could be significant variance in the loss across the training nodes, the overhead waiting on the parameter to update the weights, and that distributed training generally uses larger batch sizes. The larger batches smooth out or reduce variance across the parallel batches, and I/O bottlenecks during the training process.
Model Feeding with tf.data.Dataset
tf.data.Dataset can be used to construct a data pipeline, and It can be used as the mechanism for model feeding. In essence, an instance of tf.data.Dataset is a generator. It can be integrated into both sequential and distributed training, but in a distributor feeder the instance doesn’t act as a parameter server because that function is performed by the underlying distribution system.
Some of the primary benefits of a tf.data.Dataset are: setting of the batch size, shuffling the data for randomized batches, and prefetching the next batches in parallel to feeding a current batch.
The code below is an example of using tf.data.Dataset for feeding a model during training, using a dummy model — a Sequential model with a single layer (Flatten) with no parameters to train. For demonstration, we use the CIFAR-10 data from TF.Keras builtin datasets.
Because the CIFAR-10 data in this example is already in memory when loaded by cifar.load_data(), we create a generator that feeds batches from the in-memory source. The first step is to create the generator for our in-memory dataset. We do this using the from_tensor_slices(), which takes as a parameter a tuple of the in-memory training examples and corresponding labels (x_train, y_train). Note, this method doesn’t make a copy of the training data –instead, it builds an index to the source of the training data and uses the index to shuffle, iterate and fetch examples.
from tensorflow.keras.datasets import cifar10 import numpy as np from tensorflow.keras import Sequential from tensorflow.keras.layers import Flatten (x_train, y_train), (x_test, y_test) = cifar10.load_data() x_train = (x_train / 255.0).astype(np.float32) x_test = (x_test / 255.0).astype(np.float32) model = Sequential([ Flatten(input_shape=(32, 32, 3))] ) model.compile(loss='sparse_categorical_crossentropy', optimizer='adam') dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)) #A dataset = dataset.batch(32).shuffle(1000).repeat().prefetch(2) #B model.fit(dataset, epochs=10, steps_per_epoch=len(x_train//32)) #C
#A Create a tf.data.Dataset as a generator for model feeding of CIFAR-10 training data.
#B Set model feeding attributes.
#C Use the generator as the model feeder when training.
Now that we have a generator in the above code example, we add some attributes to complete as a model feeder.
- We set the batch size to 32 (batch(32)).
- We set randomly shuffling a 1000 examples at a time in memory (shuffle(1000)).
- We re-iterate through the entire training data repeatedly (repeat()). Without repeat, the generator only makes a single pass through the training data.
- In parallel to feeding a batch, prefetch up to two batches in the feeder queue (prefetch(2)).
Next, we can pass the generator as the training input source to the fit(dataset,epochs=10, steps_per_epoch=len(x_train//32)) command for training. The fit command treats the generator as an iterator, and for each interaction the generator performs the model feeding task.
Note, because we’re using a generator for model feeding, the fit() method doesn’t know when it has consumed the entire training data for an epoch — because the repeat() attribute causees the generator to iterate forever. We need to tell the fit method how many batches constitute an epoch, which we set with the keyword parameter steps_per_epoch.
Dynamically Updating the Batch Size
Batch size is inversely related to the learning rate. During training, this inverse relationship means that conventional model feeding techniques increase the batch in proportion to decreases in the learning rate. Although TF.Keras has a built-in method for dynamically updating the learning rate with the LearningRateScheduler callback, it presently doesn’t have the same capability for the batch size. Instead, I’ll show you the DIY version of updating the batch size dynamically during training as it lowers the learning rate.
I’ll explain the DIY process as I describe the code (below) which implements it. In this case, we add an outer training loop to dynamically update the batch size. Recall, in the fit() method, the batch size is specified as a parameter. To update the batch size, we partition up the epochs, and call fit() multiple times. Inside the loop, we train the model for some specified number of epochs. As for the loop, each time we iterate through it, we update the learning rate and batch size, and set the number of epochs to train for in the loop. In the for loop, we use a list of tuples, where each tuple specifies the learning rate (lr), batch size (bs) and number of epochs (epochs). For example, (0.01, 32, 10).
Resetting the number of epochs in the loop is straightforward because we can specify it as a parameter to the fit() method. For the learning rate, we reset it by (re)compiling the model and reset the learning rate when we specify the optimizer parameter — Adam(lr=lr). It’s okay to recompile a model in the middle of training, as it doesn’t affect the model’s weights; recompiling doesn’t undo previous training. Resetting the batch size for a tf.data.Dataset isn’t as straightforward, because once set you can’t reset it. Instead, we must create a new generator for the training data in each loop iteration where we specify the current batch size with the method batch().
from tensorflow.keras.datasets import cifar10 import numpy as np from tensorflow.keras import Sequential from tensorflow.keras.layers import Flatten, Dense, Conv2D, MaxPooling2D from tensorflow.keras.optimizers import Adam (x_train, y_train), (x_test, y_test) = cifar10.load_data() x_train = (x_train / 255.0).astype(np.float32) x_test = (x_test / 255.0).astype(np.float32) model = Sequential([ Conv2D(16, (3, 3), activation='relu', input_shape=(32, 32, 3)), Conv2D(32, (3, 3), strides=(2, 2), activation='relu'), MaxPooling2D((2, 2), strides=2), Flatten(), Dense(10, activation='softmax') ]) for lr, bs, epochs in [ (0.01, 32, 10), (0.005, 64, 10), (0.0025, 128, 10) ]: #A print("hyperparams: lr", lr, "bs", bs, "epochs", epochs) dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)) #B dataset = dataset.shuffle(1000).repeat().batch(bs).prefetch(2) #B model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(lr=lr), metrics=['acc']) #C model.fit(dataset, epochs=epochs, steps_per_epoch=200, verbose=1) #D
#A Outer loop for dynamic resetting of hyperparams during training.
#B Create a new generator to reset the batch size.
#C Recompile the model to reset the learning rate.
#D Train the model with the reset number of epochs.
Let’s take a look at the abbreviated output from running our DIY version of dynamically resetting hyperparameters as we train. You can see in the first iteration of the outer loop, the training accuracy is 51% on the 10th epoch. On the second iteration, where the learning rate is halved and the batch size is doubled, the training accuracy is 58% on the 10th epoch, and on the third iteration it reaches 61%. As you can observe from the output, we maintained aconsist reduction in the loss and increase in accuracy over the three iterations, as we narrowed down into the loss space.
hyperparams: lr 0.01 bs 32 epochs 10 Epoch 1/10 200/200 [==============================] - 1s 3ms/step - loss: 1.9392 - acc: 0.2973 Epoch 2/10 200/200 [==============================] - 1s 3ms/step - loss: 1.6730 - acc: 0.4130 ... Epoch 10/10 200/200 [==============================] - 1s 3ms/step - loss: 1.3809 - acc: 0.5170 hyperparams: lr 0.005 bs 64 epochs 10 Epoch 1/10 200/200 [==============================] - 1s 3ms/step - loss: 1.2248 - acc: 0.5704 Epoch 2/10 200/200 [==============================] - 1s 3ms/step - loss: 1.2740 - acc: 0.5510 ... Epoch 10/10 200/200 [==============================] - 1s 3ms/step - loss: 1.1876 - acc: 0.5853 hyperparams: lr 0.0025 bs 128 epochs 10 Epoch 1/10 200/200 [==============================] - 1s 4ms/step - loss: 1.1186 - acc: 0.6063 Epoch 2/10 200/200 [==============================] - 1s 3ms/step - loss: 1.1434 - acc: 0.5997 ... Epoch 10/10 200/200 [==============================] - 1s 3ms/step - loss: 1.1156 - acc: 0.6129
Distributed Feeding with tf.strategy
The Tensorflow module tf.distribute.Strategy is convenient and encapsulated; everything is done for you, for distributed training across multiple GPUs on the same compute instance, or across multiple TPUs. It implements a synchronous parameter server as described earlier. This Tensorflow module is optimized for distributed training of tensorflow models, as well as optimized for distributed training on parallel Google TPUs.
When training on a single compute instance with multiple GPUs, use tf.distribute.MirrorStrategy and when training on TPUs use tf.distribute.TPUStrategy. We don’t cover distributed training across machines in this article, other than noting you should use tf.distribute.experimental.ParameterServerStrategy, which implements an asynchronous parameter server across a network. The set up for distributed training across multiple machines is complex, and requires an article in its own right. I recommend using this approach, as well as studying tensorflow documentation, if you’re building tensorflow models and meeting your business objectives requires substantial or massive parallelism during training.
Here’s our approach for setting up a distributed training run on a single machine, with multiple CPUs or GPUs:
- Instantiate a distribution strategy.
- Within the scope of the distribution strategy:
- Create the model.
- Compile the model.
- Train the model.
These steps may seem counterintuitive, in that we set up the distribution strategy when we build and compile the model, instead of when we train it. It’s a requirement in Tensorflow, that the construction of the model needs to be aware that it’s being trained using a distributed training strategy. As of this writing, the Tensorflow team has recently released a newer experimental version where the distribution strategy can be set independent of compiling the model.
And here is the code for implementing the above three steps, and two sub steps:
- We define the function create_model() to create an instance of the model to train (#A).
- We instantiate the distribution strategy — strategy = tf.distribute.MirrorStrategy() (#B).
- We set the distribution context — with strategy.scope() (#C).
- Within the distribution context, we create an instance of the model — model = create_model(), and then compile it — model.compile() (#C).
- Finally, we train the model (#D).
def create_model(): #A model = Sequential([ Conv2D(16, (3, 3), activation='relu', input_shape=(32, 32, 3)), Conv2D(32, (3, 3), strides=(2, 2), activation='relu'), MaxPooling2D((2, 2), strides=2), Flatten(), Dense(10, activation='softmax') ]) return model strategy = tf.distribute.MirroredStrategy() #B with strategy.scope(): #C model = create_model() #C model.compile(loss='sparse_categorical_crossentropy', optimizer='adam') #C model.fit(dataset, epochs=10, steps_per_epoch=200) #D
#A Function for creating an instance of the model.
#B Instantiate the distribution strategy.
#C Within the scope of the distribution strategy, create and compile the model.
#D Train the model.
You may ask, can I use a model that has already been built. The answer is no, you must build the model within the scope of the distribution strategy. For example, the following code example causes an error indicating the model wasn’t built within the scope of the distribution strategy:
model = create_model() #A with strategy.scope(): model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')
#A Model isn’t built within the scope of the distribution strategy.
Again, you might ask: I already have a pre-built or pre-trained model that wasn’t built for distribution strategy; can I still do distributed training? The answer is yes; if you’ve an existing TF.Keras model saved to disk, when you load it back into memory using load_model(), it implicitly builds the model. Below is an example implementation of setting a distribution strategy from a pre-trained model.
with strategy.scope(): model = tf.keras.models.load_model('my_model') #A model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')
#A Model is implicitly rebuilt when loaded from disk.
Likewise, when a pre-built model is loaded from a model repository, there’s an implicit load and correspondingly an implicit build. The following code sequence is an example of loading a model from the tf.keras.applications built-in model repository, where the model’s implicitly rebuilt.
with strategy.scope(): model = tf.keras.applications.ResNet50() #A model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')
#A Model is implicitly rebuilt when loaded from a repository.
By default, the mirrored strategy uses all the GPUs on the compute instance. You can get the number of GPU, or CPU cores, which are used with the property num_replicas_in_sync. You can also explicitly set which GPUs, or cores, to use. In the following code example, we set the distribution strategy to use two GPUs.
strategy = tf.distribute.MirroredStrategy(['/gpu:0', '/gpu:1']) print("GPUs:", strategy.num_replicas_in_sync)
The above code example generates the following output:
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1') GPUs: 2
That’s all for now. If you want to learn more about the book, check it out on Manning’s liveBook platform here.