By Riccardo Terrell

In this article, you’re going to implement one of the most common coordination techniques—the pipeline pattern. In general, a pipeline is composed of a series of computational steps, organized as a chain of stages, where each stage depends on the output of its predecessor and usually performs a transformation on the input data.

 

Save 37% off Functional Concurrency in .NET with code fccterrell.

You can think of the pipeline pattern as an assembly line in a factory, where each item on the assembly line is constructed in stages. The entire chain is expressed as a function, and a message queue executes that function each time new input is received. The message queue is non-blocking because it runs in a separate thread; even if the stages of the pipeline take some time to execute, it won't block the sender from pushing more data into the chain.

This pattern resembles the Producer/Consumer pattern: a producer manages one or more worker threads that generate data, while one or more consumers process the data as the producer creates it. A pipeline chains these roles together, allowing the stages to run in parallel.

The implementation of the pipeline in this article follows a slightly different design than the traditional one. In a traditional pipeline with serial stages, the overall throughput is limited by the throughput of the slowest stage, because every item pushed into the pipeline must pass through that stage. Moreover, a traditional pipeline can't scale automatically with the number of cores; its parallelism is capped by the number of stages. Only a linear pipeline whose number of stages matches the number of available logical cores can take full advantage of the computer's power. For example, on a computer with eight cores, a pipeline composed of four stages can use only half of the resources, leaving 50 percent of the cores idle.


Figure 1 The traditional pipeline creates a buffer between each pair of stages that works as a parallel Producer/Consumer pattern, so there are almost as many buffers as there are stages. With this design, each work item is sent to the initial stage, and the result is passed into the first buffer, which coordinates the work in parallel and pushes it into the second stage. This process continues until all the stages of the pipeline have run. By contrast, the functional parallel pipeline combines all the stages into one, as if composing multiple functions. Then, using a Task object, each work item is pushed through the combined steps to be processed in parallel, using the TPL and its optimized scheduler.


Functional programming promotes composition, which is the concept the pipeline pattern is based on. In the following listing, the pipeline embraces this tenet by composing each step into a single function and then running and distributing the work in parallel, fully leveraging the available resources. In an abstract way, each function acts as the continuation of the previous one, behaving like the continuation-passing style (CPS) model. The pipeline is implemented in F# and then consumed from C#. You can find the full implementation in both programming languages in the downloadable source code.
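To make the CPS idea concrete, here's a minimal, self-contained F# sketch in which each function passes its result to the next step instead of returning it (parseCps and doubleCps are illustrative names, not part of the pipeline code):

 let parseCps (s: string) (continuation: int -> unit) =
     continuation (int s)              // pass the result forward instead of returning it

 let doubleCps (n: int) (continuation: int -> unit) =
     continuation (n * 2)

 // Each step hands its result to the next step (its continuation)
 parseCps "21" (fun n -> doubleCps n (fun result -> printfn "%d" result))   // prints 42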

Here the IPipeline interface defines the functionality of the pipeline:

Listing 1  The IPipeline interface

 
 [<Interface>]
 type IPipeline<'a,'b> =                                              ❶
     abstract member Then : Func<'b, 'c> -> IPipeline<'a,'c>          ❷
     abstract member Enqueue : 'a * (('a * 'b) -> unit) -> unit       ❸
     abstract member Execute : int * CancellationToken -> IDisposable ❹
     abstract member Stop : unit -> unit                              ❺
 

❶   Interface to define the pipeline contract

❷   Function to expose fluent API approach

❸   Function to push new input into the pipeline for processing

❹   Starts the pipeline execution

❺   The pipeline can be stopped at any time, either by calling this function directly or through the underlying cancellation token

The function Then is the core of the pipeline: it composes the input function with the previous one, applying a transformation. This function returns a new instance of the pipeline, providing a convenient and fluent API to build the process.

The Enqueue function is responsible for pushing items into the pipeline to be processed. It takes a callback function as an argument, which is applied at the end of the pipeline to further process the result. This design gives you the flexibility to apply an arbitrary function to each item pushed.

The Execute function starts the computation. Its input arguments set, respectively, the size of the internal worker pool and a cancellation token to stop the pipeline on demand. This function returns an IDisposable, which stops the pipeline when disposed.

The following listing shows the full implementation of the pipeline:

Listing 2 The parallel functional pipeline pattern

 
 open System
 open System.Linq
 open System.Threading
 open System.Threading.Tasks
 open System.Collections.Concurrent

 [<Struct>]
 type internal Continuation<'a, 'b>(input:'a, callback:('a * 'b) -> unit) =
     member this.Input with get() = input
     member this.Callback with get() = callback                  ❶

 type Pipeline<'a, 'b> private (func:'a -> 'b) =
     let continuations = Array.init 3 (fun _ -> new BlockingCollection<Continuation<'a,'b>>(100))      ❷

     let then' (nextFunction:'b -> 'c) =
         Pipeline(func >> nextFunction) :> IPipeline<_,_>        ❸

     let enqueue (input:'a) (callback:('a * 'b) -> unit) =
         BlockingCollection<Continuation<_,_>>.AddToAny(continuations, Continuation(input, callback)) |> ignore   ❹

     let stop() = for continuation in continuations do continuation.CompleteAdding()                   ❺

     let execute blockingCollectionPoolSize (cancellationToken:CancellationToken) =
         cancellationToken.Register(Action(stop)) |> ignore      ❻
         for i = 0 to blockingCollectionPoolSize - 1 do
             Task.Factory.StartNew((fun () ->                    ❼
                 while (not <| continuations.All(fun bc -> bc.IsCompleted)) && (not <| cancellationToken.IsCancellationRequested) do
                     let mutable continuation = Unchecked.defaultof<Continuation<_,_>>
                     // TryTakeFromAny avoids the exception that TakeFromAny throws if the
                     // buffers complete while a worker is waiting for an item
                     if BlockingCollection<Continuation<_,_>>.TryTakeFromAny(continuations, &continuation, 50) >= 0 then
                         continuation.Callback(continuation.Input, func(continuation.Input))),
                 cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default) |> ignore

     static member Create(func:'a -> 'b) =
         Pipeline(func) :> IPipeline<_,_>                        ❽

     interface IPipeline<'a, 'b> with
         member this.Then(nextFunction) = then' nextFunction.Invoke
         member this.Enqueue(input, callback) = enqueue input callback
         member this.Stop() = stop()
         member this.Execute(blockingCollectionPoolSize, cancellationToken) =
             execute blockingCollectionPoolSize cancellationToken
             { new IDisposable with member self.Dispose() = stop() }
 

 

❶   The Continuation struct encapsulates the input value for each task with the callback to run when the computation completes

❷   Initializes the BlockingCollection buffers that hold the work

❸   Function composition combines the current function of the pipeline with the new one passed and returns a new pipeline

❹   The Enqueue function pushes the work into the buffer

❺   The BlockingCollection is notified to complete, which stops the pipeline

❻   Registers the cancellation token to run the stop function when it’s triggered

❼   Starts the tasks to compute in parallel

❽   Static method creates a new instance of the pipeline

In the code, the Continuation structure is used internally to pass each item through the pipeline functions. The implementation of the pipeline uses an internal buffer composed of an array of the concurrent collection BlockingCollection<T>, which ensures thread safety during parallel computation of the items. The argument to this collection's constructor specifies the maximum number of items to buffer at any given time; in this case, the value is 100 for each buffer.

Each item pushed into the pipeline is added to one of the collections, from which it will later be taken and processed in parallel. The Then function composes the function nextFunction, passed as an argument, with the function func passed into the pipeline constructor. When the pipeline starts the process, it applies the final composed function to each input value.
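The composition that Then performs is F#'s ordinary function composition operator (>>). Here's a minimal standalone sketch (parse and double are hypothetical stage functions, not part of the pipeline code):

 let parse (s: string) = int s          // 'a -> 'b
 let double n = n * 2                   // 'b -> 'c

 // func >> nextFunction builds a single function equivalent to
 // fun x -> nextFunction (func x)
 let combined = parse >> double

 printfn "%d" (combined "21")           // prints 42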

The parallelism in the pipeline is achieved in the Execute function, which spawns one task for each BlockingCollection instantiated, guaranteeing that each running thread has a buffer. The tasks are created with the LongRunning option to schedule a dedicated thread. The BlockingCollection concurrent collection allows thread-safe access to the stored items through the static methods TakeFromAny and AddToAny, which internally distribute the items and balance the workload among the running threads. This collection is used to manage the connection between the input and output of the pipeline, which behave as producer/consumer threads.
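As a minimal sketch of how AddToAny and TakeFromAny balance work across an array of bounded buffers, mirroring the continuations array in Listing 2 (the names buffers and item are illustrative):

 open System.Collections.Concurrent

 // Two bounded buffers; each blocks producers once 100 items are queued
 let buffers = Array.init 2 (fun _ -> new BlockingCollection<string>(100))

 // AddToAny picks a buffer that can accept the item and returns its index
 let addedTo = BlockingCollection<string>.AddToAny(buffers, "work item")

 // TakeFromAny blocks until an item is available in any buffer, removes it,
 // and returns the index of the buffer it came from
 let mutable item = Unchecked.defaultof<string>
 let takenFrom = BlockingCollection<string>.TakeFromAny(buffers, &item)

 printfn "added to buffer %d; took %s from buffer %d" addedTo item takenFrom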

NOTE When using BlockingCollection, remember to call GetConsumingEnumerable if you want to consume items as they're added. The BlockingCollection class implements IEnumerable<T>, but enumerating over the collection instance directly only iterates a snapshot of its current contents and doesn't consume (remove) values.
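For example, here's a small sketch of the difference (the queue name is illustrative):

 open System.Collections.Concurrent

 let queue = new BlockingCollection<int>(100)
 for i in 1 .. 5 do queue.Add i
 queue.CompleteAdding()

 // Plain enumeration over queue would only iterate a snapshot without removing items.
 // GetConsumingEnumerable removes each item as it's yielded and blocks until
 // more items arrive or CompleteAdding is called
 for item in queue.GetConsumingEnumerable() do
     printfn "consumed %d" item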

The pipeline constructor is marked private to prevent direct instantiation. Instead, the static method Create initializes a new instance of the pipeline. This facilitates a fluent API approach to manipulating the pipeline.
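Putting the pieces together, here's a minimal F# usage sketch of the pipeline from Listing 2 (the stage functions and values are hypothetical):

 open System.Threading

 let pipeline =
     Pipeline.Create(fun (s: string) -> int s)      // stage 1: parse
         .Then(fun n -> n * 2)                      // stage 2: transform

 // Start three worker tasks, matching the three internal buffers
 let running = pipeline.Execute(3, CancellationToken.None)

 // Enqueue is non-blocking; the callback receives the (input, result) pair
 pipeline.Enqueue("21", fun (input, result) -> printfn "%s -> %d" input result)

 Thread.Sleep 100     // sketch only: give the workers a moment to drain
 running.Dispose()    // disposing the handle (or calling Stop) shuts the pipeline down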

This pipeline design ultimately resembles a parallel Producer/Consumer pattern, capable of managing concurrent communication between many producers and many consumers.

The following listing uses the implemented pipeline to refactor the face-detection program from the previous section. In C#, a fluent API approach is a convenient way to express and compose the steps of the pipeline.

Listing 3 The refactored face-detection code using the parallel pipeline

 
 var files = Directory.GetFiles(ImagesFolder);

 var imagePipe = Pipeline<string, Image<Bgr, byte>>
     .Create(filePath => new Image<Bgr, byte>(filePath))
     .Then(image => Tuple.Create(image, image.Convert<Gray, byte>()))
     .Then(frames => Tuple.Create(frames.Item1,
         CascadeClassifierThreadLocal.Value.DetectMultiScale(frames.Item2, 1.1,
             3, System.Drawing.Size.Empty)))
     .Then(faces =>
     {
         foreach (var face in faces.Item2)
             faces.Item1.Draw(face, new Bgr(System.Drawing.Color.BurlyWood), 3);
         return faces.Item1.ToBitmap();
     });                                                                   ❶

 imagePipe.Execute(3, cancellationToken);                                  ❷

 foreach (string fileName in files)
     imagePipe.Enqueue(fileName, (_, bitmapImage) => Images.Add(bitmapImage)); ❸
 

❶   Constructs the pipeline using a fluent API.

❷   Starts the execution of the pipeline with a pool of three worker tasks, matching the internal buffers. The cancellation token can stop the pipeline at any time.

❸   The iteration pushes (enqueues) the file paths into the pipeline queue; this operation is non-blocking.

By exploiting the pipeline, the structure of the code has changed considerably.

The pipeline definition is elegant, and it can be used to construct the face-detection process with a nice, fluent API. Each function is composed step-by-step, and then the Execute function is called to start the pipeline. Because the underlying pipeline processing is already running in parallel, the loop that pushes the images' file paths is sequential. The Enqueue function of the pipeline is non-blocking, so there's no performance penalty involved. Later, when an image is returned from the computation, the callback passed into the Enqueue function updates the UI.

Here’s the benchmark to compare the different approaches implemented:

Table 1 Benchmark processing 100 images on a computer with four logical cores and 16 GB of RAM. The results, in seconds, represent the average of running each design three times.

Serial Loop | Parallel | Parallel Continuation | Parallel LINQ Combination | Parallel Pipeline
68.57       | 22.89    | 19.73                 | 20.43                     | 17.59

The benchmark shows that, averaged over three runs of processing 100 images, the parallel pipeline design is the fastest. It's also the most expressive and concise pattern.

For more information about the intersection of concurrency and the functional programming paradigm, download the free first chapter of Functional Concurrency in .NET and see this Slideshare presentation.