From Functional Programming in C#, Second Ed. By Enrico Buonanno

This article discusses async streams and how to create and consume them in C#.




Task<T> is very good for modelling operations that take some time to deliver a single T, allowing you to write asynchronous code without excessive complexity. However, we often have operations that return not a single T but several T values, which can be delivered individually or in batches, with a relatively long time between items or batches. Here are some examples:

  • Retrieving several pages from an API that features pagination: each page is retrieved with a single asynchronous operation, and includes a certain number of resources, but you need to retrieve several pages in order to retrieve all the resources you require.
  • Reading a file: instead of reading the whole content of a file into memory, you can read it line-by-line, asynchronously; this allows you to start processing the lines read so far while the rest of the file is still being read.
  • Retrieving data from a cloud-hosted DB.

As with any long-running operation, we don’t want to block while the requested values are delivered; instead, we want the calling thread to be freed as soon as the async request is kicked off.


Figure 1. An asynchronous sequence. The consumer of information asks for some data, which is returned asynchronously by the producer of information, with a non-negligible delay between the produced values.


We can model such scenarios as async streams, that is, streams of values that are delivered asynchronously. These are represented with the IAsyncEnumerable<T> interface, and C# 8 introduced dedicated syntax to create and consume IAsyncEnumerables.

IAsyncEnumerable is like Task (in that it provides asynchrony) and like IEnumerable (in that it provides aggregation). It combines both effects, if you like. This is shown in Table 1.

Table 1. How IAsyncEnumerable compares with other abstractions

                  Synchronous      Asynchronous
Single value      T                Task<T>
Multiple values   IEnumerable<T>   IAsyncEnumerable<T>

So, you may ask, how is IAsyncEnumerable<T> different from Task<IEnumerable<T>>? Crucially, with Task<IEnumerable<T>>, you have to wait for the enclosing Task to complete before you can consume any of the resulting T values. With IAsyncEnumerable<T>, on the other hand, you can start consuming each incoming T as soon as it is received, without waiting for the end of the stream.
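To make the difference concrete, here is a minimal sketch, assuming .NET 6 or later and a single file of top-level statements. ProduceStream and ProduceAll are hypothetical producers that simulate a slow source with Task.Delay. Each one writes to a log, so you can see when values are produced and when they are consumed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical producer modelled as an async stream:
// each value is handed to the consumer as soon as it is ready.
static async IAsyncEnumerable<int> ProduceStream(List<string> log)
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Delay(10);            // simulate a slow source
        log.Add($"produced {i}");
        yield return i;
    }
}

// The same values modelled as Task<IEnumerable<int>>:
// the caller gets nothing until the whole list has been built.
static async Task<IEnumerable<int>> ProduceAll(List<string> log)
{
    var result = new List<int>();
    for (int i = 1; i <= 3; i++)
    {
        await Task.Delay(10);
        log.Add($"produced {i}");
        result.Add(i);
    }
    return result;
}

var streamLog = new List<string>();
await foreach (var n in ProduceStream(streamLog))
    streamLog.Add($"consumed {n}");      // runs as soon as each value arrives

var taskLog = new List<string>();
foreach (var n in await ProduceAll(taskLog))
    taskLog.Add($"consumed {n}");        // runs only after all values are ready

Console.WriteLine(string.Join(", ", streamLog));
// produced 1, consumed 1, produced 2, consumed 2, produced 3, consumed 3
Console.WriteLine(string.Join(", ", taskLog));
// produced 1, produced 2, produced 3, consumed 1, consumed 2, consumed 3
```

With the async stream, production and consumption interleave. With Task<IEnumerable<int>>, every value is produced before the first one is consumed.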

Let’s explore some concrete scenarios next.

Reading from a file as an async stream

Imagine you’re working in e-commerce, and have to keep track of how many items are available in the warehouse for each product. The logistics side of the business uses an obsolete protocol: stock deliveries to the warehouse are recorded in comma-separated values in a CSV file, which is uploaded at the end of each day. So, you have to write a process that reads from the CSV file (each line represents delivery of stock of a particular product) and updates the e-commerce DB accordingly.

Since reading from a file is relatively slow, it’s natural to model this as an asynchronous operation; furthermore, you can read the contents of the file line-by-line; this is more efficient than storing the entire contents of a large file into memory. Hence, you can use an IAsyncEnumerable:

Listing 1. Reading the contents of a file as an async stream of strings

 
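 using System.Collections.Generic;
 using System.IO;
  
 // Returns the lines of the file as an async stream: each line is read
 // only when the consumer asks for the next element
 static async IAsyncEnumerable<string> ReadLines(string path)
 {
    // disposed once the consumer has finished iterating over the stream
    using StreamReader reader = File.OpenText(path);
    while (!reader.EndOfStream)
       yield return await reader.ReadLineAsync(); // await the next line, then hand it to the consumer
 }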
  

Notice that to generate an IAsyncEnumerable, you use yield return (just like you do with IEnumerable) in combination with await. In fact, every time you have an asynchronous operation (a method that returns a Task<T>, as is the case with ReadLineAsync here) that needs to be called repeatedly, you can consider using an IAsyncEnumerable<T>.
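As an illustration of that rule of thumb, here is a sketch of the pagination scenario mentioned at the start of this article. FetchPageAsync is a hypothetical stand-in for a paged API, simulated with Task.Delay and an in-memory array. FetchAll turns the repeated asynchronous call into an IAsyncEnumerable<string>, so callers see individual items rather than pages.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical paged API: returns the items on the requested page,
// or an empty array once there are no more pages.
static async Task<string[]> FetchPageAsync(int page)
{
    await Task.Delay(10);                 // simulate network latency
    const int pageSize = 2;
    string[] all = { "a", "b", "c", "d", "e" };
    return all.Skip(page * pageSize).Take(pageSize).ToArray();
}

// Wraps the repeated asynchronous call in an async stream:
// each page is awaited, then its items are yielded one by one.
static async IAsyncEnumerable<string> FetchAll()
{
    for (int page = 0; ; page++)
    {
        string[] items = await FetchPageAsync(page);
        if (items.Length == 0) yield break;   // no more pages
        foreach (var item in items)
            yield return item;
    }
}

var received = new List<string>();
await foreach (var value in FetchAll())
    received.Add(value);

Console.WriteLine(string.Join(",", received)); // a,b,c,d,e
```

The consumer never deals with page numbers; it simply iterates the stream, and the next page is only requested once the items of the current one have been consumed.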

Now that we have an async stream of strings, we can use each line to populate a data object (I’ll call this Delivery), and use it to update the DB:

 
 // Models a delivery
 record Delivery(long ArticleID, int Quantity);
  
 // Populates a Delivery from a line of the file (article ID, then quantity)
 static Delivery Parse(string s)
 {
    string[] ss = s.Split(',');
    return new(long.Parse(ss[0]), int.Parse(ss[1]));
 }
  
 // Updates the DB with information from the delivery
 static void UpdateDb(Delivery r) => // ...

With these building blocks in place, we can write the program that updates the DB with the values from the CSV file:

 
 using System.Threading.Tasks;
  
 public static async Task Main()
 {
    // Consume the stream of lines from the CSV file
    await foreach (var line in ReadLines("warehouse.csv"))
    {
       Delivery d = Parse(line); // parse each line into a Delivery
       UpdateDb(d);              // save the Delivery to the DB
    }
 }

Notice that here we consume the values in the IAsyncEnumerable with await foreach. This is similar to how you consume the elements of an IEnumerable with foreach.
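If you want to try the whole pipeline without a real database, here is a self-contained sketch. It assumes a Dictionary as an in-memory stand-in for the DB, uses a value tuple in place of the Delivery record (so everything fits in one file of top-level statements), and writes made-up CSV content to a temporary file.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Sample CSV content (article ID, quantity), made up for this sketch.
string csvPath = Path.GetTempFileName();
await File.WriteAllLinesAsync(csvPath, new[] { "101,5", "102,3", "101,2" });

// In-memory stand-in for the e-commerce DB: stock level per article ID.
var stock = new Dictionary<long, int>();

// Same shape as Main above: consume the stream, parse, update.
await foreach (var line in ReadLines(csvPath))
{
    var delivery = Parse(line);
    UpdateDb(stock, delivery);
}
File.Delete(csvPath);

foreach (var entry in stock)
    Console.WriteLine($"{entry.Key}: {entry.Value}");

// As in Listing 1: read the file line by line, asynchronously.
static async IAsyncEnumerable<string> ReadLines(string path)
{
    using StreamReader reader = File.OpenText(path);
    while (!reader.EndOfStream)
        yield return await reader.ReadLineAsync();
}

// A value tuple stands in for the Delivery record (assumption).
static (long ArticleID, int Quantity) Parse(string s)
{
    string[] ss = s.Split(',');
    return (long.Parse(ss[0]), int.Parse(ss[1]));
}

// Hypothetical "DB update": adds the delivered quantity to the stock level.
static void UpdateDb(Dictionary<long, int> db, (long ArticleID, int Quantity) d)
{
    db.TryGetValue(d.ArticleID, out int current);   // 0 if not yet present
    db[d.ArticleID] = current + d.Quantity;
}
```

With the sample data, article 101 should end up with a stock of 7 and article 102 with 3.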

Consuming async streams functionally

I hope that you’re now thinking: but we never want to use foreach to explicitly loop over the elements in a collection; instead, we want to use Map, or a LINQ comprehension, to transform each line into a Delivery, and ForEach to update the DB!

And of course you would be right to think that, since that’s the approach I’ve been following throughout the book. The only catch is that the relevant extension methods on IAsyncEnumerable must be imported by referencing the System.Interactive.Async package.

Once you have this reference in place, you can rewrite the program as follows:

Listing 2. Using the extension methods in System.Interactive.Async to work with IAsyncEnumerable

 
 using System.Collections.Generic;
 using System.Linq; // extensions on IAsyncEnumerable are in this namespace
 using System.Threading.Tasks;
  
 public static async Task Main()
    => await ReadDeliveries("warehouse.csv")
       .ForEachAsync(UpdateDb); // performs a side effect for each element in the stream
  
 static IAsyncEnumerable<Delivery> ReadDeliveries(string path)
    => from line in ReadLines(path) // applies a function to each element in the stream
       select Parse(line);

Here we use a LINQ comprehension to transform each asynchronously delivered string into a Delivery, and ForEachAsync to update the DB. Why is it called ForEachAsync, instead of just ForEach? Well, since it completes only when all values in the stream have been processed, it returns a Task, and the convention is to use the -Async suffix for Task-returning operations.
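To demystify these operators, here is a sketch of how a Map (the function behind a single-from LINQ comprehension) and a ForEachAsync over IAsyncEnumerable could be written by hand with await foreach. These are hypothetical local functions for illustration only, not the System.Interactive.Async implementations, which are extension methods. Lines is a made-up sample stream.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical Map: applies f to each element as it arrives,
// producing a new async stream.
static async IAsyncEnumerable<R> Map<T, R>(IAsyncEnumerable<T> source, Func<T, R> f)
{
    await foreach (var item in source)
        yield return f(item);
}

// Hypothetical ForEachAsync: runs a side effect for every element;
// the returned Task completes only once the whole stream has been consumed.
static async Task ForEachAsync<T>(IAsyncEnumerable<T> source, Action<T> action)
{
    await foreach (var item in source)
        action(item);
}

// A small sample stream of CSV-like lines.
static async IAsyncEnumerable<string> Lines()
{
    foreach (var line in new[] { "101,5", "102,3" })
    {
        await Task.Yield();              // simulate asynchronous delivery
        yield return line;
    }
}

var quantities = new List<int>();
await ForEachAsync(
    Map(Lines(), line => int.Parse(line.Split(',')[1])),
    quantities.Add);

Console.WriteLine(string.Join(",", quantities)); // 5,3
```

The sketch makes the naming convention visible: Map returns another stream, while ForEachAsync returns a Task that represents the completion of the entire stream.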

Notice that above I’ve defined UpdateDb to be synchronous. In practice, you would probably like to make this operation asynchronous as well, so it would return a Task, rather than void. You would then need to rewrite the program as follows, using ForEachAwaitAsync rather than ForEachAsync:

 
 public static async Task Main()
    => await ReadDeliveries("warehouse.csv")
       .ForEachAwaitAsync(UpdateDbAsync);
  
 static Task UpdateDbAsync(Delivery r) => // ...
  
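The difference with ForEachAwaitAsync is that the side effect itself returns a Task, which has to be awaited. A hypothetical, hand-written equivalent might look like the sketch below, in which ForEachAwait and SaveAsync are illustrative names and SaveAsync simulates an asynchronous DB update with Task.Delay.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper illustrating the idea behind ForEachAwaitAsync:
// each Task returned by the side effect is awaited before the next
// element is requested, so in this sketch the updates run one at a time.
static async Task ForEachAwait<T>(IAsyncEnumerable<T> source, Func<T, Task> action)
{
    await foreach (var item in source)
        await action(item);
}

static async IAsyncEnumerable<int> Numbers()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Delay(5);
        yield return i;
    }
}

var saved = new List<int>();

// Hypothetical asynchronous "DB update": simulates I/O, then records the value.
async Task SaveAsync(int n)
{
    await Task.Delay(5);
    saved.Add(n);
}

await ForEachAwait(Numbers(), SaveAsync);
Console.WriteLine(string.Join(",", saved)); // 1,2,3
```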

Consuming data from several streams

So far, you’ve seen how to define an async stream, and how to consume its values, using Select for data transformations (whether directly or through a single from-clause LINQ comprehension) and ForEachAsync or ForEachAwaitAsync to perform side effects. Next, we’ll look at using a LINQ comprehension with multiple from-clauses; as you know from the earlier discussion of the LINQ pattern, this resolves to SelectMany, which is essentially Bind.

Imagine your client has not one, but several warehouses. They all upload their respective CSV files into a directory at the end of the day, so your program needs to change to process multiple files.

The change is quite simple. Instead of taking a path to a file, ReadDeliveries can take the directory path, and process all files present in that directory:

 
 static IAsyncEnumerable<Delivery> ReadDeliveries(string dir)
    => from path in Directory.EnumerateFiles(dir).ToAsyncEnumerable() // each file in the directory
       from line in ReadLines(path)                                  // each line in that file
       select Parse(line);
  

That’s it! A one-line change. Directory.EnumerateFiles returns an IEnumerable<string>; ToAsyncEnumerable promotes it to an IAsyncEnumerable<string>, so that it can be combined in a single LINQ comprehension with the streams of lines generated by reading each file. Note that the files are processed sequentially: the resulting stream contains all the deliveries from the first file before moving on to the second file, and so on.
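A LINQ comprehension with two from clauses resolves to SelectMany, that is, Bind. The following hypothetical, hand-written Bind over async streams shows why the files end up being processed one after another: the inner stream for one element is drained completely before the next outer element is requested. Files and LinesOf are stand-ins (assumptions) for the directory listing and for ReadLines.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical Bind (SelectMany) over async streams: for each outer element,
// yield every element of the inner stream it maps to, in order.
static async IAsyncEnumerable<R> Bind<T, R>(
    IAsyncEnumerable<T> source, Func<T, IAsyncEnumerable<R>> f)
{
    await foreach (var outer in source)
        await foreach (var inner in f(outer))
            yield return inner;
}

// Stand-in for Directory.EnumerateFiles(dir).ToAsyncEnumerable().
static async IAsyncEnumerable<string> Files()
{
    foreach (var name in new[] { "a.csv", "b.csv" })
    {
        await Task.Yield();
        yield return name;
    }
}

// Stand-in for ReadLines(path): two lines per file, each after a delay.
static async IAsyncEnumerable<string> LinesOf(string file)
{
    for (int i = 1; i <= 2; i++)
    {
        await Task.Delay(5);
        yield return $"{file}:{i}";
    }
}

var order = new List<string>();
await foreach (var line in Bind(Files(), file => LinesOf(file)))
    order.Add(line);

Console.WriteLine(string.Join(" ", order)); // a.csv:1 a.csv:2 b.csv:1 b.csv:2
```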

Aggregation and sorting with async streams

Async streams are very powerful in that they enable you to start consuming the values in the stream before the stream has ended. In our example, this means that you can start updating the DB while the CSV file is still being read. When both reading and writing are slow, overlapping them in this way can significantly reduce the total processing time.

Now imagine that the warehouse receives several deliveries throughout the day, potentially including several deliveries of the same product. So, the CSV file may include several entries for the same product ID. If this is the case, you want to perform a single DB update for that product.

Your code would then need to change as follows:

 
 public static async Task Main()
    => await ReadDeliveries("warehouse.csv")
       .GroupBy(r => r.ArticleID)                  // one group per article
       .SelectAwait(async grp => new Delivery(grp.Key,
          await grp.SumAsync(r => r.Quantity)))   // total quantity per article
       .ForEachAwaitAsync(UpdateDbAsync);          // one DB update per article
  

The gist here is that you are grouping the elements in the stream by their product ID; this is done with GroupBy, just as you would do with IEnumerable. Within each grouping, you then take the sum of all quantities to create a single Delivery for each product. But notice that you cannot use Sum like you would on IEnumerable; instead, you have to use SumAsync, which returns an awaitable ValueTask<int> rather than an int (because you have to wait until you receive all items before you can compute their sum). Since computing each Delivery now involves awaiting, you also use SelectAwait, which accepts an asynchronous function, rather than Select.

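As a result, although the code is correct, you’ll notice that we’ve effectively lost the benefit of consuming values as they arrive. GroupBy cannot yield any group until it has seen every element, because a later line may belong to any group; likewise, SumAsync (or any other aggregate operation) must wait until all elements are received. Therefore, in this case IAsyncEnumerable ends up being no better than Task<IEnumerable<T>>. The same is true if you want the values to be sorted, since the first value in sorted order is only known once the whole stream has been read.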
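To see this effect, here is a hypothetical, hand-written group-and-sum over a stream of (article ID, quantity) tuples; it is an illustration, not how System.Interactive.Async implements GroupBy. The log shows that no total is emitted until the last delivery has been read. Deliveries and SumByArticle are made-up names, and the sample data is invented.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source: three deliveries (article ID, quantity), each read after a delay.
static async IAsyncEnumerable<(long ArticleID, int Quantity)> Deliveries(List<string> log)
{
    var rows = new (long ArticleID, int Quantity)[] { (101, 5), (102, 3), (101, 2) };
    foreach (var row in rows)
    {
        await Task.Delay(5);
        log.Add($"read {row.ArticleID}");
        yield return row;
    }
}

// Hypothetical group-and-sum: it cannot emit a total for any article until the
// source is exhausted, because a later delivery may add to any article's total.
static async IAsyncEnumerable<(long ArticleID, int Quantity)> SumByArticle(
    IAsyncEnumerable<(long ArticleID, int Quantity)> source)
{
    var totals = new Dictionary<long, int>();
    var firstSeen = new List<long>();              // keeps output in first-seen order
    await foreach (var d in source)
    {
        if (!totals.TryGetValue(d.ArticleID, out int current))
            firstSeen.Add(d.ArticleID);
        totals[d.ArticleID] = current + d.Quantity;
    }
    foreach (var id in firstSeen)
        yield return (id, totals[id]);
}

var events = new List<string>();
await foreach (var total in SumByArticle(Deliveries(events)))
    events.Add($"total {total.ArticleID}={total.Quantity}");

Console.WriteLine(string.Join(", ", events));
// read 101, read 102, read 101, total 101=7, total 102=3
```

Every "read" entry precedes every "total" entry: the consumer receives nothing until the whole source has been read, which is exactly what a Task<IEnumerable<T>> would have given you.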
