From Clojure: The Essential Reference by Renzo Borgatti

In this article we will explore some concrete examples of the many uses and intricacies of the fold function from the Clojure standard library.


Save 37% off Clojure: The Essential Reference. Just enter code fccborgatti into the promotional discount code box at checkout at manning.com.


The fold Function

function (since 1.5)

Listing 1 Parallel processing, Reduce-Combine, Fork-Join

 (fold
   ([reducef coll])
   ([combinef reducef coll] )
   ([n combinef reducef coll]))

In its simplest form, fold takes a reducing function (a function supporting at least two arguments) and a collection. If the input collection type supports parallel folding (currently vectors, maps, and foldcat objects), it splits the input collection into chunks of roughly the same size and executes the reducing function on each partition in parallel (and on multiple CPU cores when possible). It then combines the results back into the final output:

  
 (require '[clojure.core.reducers :as r])  
 (r/fold + (into [] (range 1000000)))      
 ;; 499999500000
  

❶   Reducers are bundled with Clojure, but they need to be required before use.

❷  fold splits the one-million-element vector into chunks of roughly 512 elements each (the default). The chunks are then sent to the fork-join thread pool for parallel execution, where they're reduced by + and their results combined back together, again with +.
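
For illustration, here's a minimal sketch of the four-argument arity, which makes the chunk size and the combining function explicit (the chunk size of 8192 is arbitrary):

  
 (require '[clojure.core.reducers :as r])
 ;; the same sum, with an explicit chunk size and an explicit
 ;; combining function (+ happens to play both roles here)
 (r/fold 8192 + + (into [] (range 1000000)))
 ;; 499999500000
  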

fold offers parallelism based on “divide and conquer”: chunks of work are created and computation happens in parallel while, at the same time, finished tasks are combined back into the final result. The following diagram illustrates the journey of a collection going through a fold operation:


Figure 1. How the fork-join model applies reduce-combine in parallel.


An important mechanism that fold implements (the diagram can't show this clearly without becoming confusing) is work-stealing. After fold sends a chunk to the Java fork-join framework, each worker can further split its work into smaller pieces, generating a mix of smaller and larger chunks. When free, a worker can "steal" work from another. The fork-join model for parallel computation is a complex subject that can't be fully illustrated in this article; if you want to know more, please read the following paper by Doug Lea, the author of fork-join in Java: https://gee.cs.oswego.edu/dl/papers/fj.pdf. Work-stealing improves on basic thread pooling, for example when a less predictable job keeps one thread unexpectedly busy.

Contract

INPUT

The contract differs depending on the presence of the optional “combinef” and on whether the input collection is a map.

  • “reducef” is a mandatory argument. It must be a function supporting at least two arguments (and a zero-argument call when “combinef” isn't provided). The two-argument call implements the canonical reduce contract, receiving an accumulator and the current element. The zero-argument call is used to establish the seed for the result, similarly to the “init” argument in reduce. When no “combinef” is provided, the 0-arity is invoked once for each chunk to establish the seed for that chunk's reduction. “reducef” is also used in place of “combinef” when the combination function isn't provided; in this case “reducef” must be associative, as the chunks can be re-combined in any order.
  • “combinef” is optional and, when present, it must support a zero-argument and a two-argument call. “combinef” needs to be associative to allow chunks to be combined in any order. The two-argument call is used to concatenate chunks back into the final result. When “combinef” is present, the zero-arity of “reducef” is never called; the zero-arity of “combinef” is called instead.
  • “n” is the approximate size of the chunks the input collection “coll” is split into. The default is 512.
  • “coll” can be of any sequential type, empty or nil. If “coll” isn't a vector, hash-map, or clojure.core.reducers.Cat object (see r/foldcat to know more), fold falls back on a sequential reduce instead of going parallel. When “coll” is a hash-map, “reducef” is invoked with three arguments (accumulator, key, and value) instead of two, as per the reduce-kv contract (see the sketch after this list).
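
The following sketch illustrates the arities described above; the functions and values are arbitrary and only meant to show the shape of the contract:

  
 (require '[clojure.core.reducers :as r])
  
 ;; "combinef" must support a 0-arity (the per-chunk seed) and a
 ;; 2-arity (merging two partial results), and must be associative
 (def sum-combinef
   (fn ([] 0) ([a b] (+ a b))))
  
 (r/fold sum-combinef + (into [] (range 10)))
 ;; 45
  
 ;; folding a hash-map: "reducef" receives three arguments (accumulator,
 ;; key, value), while "combinef" still merges two partial result maps
 (r/fold (r/monoid merge (constantly {}))
         (fn [m k v] (assoc m k (inc v)))
         {:a 1 :b 2})
 ;; {:a 2, :b 3}
  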

NOTABLE EXCEPTIONS

  • IllegalArgumentException is raised for the few unsupported collection types. This can happen, for example, when “coll” is a transient or a common Java collection like java.util.HashMap. There are good reasons to exclude thread-unsafe mutable collections that would otherwise be exposed to concurrent access. Other thread-safe Java collections (like java.util.concurrent.ConcurrentHashMap) can be made “foldable”, as we're going to explore in the extended example (a short sketch of the failure follows below).
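
A minimal sketch of that failure mode (the exact exception message can vary between Clojure versions):

  
 (require '[clojure.core.reducers :as r])
  
 ;; java.util.HashMap has no kv-reduce implementation, so the
 ;; sequential fallback fails with an IllegalArgumentException
 (r/fold + (java.util.HashMap. {"a" 1 "b" 2}))
 ;; IllegalArgumentException No implementation of method: :kv-reduce ...
  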

OUTPUT

  • Returns the result of invoking (reducef) or (combinef) with no arguments when “coll” is nil or empty (see the short sketch after this list).
  • Otherwise, returns the result of applying “reducef” to the seed and the first item, then to that result and the next item, and so on up to the last item in the collection. If “combinef” is present, the partial accumulations of the chunks are merged back together using “combinef”. The final result of applying “reducef” (or “combinef”) is returned.
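
A quick sketch of these behaviors, relying on the fact that (+) with no arguments returns 0:

  
 (require '[clojure.core.reducers :as r])
  
 (r/fold + nil)        ;; 0, the result of calling (+) with no arguments
 (r/fold + [])         ;; 0
 (r/fold + [1 2 3 4])  ;; 10
  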

Examples

fold enables parallelism on top of the reduce-combine model. Many types of computation benefit from (or can be adapted to) fold-like operations, and reduce-based data pipelines are good candidates. In an earlier example we used a sequential count-occurrences function to count the frequency of words in a large text. We can rewrite that example to use fold like this:

  
 (require '[clojure.core.reducers :as r])
  
 (defn count-occurrences [coll]
   (r/fold
     (r/monoid #(merge-with + %1 %2) (constantly {}))  
     (fn [m [k cnt]] (assoc m k (+ cnt (get m k 0))))  
     (r/map #(vector % 1) (into [] coll))))            
  
 (defn word-count [s]
   (count-occurrences (.split #"\s+" s)))
  
 (def war-and-peace "https://www.gutenberg.org/files/2600/2600-0.txt")
 (def book (slurp war-and-peace))
  
 (def freqs (word-count book))
 (freqs "Andrew")
 ;; 700
  

❶  r/monoid is a helper function to create a function suitable for “combinef”. The first argument to r/monoid is the merge function to use when two pieces are combined together. We want to sum the counts for the same word, something we can do with “merge-with”.

❷  “reducef” needs to assoc every word into the results map “m”. Two cases are possible: the word already exists and its count gets incremented, or the word doesn't exist and zero is used as the initial count.

❸  “coll” needs to be a vector to enable parallel folding, which we ensure by pouring the input into a vector with into. Each element is then transformed into a tuple (a vector of two items) containing the word and the number one. We use r/map from the reducers library for this, so the transformation is deferred and executed as part of the parallel fold.

fold also works natively on maps. We can use the freqs map produced above as the input for another fold operation, for example to look at the relationship between the first letter of a word and its frequency in the book.

The following example groups words by their initial letter and then calculates their average frequency. This operation is a good candidate for parallel fold, because the input contains thousands of keys (one for each word found in the input text):

  
 (defn group-by-initial [freqs]                           
   (r/fold
     (r/monoid #(merge-with into %1 %2) (constantly {}))  
     (fn [m k v]                                          
       (let [c (Character/toLowerCase (first k))]
         (assoc m c (conj (get m c []) v))))
     freqs))
  
 (defn update-vals [m f]                                  
   (reduce-kv (fn [m k v] (assoc m k (f v))) {} m))
  
 (defn avg-by-initial [by-initial]                        
   (update-vals by-initial #(/ (reduce + 0. %) (count %))))
  
 (defn most-frequent-by-initial [freqs]                   
   (->> freqs
     group-by-initial
     avg-by-initial
     (sort-by second >)
     (take 5)))
  
 (most-frequent-by-initial freqs)                         
  
 ;; ([\t 41.06891634980989]
 ;;  [\o 33.68537074148296]
 ;;  [\h 28.92705882352941]
 ;;  [\w 26.61111111111111]
 ;;  [\a 26.54355400696864])
  

❶  group-by-initial uses fold on a hash-map from strings (words) to numbers (frequencies). The output is a much smaller map from letters to vectors. The number of keys in this map is equal to the number of letters in the alphabet (assuming the text is large enough and numbers and symbols are filtered out). The letter “a” in this map contains something like [700 389 23 33 44], the occurrences of each word in the book starting with the letter “a”.

❷  The combining function is assembled using r/monoid. The initial value for each reducing operation is the empty map {}. Partial results are combined by key, merging their vector values into a single vector with into.

❸  The reducing function takes three parameters: a map of partial results “m”, the current key “k”, and the current value “v”. Similarly to the word-frequency count, we fetch the potentially existing vector for a key (using an empty vector as the default value) and conj the value “v” onto it. The key is the initial letter of each word found in the input map.

❹  update-vals takes a map and a function “f” of one parameter. It then applies “f” to every value in the map using reduce-kv.

❺  avg-by-initial replaces each vector value in the map with the average of the numbers it contains.

❻  most-frequent-by-initial orchestrates the functions seen so far to extract the five initials with the highest average word frequency.

❼  freqs is the result of the word count from earlier in the example.

After running most-frequent-by-initial we can see that the letter “t” is, on average, the most used at the beginning of a word, closely followed by “o”, “h”, “w”, and “a”. This indicates that words starting with the letter “t” are on average the most repeated throughout the book (although some word not starting with “t” might be, in absolute terms, the most frequent).

 

Creating your own fold

fold is a protocol-based, extensible mechanism. Most Clojure collections provide a basic sequential folding mechanism based on reduce, with the exception of vectors, maps, and foldcat objects, which are equipped with a parallel reduce-and-combine algorithm. Classes like java.util.HashMap don't have a proper fold, for good reasons connected to the danger of exposing a mutable data structure to potentially parallel threads. Other thread-safe classes like java.util.concurrent.ConcurrentHashMap can be extended to be foldable, which is the subject of the following section. What we're going to do for java.util.concurrent.ConcurrentHashMap could easily be extended to completely custom collections (provided they're equipped for concurrent access).

To drive our example, let's use a large ConcurrentHashMap of integers (keys) to integers (values) and some expensive function to apply to all the keys. fold parallelism would be overkill for a trivial transformation of the values like inc or str, so we use the Leibniz formula to approximate π instead. We'd like to execute the transformation on each key in parallel.

The design of the parallel execution is as follows: we don't split the map, we split its keys into chunks. The values corresponding to each key partition are transformed in parallel by separate threads. No clashing normally happens, but fork-join is a work-stealing algorithm, and a partition can be routed to a thread where another partition was already assigned, generating an overlap. This is the reason we need a java.util.concurrent.ConcurrentHashMap instead of a plain java.util.HashMap.

 

  
 (import 'java.util.concurrent.ConcurrentHashMap)
 (require '[clojure.core.reducers :as r])
  
 (defn pi
   "Pi Leibniz formula approx."
   [n]
   (->> (range)
        (filter odd?)
        (take n)
        (map / (cycle [1 -1]))
        (reduce +)
        (* 4.0)))
  
 (defn large-map [i j]                
   (into {}
     (map vector (range i) (repeat j))))
  
 (defn combinef [init]                
   (fn
     ([] init)
     ([m _] m)))
  
 (defn reducef [^java.util.Map m k]   
   (doto m
     (.put k (pi (.get m k)))))
  
 (def a-large-map (ConcurrentHashMap. (large-map 100000 100)))
  
 (dorun                               
   (r/fold
     (combinef a-large-map)
     reducef
     a-large-map))
 ;; IllegalArgumentException No implementation of method: :kv-reduce
  

❶  pi calculates an approximation of the value of π. The greater the number “n”, the better the approximation. Even relatively small numbers, in the order of the hundreds, generate an expensive computation.

❷  large-map serves the purpose of creating the large map that seeds the ConcurrentHashMap used in our example. The map keys are increasing integers, while the values are always the same.

❸  combinef with no arguments returns the base map, the one all threads should update concurrently. Concatenation isn't needed, as the updates happen on the same mutable ConcurrentHashMap instance. combinef with two arguments returns one of the two (they're the same object). combinef could be effectively replaced by (constantly m).

❹  reducef replaces the value of an existing key with the calculated pi. Note the use of doto, so that the expression returns the map itself rather than the return value of .put.

❺  fold is unsuccessful: it searches for a suitable implementation of reduce-kv, which isn't found.

We're facing the first problem: fold fails because two polymorphic dispatches are missing. fold doesn't have a specific parallel version for java.util.concurrent.ConcurrentHashMap, so it routes the call to reduce-kv. reduce-kv also fails, because there's an implementation for Clojure hash-maps but not for Java's ConcurrentHashMap. As a first step, we can provide a reduce-kv version that removes the error, but this solution isn't enough to run the transformations in parallel:

  
 (extend-protocol                   
   clojure.core.protocols/IKVReduce
   java.util.concurrent.ConcurrentHashMap
   (kv-reduce [m f _]
     (reduce (fn [amap [k v]] (f amap k)) m m)))
  
 (time                              
   (dorun
     (r/fold
       (combinef a-large-map)
       reducef
       a-large-map)))
 ;; "Elapsed time: 41113.49182 msecs"
  
 (.get a-large-map 8190)            
 ;; 3.131592903558553
  

❶  We can extend a protocol to a new type by using extend-protocol. Our kv-reduce implementation doesn't need the value, because we're mutating the Java ConcurrentHashMap in place.

❷  fold now runs correctly. We need dorun to prevent the map from being printed on screen. We also print a reasonably good estimate of the elapsed time, which is above forty seconds.

❸  To be sure that a-large-map has effectively been updated, we check a randomly chosen key, 8190. It contains an approximation of π, as expected.

Although we provided a suitable reduce-kv implementation, java.util.concurrent.ConcurrentHashMap doesn't have a proper parallel fold yet. Similarly to reduce-kv, we need to provide a fold implementation by extending the correct protocol. The idea is to split the key set instead of the map, so that each thread operates in parallel on its own subset of keys:

  
 (defn foldmap [m n combinef reducef]       
   (#'r/foldvec
     (into [] (keys m))
     n
     combinef
     reducef))
  
 (extend-protocol r/CollFold                
   java.util.concurrent.ConcurrentHashMap
   (coll-fold
     [m n combinef reducef]
     (foldmap m n combinef reducef)))
  
 (def a-large-map (ConcurrentHashMap. (large-map 100000 100)))
  
 (time                                      
   (dorun
     (into {}
       (r/fold
         (combinef a-large-map)
         reducef
         a-large-map))))
 "Elapsed time: 430.96208 msecs"
  

❶  foldmap implements the parallel strategy for java.util.concurrent.ConcurrentHashMap. It delegates to the private foldvec function in the reducers namespace, passing in a vector of the map's keys, effectively reusing the parallelism already available for vectors.

❷  We extend the CollFold protocol to use foldmap when fold is presented with a java.util.concurrent.ConcurrentHashMap instance.

❸  After recreating the large map (remember that it's mutated by each execution), we try fold again, obtaining the expected performance boost (from over forty seconds for the sequential case down to 430 milliseconds). We also take care of transforming the ConcurrentHashMap returned by fold back into a persistent data structure for later use.

After extending the CollFold protocol from the clojure.core.reducers namespace, we can see that fold effectively runs the update of the map in parallel, cutting the execution time substantially. As a comparison, this is the same operation performed on a persistent hash-map, which is parallel-enabled by default:

 (def a-large-map (large-map 100000 100))
  
 (time
   (dorun
     (r/fold
       (r/monoid merge (constantly {}))
       (fn [m k v] (assoc m k (pi v)))
       a-large-map)))
 ;; "Elapsed time: 17977.183154 msecs"  
  

❶  We can see that even though Clojure's hash-map is parallel-enabled, the fact that it's a persistent data structure works against fast concurrent updates. This isn't a weakness of Clojure's data structures, as they're designed with a completely different goal in mind.

See Also

  • pmap: another way to run a transformation over a collection in parallel, but without any work-stealing. fold, on the other hand, allows a free worker to help a busy one dealing with a longer-than-expected request. As a rule of thumb, prefer pmap to enable lazy processing of predictable tasks, but use fold in less predictable scenarios where laziness is less important.
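
As a rough, hedged sketch of that trade-off, reusing the pi function defined in the ConcurrentHashMap example (timings depend heavily on machine and core count):

  
 (require '[clojure.core.reducers :as r])
  
 ;; pmap: semi-lazy, one task per element, no work-stealing
 (time (dorun (pmap pi (repeat 1000 150))))
  
 ;; fold: eager, splits the input into chunks and lets idle
 ;; workers steal work from busy ones
 (time (r/fold + (r/map pi (into [] (repeat 1000 150)))))
  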

Performance Considerations and Implementation Details

=> O(n) linear

fold is implemented to recursively split a collection into chunks and send them to the fork-join framework, effectively building a tree in O(log n) passes. However, each chunk is subject to a linear reduce that dominates the logarithmic traversal: the bigger the initial collection, the more calls to the reducing function, resulting in linear behavior overall. The linearity of fold is rarely what matters in a performance analysis, as other factors, such as the parallel execution of computationally intensive tasks, come into play.

Orchestrating parallel threads has a cost that should be taken into account when executing operations in parallel: like pmap, fold performs optimally for non-trivial transformations of potentially large datasets. The following simple operation, for example, results in performance degradation when executed in parallel:

  
 (require '[criterium.core :refer [quick-bench]])
 (require '[clojure.core.reducers :as r])
  
 (let [not-so-big-data (into [] (range 1000))]
   (quick-bench (reduce + not-so-big-data)))
 ;; Execution time mean : 11.481952 µs
  
 (let [not-so-big-data (into [] (range 1000))]
   (quick-bench (r/fold + not-so-big-data)))
 ;; Execution time mean : 32.683242 µs
  

As the collection gets bigger, the computation more complicated, and more cores become available, fold starts to outperform the equivalent sequential operation. But the potential performance boost alone isn't always enough to justify a fold, because other variables come into play, such as memory requirements.

fold is designed to be an eager operation, as the chunks of input are further segmented by each worker to enable an effective work-stealing algorithm. fold operations like the examples in this article need to load the entire dataset into memory before starting execution (or as part of the execution). When fold produces results that are substantially smaller than the input, there are ways to prevent the entire dataset from being loaded into memory, for example by indexing it on disk (or in a database) and including in the reducing function the IO necessary to load the data. This approach is used, for example, in the Iota library.[1]
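
A hedged sketch of that idea, assuming the iota library is available on the classpath (iota/vec memory-maps and indexes the file instead of loading it all up front; the file name is hypothetical):

  
 (require '[iota :as iota]
          '[clojure.core.reducers :as r])
  
 ;; count the total number of characters in a large file without
 ;; first realizing it as an in-memory Clojure collection
 (->> (iota/vec "big-file.txt")
      (r/map count)
      (r/fold +))
  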

Now you have a good grasp on how the fold function works!


If you’re interested in learning more about the book, check it out on liveBook here and see this slide deck.


[1] The Iota library (https://github.com/thebusby/iota) scans large files to index their rows and uses that index as the input collection for fold; the library's README explains how to use it.