From High-Performance Python for Data Analytics by Tiago Rodrigues Antao

In this article we will start to explore Python’s framework for concurrency – the first step in developing parallel applications.


Take 40% off High-Performance Python for Data Analytics by entering fccantao into the discount code box at checkout at manning.com.


Modern CPU architectures allow more than one sequential program to be executed at the same time, permitting speed-ups of up to the number of parallel processing units available (e.g. CPU cores). However, most Python code is sequential, so it is not able to use all available CPU resources. This means that most Python code doesn’t take advantage of modern hardware’s capabilities and tends to run at a much lower speed than the hardware allows. We therefore need to devise techniques to make use of all the available CPU power.

Here we will be developing a MapReduce framework based on Python threads. The data will be in-memory, and the framework will run on a single computer. The service will have to be able to handle requests from several clients at the same time.

Revisiting sequential, concurrent and parallel computing

Before we start, let’s briefly review the meaning of sequential processing, concurrency and parallelism. Although these are basic concepts, many experienced developers still get them confused, so here’s a quick refresher to make sure we’re all using the terms in the same way.

Parallelism is the easiest concept to explain: tasks are said to run in parallel when they are running at the same time. Concurrent tasks may run in any order: they may run in parallel or in sequence, depending on the language and OS. So all parallel tasks are concurrent, but not the other way around.

The term sequential can be used in two different ways. First, it can mean that a certain set of tasks needs to be run in a strict order. For example, to type on your computer, you first have to turn it on: the ordering – or sequence – is imposed by the tasks themselves. The second task can only happen after the execution of the first one.

Sometimes, however, sequential is used to mean a limitation that the system imposes on the order of the execution of tasks. For example, when going through a metal detector in an airport, only one person is allowed through at a time, even if two would be able to fit through it simultaneously.

Finally, there is the concept of preemption: this happens when a task is interrupted (involuntarily) so that another one can run. The other possibility is for a task to voluntarily release control so that other code can run.

Figure 1 tries to make some of these concepts clearer.


Figure 1. Understanding sequential, concurrent and parallel models. Sequential execution occurs when all tasks are executed in sequence and never interrupted. Concurrent execution with no parallelism adds the possibility of a task being interrupted by another and later resumed. Parallelism occurs when several tasks run at the same time; in the most common case, preemption still occurs, as the number of processors/cores is not enough for all the tasks. The dream scenario is when there are more processors than tasks: this allows parallel execution of all tasks without the need for any preemption.


Understanding MapReduce frameworks

Let’s start by deconstructing a MapReduce framework to see what components go into it. From a theoretical perspective, MapReduce computations are separated into at least two halves: a map and a reduce part. Let’s see this in action with a typical example of a MapReduce application: word counting. In this case, we’ll use two lines from Shakespeare’s “The Tempest”: “I am a fool. To weep at what I am glad of.” You can see this input being processed in figure 2. Other than map and reduce, in practice other components need to exist; for example, the results from a map need to be shuffled before being sent to the reduce processes: if the two instances of the word “am” were sent to distinct reduce processes, the count would not be correct.

Figure 2. The basics of a MapReduce framework, using word counting as an example. Traditional MapReduce frameworks have several processes or threads implementing the map and reduce steps. In many cases, these can be distributed across several computers.

Word counting could be implemented with a map function that emits an entry for every word found, with a count of 1, and a reduce function that sums all the map entries for the same word. So map would emit:

 
 I, 1
 am, 1
 a, 1
 fool, 1
 To, 1
 weep, 1
 at, 1
 what, 1
 I, 1
 am, 1
 glad, 1
 of, 1
  

And reduce would then generate:

 
 I, 2
 a, 1
 fool, 1
 To, 1
 weep, 1
 at, 1
 what, 1
 am, 2
 glad, 1
 of, 1
  

Somewhere in the middle we need to shuffle the results so that each unique word is seen by only a single reduce function. For example, if “am” were seen by two different reduce functions, we would end up with two counts of 1, when we want to see one count of 2. In our server, the shuffle function is built in – the user doesn’t need to provide it.

Developing a very simple test scenario

Remember that we are implementing a MapReduce framework ourselves. While we won’t be its users, we will still need to test it. To do that, we return to the most common exercise with MapReduce: counting words in a text. Our framework will later be used with many other problems – but for basic testing of the framework, counting words will suffice.

The user code to implement this is as simple as the following. Remember, this is not what we were commissioned to do, just the example that we will use for testing:

 
 emitter = lambda word: (word, 1)
 counter = lambda item: (item[0], sum(item[1]))
  

NOTE  We will be using functional notation on purpose, as MapReduce has functional origins. If you follow PEP 8, your syntax checker will complain, as PEP 8 says “Always use a def statement instead of an assignment statement that binds a lambda expression directly to an identifier” – the way this is reported will depend on your linter. It is up to you whether you prefer this notation or the PEP 8 one – which would be of the form def emitter(word):….

We will be using this code to test our framework.
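For reference, a PEP 8-style equivalent of the two one-liners might look like the sketch below. Note that counter receives a single (word, emissions) pair, as produced by the shuffle step we describe later:

```python
# PEP 8-style equivalents of the two lambdas (a sketch; `counter` takes a
# (word, emissions) pair because shuffled items arrive as 2-tuples)
def emitter(word):
    return word, 1

def counter(item):
    word, emissions = item
    return word, sum(emissions)

print(emitter('am'))            # ('am', 1)
print(counter(('am', [1, 1])))  # ('am', 2)
```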

Implementing a too-simple MapReduce framework

Remember, the code above is what your user will write. We will now implement the MapReduce engine – our real goal – which will count words and do much more. We will start with something that works, but not much more – hence the too-simple moniker. Here is the first version, available in the repo at 03-concurrency/sec2-naive/naive_server.py:

 
 from collections import defaultdict
  
 def map_reduce_ultra_naive(my_input, mapper, reducer):
     map_results = map(mapper, my_input)
  
     shuffler = defaultdict(list)
     for key, value in map_results:
         shuffler[key].append(value)
  
     return map(reducer, shuffler.items())
  

You can now use this with:

 
 words = 'Python is great Python rocks'.split(' ')
 list(map_reduce_ultra_naive(words, emitter, counter))
  

list forces the lazy map call to actually execute, and so you will get the output:

 
 [('Python', 2), ('is', 1), ('great', 1), ('rocks', 1)]
  

While the implementation above is quite clean from a conceptual point of view, from an operational perspective it fails to meet the most important operational expectation for a MapReduce framework: that its functions run in parallel. In the next sections we will make sure we create an efficient parallel implementation in Python.

Implementing a threaded version of a MapReduce engine

Let’s try a second time and build a concurrent framework using multi-threading. We will use the threaded executor from the concurrent.futures module to manage our MapReduce jobs. We are doing this in service of having a solution that is not only concurrent but also parallel, which would allow us to use all the compute power available. At least, that is what we hope.

Using concurrent.futures to implement a threaded server

We start with concurrent.futures because it is more declarative and higher-level than the more commonly used threading and multiprocessing modules, which are the foundational modules in Python for concurrent and parallel processing.

Here is the new version available in 03-concurrency/sec3-thread/threaded_mapreduce_sync.py:

 
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor as Executor  
  
 def map_reduce_still_naive(my_input, mapper, reducer):
     with Executor() as executor:                  
         map_results = executor.map(mapper, my_input)   
  
         distributor = defaultdict(list)              
         for key, value in map_results:
             distributor[key].append(value)
  
         results = executor.map(reducer, distributor.items()) 
     return results
  

We use the threaded executor from the concurrent.futures module

The executor can work as a context manager

Executors have a map function with blocking behavior

We use a very simple shuffler function

Our function again takes some input along with mapper and reducer functions. The executor from concurrent.futures is responsible for thread management, though we can specify the number of threads we want. If we don’t, the default is derived from os.cpu_count() – the exact number of threads varies across Python versions. This is summarized in figure 3.


Figure 3. Threaded execution of our MapReduce framework


Remember that we need to make sure that the results for the same object – words, in our example – are sent to the correct reduce function. In our case, we implement a very simple version: the distributor default dictionary creates an entry per word.

The code above can have a fairly big memory footprint, especially because the shuffler will hold all results in memory – though in a compact fashion. But for the sake of simplicity we will leave it as it is.

Exactly how the number of workers is managed is more or less a black box with concurrent.futures. But if we want to make sure we are extracting the maximum performance, we need to be in full control of how execution is done – because concurrent.futures is a black box, we do not know what it has been optimized for. If you want to fine-tune worker management, you will need to use the threading module[1] directly – we will dig deeper into this in the book.
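As a taste of what direct control looks like, here is a minimal sketch of manual worker management with the threading module. The queue-based design and the names here are illustrative, not the book’s implementation:

```python
# a minimal sketch of manual worker management with the threading module;
# each worker pulls words from a job queue and emits (word, 1) pairs
import queue
import threading

def worker(jobs, results):
    while True:
        word = jobs.get()
        if word is None:        # sentinel value: no more work for this thread
            break
        results.put((word, 1))  # the "map" step: emit (word, 1)

jobs, results = queue.Queue(), queue.Queue()
threads = [threading.Thread(target=worker, args=(jobs, results)) for _ in range(4)]
for thread in threads:
    thread.start()
for word in 'Python is great Python rocks'.split(' '):
    jobs.put(word)
for _ in threads:
    jobs.put(None)              # one sentinel per worker thread
for thread in threads:
    thread.join()
print(results.qsize())          # 5 pairs emitted
```

With this design we decide the number of workers, how work is distributed, and when the threads shut down – exactly the knobs that concurrent.futures hides.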

You can try this solution with:

 
 words = 'Python is great Python rocks'.split(' ')
 print(list(map_reduce_still_naive(words, emitter, counter)))
  

And the output will be the same as in the previous section.

The solution above has a problem: it doesn’t allow any kind of interaction with the ongoing program. That is, when you call executor.map, you will have to wait until the complete solution is computed. This is irrelevant for an example with 5 words, but with very large texts you might want some feedback. For example, you might want to report the percentage of progress while the code runs. This requires a somewhat different solution.

Asynchronous execution with Futures

In the first instance let’s just code the map part in order to understand what is going on – see 03-concurrency/sec3-thread/threaded_mapreduce.py:

 
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor as Executor
  
 def async_map(executor, mapper, data):
     futures = []
     for datum in data:
         futures.append(executor.submit(mapper, datum))   
     return futures
  
 def map_less_naive(executor, my_input, mapper):
     map_results = async_map(executor, mapper, my_input)
     return map_results
  

We use submit instead of map when calling the executor

While the map function of the executor waits for results, submit doesn’t. We will see what that means when we run this shortly.

We are going to change our emitter in order to be able to track what is going on:

 
 from time import sleep
  
 def emitter(word):
     sleep(10)
     return word, 1
  

The sleep call is there to slow the code down allowing us to track what is going on even with a simple example.

Let’s use our map function as it is:

 
 with Executor(max_workers=4) as executor:
     maps = map_less_naive(executor, words, emitter)
     print(maps[-1])
  

If you print the last item from the list, it might be something unexpected:

 
 <Future at 0x7fca334e0e50 state=pending>
  

You do not get ('rocks', 1); instead you get a Future. A future represents a potential result: you can check whether it is done and retrieve its value when it is ready.
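A minimal look at a Future’s lifecycle (a sketch – slow_add is just an illustrative stand-in for a slow task):

```python
# submit returns immediately with a Future; result() blocks until completion
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_add(a, b):
    sleep(0.5)          # simulate a slow task
    return a + b

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_add, 40, 2)
    print(future.done())    # almost certainly False: the task is still running
    print(future.result())  # blocks until the task completes, then prints 42
    print(future.done())    # True: the result has been computed
```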

We can now allow the user to track progress like this:

 
 with Executor(max_workers=4) as executor:    
     maps = map_less_naive(executor, words, emitter)
     not_done = 1
     while not_done > 0:                             
         not_done = 0
         for fut in maps:
             not_done += 1 if not fut.done() else 0    
         sleep(1)                                 
         print(f'Still not finalized: {not_done}')
  

We use only 4 workers to let us track progress, as we have 5 tasks

We print status while there are still tasks to be done

Check if the future is done

Sleep for a bit as we do not want a barrage of text

If you run the code above, you will get a few lines with ‘Still not finalized…​’. Typically, for the first 10 seconds you will see 5, then just 1. As there are 4 workers, it takes 10 seconds to do the first 4 tasks, and only then can the final one start. Given that this is concurrent code, the exact output can change a bit from run to run – the way threads are preempted can vary every time you run this code: it is non-deterministic.
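The standard library also offers as_completed, which yields each future as soon as it finishes – an alternative to polling done() in a loop. A sketch, reusing our word-count emitter:

```python
# as_completed yields futures in completion order, enabling incremental reporting
from concurrent.futures import ThreadPoolExecutor, as_completed

def emitter(word):
    return word, 1

words = 'Python is great Python rocks'.split(' ')
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(emitter, word) for word in words]
    for num_done, future in enumerate(as_completed(futures), start=1):
        print(f'Done {num_done}/{len(futures)}: {future.result()}')
```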

There is one final piece of the puzzle, which will be in the last version of the threaded executor: we need a way for the caller to be informed of progress. The caller will have to pass a callback function, which will be called when an important event occurs. In our case, that important event is the completion of map and reduce jobs. This is implemented in the code below:

 
 def report_progress(futures, tag, callback):     
     done = 0
     num_jobs = len(futures)
     while num_jobs > done:
         done = 0
         for fut in futures:
             if fut.done():
                 done += 1
         sleep(0.5)
         if callback:
             callback(tag, done, num_jobs - done)
  
 def map_reduce_less_naive(my_input, mapper, reducer, callback=None):
     with Executor(max_workers=2) as executor:
         futures = async_map(executor, mapper, my_input)
         report_progress(futures, 'map', callback)          
         map_results = map(lambda f: f.result(), futures)    
         distributor = defaultdict(list)
         for key, value in map_results:
             distributor[key].append(value)
  
         futures = async_map(executor, reducer, distributor.items())
         report_progress(futures, 'reduce', callback)   
         results = map(lambda f: f.result(), futures)  
     return results
  

report_progress requires a callback function that will be called every half second with statistical information about jobs done.

We report the progress for all map tasks.

We get the actual map results by calling result on each future.

We report the progress for all reduce tasks.

So, every 0.5 seconds while the map and reduce steps are running, the user-supplied callback function will be executed. A callback can be as simple or as complicated as you want, though it should be fast, as everything else will be waiting for it. For the word-count example that we use for testing, we have a very simple one:

 
 def reporter(tag, done, not_done):
     print(f'Operation {tag}: {done}/{done+not_done}')
  

Note that the callback function signature is not arbitrary: it has to follow the protocol imposed by report_progress, which passes as arguments the tag and the number of done and not-yet-done tasks.

If you run this:

 
 words = 'Python is great Python rocks'.split(' ')
 results = map_reduce_less_naive(words, emitter, counter, reporter)
  

You will see a few lines printing the ongoing status of the operation. It would not be too difficult, for example, to use the callback’s return value as an indicator to the MapReduce framework to cancel the execution. This would change the semantics of the callback function from pure reporting to also interrupting the process.
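As a hypothetical extension – this is not the book’s code – such cancellation could build on Future.cancel, which succeeds only for tasks that have not yet started running. The cancel_pending helper below is an illustrative name, not part of the framework:

```python
# a sketch of cancelling queued work: Future.cancel() succeeds only for
# tasks that have not started running yet
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def cancel_pending(futures):
    # count how many futures were successfully cancelled
    return sum(1 for fut in futures if fut.cancel())

with ThreadPoolExecutor(max_workers=1) as executor:
    blocker = executor.submit(sleep, 1)                   # keeps the single worker busy
    pending = [executor.submit(print, i) for i in range(3)]
    print(cancel_pending(pending))  # 3: none of the pending tasks had started
```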

The GIL and multi-threading

Unfortunately, this solution is concurrent but not parallel. This is because Python – or rather, CPython – only executes one thread at a time, courtesy of the infamous CPython GIL, the Global Interpreter Lock [2].

Let’s take a closer look at how the GIL deals with threads. CPython makes use of OS threads – so they are preemptive threads – but the GIL imposes that only one thread can run at a time. So you might have a multi-threaded program running on a multi-core computer, yet end up with no parallelism at all. It’s actually a bit worse than that: the performance of thread swapping can be quite bad on multi-core computers, due to the friction between the GIL, which doesn’t allow more than one thread to run at a time, and the CPU and OS, which are optimized to do the opposite.
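You can see the GIL’s effect for yourself with a quick, CPython-specific experiment. Timings are indicative and machine-dependent, so no exact numbers are claimed:

```python
# CPU-bound work typically does not speed up with threads under the GIL
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter

def count_down(n):
    while n > 0:
        n -= 1

N, TASKS = 2_000_000, 4

start = perf_counter()
for _ in range(TASKS):
    count_down(N)
sequential = perf_counter() - start

start = perf_counter()
with ThreadPoolExecutor(max_workers=TASKS) as executor:
    list(executor.map(count_down, [N] * TASKS))
threaded = perf_counter() - start

# on CPython the two timings are usually similar, threads often slightly worse
print(f'sequential: {sequential:.2f}s, threaded: {threaded:.2f}s')
```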

GIL problems are overrated. The fact is that if you need high-performance code at the thread level, Python is probably too slow anyway – at least in the CPython implementation, but probably also because of Python’s dynamic features. You will want to implement any extremely efficient code in a lower-level language like C or Rust, or using a system like Cython or Numba – which are discussed later in the book.

And the GIL provides a few escape routes for lower-level code implemented in other languages: when you enter your lower-level solution, you can actually release the GIL and use parallelism to your heart’s content. This is what libraries like NumPy, SciPy and scikit-learn do. So your code can still be parallel: it’s just that the parallel part will not be written in Python.

But you can still write parallel code in pure-Python, and do that at a level of computing granularity that makes sense in Python. Not with multi-threading but with multi-processing.

So, due to the GIL, our multi-threaded code is not actually parallel. In the book, we explore three directions to solve this: re-implementing our Python code in a lower-level language like Cython, C or Rust; using Numba to rewrite our code; or turning to multiprocessing to get parallelism and make use of all the CPU power available from Python.
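As a first taste of the multiprocessing direction, here is a minimal sketch – not the book’s final design – where the thread pool is simply swapped for a process pool. Worker functions must be defined at module top level, since processes receive them by pickling (lambdas will not work), and the entry point should be guarded on platforms that spawn processes:

```python
# swap ThreadPoolExecutor for ProcessPoolExecutor to get true parallelism;
# the mapped function must be picklable, hence a top-level def
from concurrent.futures import ProcessPoolExecutor

def emitter(word):
    return word, 1

if __name__ == '__main__':
    words = 'Python is great Python rocks'.split(' ')
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(emitter, words)))
        # [('Python', 1), ('is', 1), ('great', 1), ('Python', 1), ('rocks', 1)]
```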

That’s all for this article. If you want to learn more about the book, you can check it out on our browser-based liveBook platform here.

 


[1] Another alternative is to implement a concurrent.futures executor yourself, but in that case you would need an understanding of the underlying modules like threading or multiprocessing anyway.

[2] Other Python implementations like Jython, IronPython or PyPy do not have this limitation.