Clojure

Transducers have become quite popular recently. It’s a new feature of the non-released Clojure 1.7. As of writing this, Сlojure 1.7-alpha5 version is the latest development release, but there also appeared a fair number of ports of transducers for such languages as Python, Ruby, JavaScript, PHP, Java, C++, Lua, Erlang. To tell the truth, it’s a bit disconcerting, the library of reducers was added long ago (in Clojure 1.5). But no one said anything about them; nothing was ported, though they seem to be performing similar things. Or not?

Let’s find out what these reducers & transducers are made for (do we really need them?), how they work and how we can use them… Finally, we will decide whether it’s time to throw reducers away.

It would be wrong to describe concepts emerged in Clojure outside the context of the given language. Therefore, below you will find many listings in Clojure. Plus, there will be no mathematical analysis. Anyway, it’s good to have the basic knowledge of Clojure (especially the concept of sequences), but it’s not necessary to know Haskell.

Warning you that all the provided here listings of standard functions are changed a lot, sometimes even “slightly” broken. All for the sake of simplicity. Oh, and on the picture is that very burrito.

Folding…

So, the Clojure language is functional, and therefore an ordinary imperative loop is not good.

Okay, then. We have reduce!

(defn my-reduce
 ([rf coll]  ;; this option is for convenience
  (if-let [s (seq coll)]
    (my-reduce rf (first s) (next s))
    (rf)))
 ([rf acc coll]
   (if-let [[x & xs] (seq coll)]
     (recur rf (rf acc x) xs)
     acc)))

Of course, reduce is implemented in a slightly different way, but it’s not important for us now, so let’s just forget about it. The rf function (let's call it a reduct function) accepts two arguments here. The first one is some “sliding state”, while the second one is an element from the coll sequence. If the initial state is not defined, (first coll) or (rf) are used. We are running through the entire coll collection and invoke rf for each element. At that, we “drag” the acc state. When there are no elements left, simply return acc.

Here’s a small example. Suppose there is a list of strings and we want to calculate their total length.

Here’s the imperative code with a loop:

(defn length-of-strings [strings]
  (with-local-vars [acc 0]  ;; Yes, Clojure has local variables!
    (doseq [c strings]
      (var-set
        acc
        (+ @acc (count c))))  ;; that’s pretty much it
    @acc))

The loop state is a simple acc counter (number). We believe it is equal to (+ @acc (count c)) at each iteration.

Now, one more time, using reduce:

(defn length-of-strings [coll]
  (my-reduce
    (fn ([acc c] (+ acc (count c))))  ;; our reduct-function
    0                                 ;; initial value
    coll))

If we «forget» about laziness for some time, it is possible to implement many primitive operations, such as map or filter.

(defn my-map [f coll]
  (my-reduce
    (fn [acc c] (conj acc (f c)))
    []
    coll))
(defn my-filter [p coll]
  (my-reduce
    (fn [acc c] (if (p c) (conj acc c) acc))
    []
    coll))

The provided reduce variant is not suitable for the implementation of take, as the loop always runs through the entire sequence (it’s not some Haskell where everything is lazy).

In order to overcome this drawback, a special reduced marker and the corresponding reduced? predicate have been added to the 1.5 version of Clojure.

They also rewrote reduce and got something like this:

(defn my-reduce
  ([rf coll]
   (if-let [s (seq coll)]
     (my-reduce rf (first s) (next s))
     (rf)))
  ([rf acc coll]
   (if-let [[x & xs] (seq coll)]
     (let [ret (rf acc x)]
       (if (reduced? ret)
         @ret
         (recur rf ret xs)))
     acc)))

As soon as the reduct-function returns (reduced ...), the loop terminates and returns the value of @ret.

(defn take-r [n coll]
  (my-reduce
    (fn [[n1 acc] c]
      (if (pos? n1)
        [(dec n1) (conj acc c)]
        (reduced acc)))
    [n []]
    coll))
;; the function supports infinite sequences 
(take-r 5 (range))
;; => [0 1 2 3 4]

We should not forget about the great reductions function. It’s the analogy of reduce, but it returns a lazy list of all intermediate acc values, not only the last one. Its very useful for debugging. Write the algorithm step in the form of a function, run reduce on the collection with the input data. If something is not right, replace reduce with reductions, run it in REPL and get all the intermediate steps. It's not that easy with loops – we’ll have to create debugging workarounds, which is not very convenient.

Sometimes, reductions is useful as it is. For example, to calculate factorials:

;; => *lazy* sequence of factorials
(def factorials (reductions *' (cons 1 (map inc (range)))))
(nth factorials 20)
;; => 2432902008176640000

Clojure uses sequences to traverse collections. If we traverse a vector, a hash table, or a simple iterator, a fair amount of temporary objects will be created.

An obvious optimization here is to implement a specialized reduce variant for the collections needing it. If the collection is not suitable for such optimization, it’s better to use the standard implementation, similar to that provided at the beginning of the article. There’s a special clojure.core.protocol/CollReduce protocol for this purpose. When a collection object supports it, this implementation will be in use inside clojure.core/reduce. Therefore, reduce in Clojure is usually faster that the similar doseq loop.

Transformers

Transformer is a function that accepts one reduct-function and returns a new one.

For example, here’s the “increase by 1” transformer:

(defn inc-t [rf]
  (fn [acc c] (rf acc (inc c))))
;; here’s the example use
(reduce + 0 (map inc [1 2 3 4]))
;; => 14
(reduce (inc-t +) 0 [1 2 3 4])
;; => 14

It is possible to generalize this by allowing to use any function instead of inc:

(defn map-t [f]
  (fn [rf]
    (fn [acc c] (rf acc (f c)))))
(def inc-t (map-t inc))
(def dec-t (map-t dec))
;; ...
(reduce (inc-t +) 0 [1 2 3 4])
;; => 14

Here’s a “filter” transformer:

(defn filter-t [pred]
  (fn [rf]
    (fn [acc c]
      (if (pred c)
         (rf acc c)
         acc))))
(def odd?-t (filter-t odd?))
(def even?-t (filter-t even?))
;; the example
(reduce (even?-t *) 1 [1 2 3 4])
;; => 8

Is it possible to combine several transformers? Of course!

(defn odd?-inc-t [rf]
  (odd?-t (inc-t rf)))
;; ..or in a more standard way
(def odd?-inc-t (comp (filter-t odd?) (map-t inc)))
;; that is logically equivalent to..
(def odd?-inc-t
  (comp
    (fn [rf]
      (fn [acc c]
        (if (odd? c) (rf acc c) acc)))
    (fn [rf]
      (fn [acc c]
        (rf acc (inc c))))))
;; which will give the equivalent of this function
(defn odd?-inc-t [rf]
  (fn [acc c]
    (if (odd? c)
      (rf acc (inc c))
      acc)))
;; example use
(reduce * 1 (->> [1 2 3 4 5] (map inc) (filter odd?)))
;; => 15
(reduce (odd?-inc-t *) 1 [1 2 3 4 5])
;; ==> 15

It is worth noting that transformers are in reverse order. If we want elements of the collection to be processed by the A transformer before they reach B, we should stick them together as (comp A B). Here’s a trick:

(def cc (vec (range 1000000)))
(time (reduce + 0 (->> cc (filter odd?) (map inc))))
;; "Elapsed time: 171.390143 msecs"
;; => 250000500000
(time (reduce ((comp (filter-t odd?) (map-t inc)) +) 0 cc))
;; "Elapsed time: 93.246015 msecs"
;; => 250000500000

What a considerable speed increase! Of course, it all depends on many details and nuances. That’s why the gain may be different in reality. Anyway, I just want to say that you shouldn’t take this piece of code as a benchmark.

But in general, the results are not surprising. When using map and filter, two intermediate sequences are created. We traverse the initial vector and create a temporary list of filtered values. Then, we traverse this list and build another one, but with increased elements. Finally, we run through, summing values.

On the other hand, the variant with transformers does not create any temporary collections. Instead, both odd? and inc are applied to initial elements.

Where Are My Reducers?

It was all good till the 1.5 version introduced a new standard library – clojure.core.reducers. That’s right, a separate library; we’ll have to import it explicitly. It has also announced its own versions of map, filter, take-while, and others. Of course, they are not compatible with the regular versions from clojure.core. Therefore, it is better to write (require '[clojure.core.reducers :as r]) instead of the simple (use 'clojure.core.reducers).

So, what is a reducer? A short and silly definition: reducer is any object we can reduce through. In the terms of clojure.core.reducers, any collection is a reducer. A hash table is a reducer too, as well as java.lang.String, and, nil, of course too. Have a look at the definition:

(defn reducer [coll xf]
  ;; `xf` - is a transformer 
  (reify
    clojure.core.protocols/CollReduce
    (coll-reduce [this f1]
      (let [f2 (xf f1)]
        (clojure.core.protocols/coll-reduce coll f2 (f2))))
    (coll-reduce [this f1 init]
      (let [f2 (xf f1)]
        (clojure.core.protocols/coll-reduce coll f2 init)))))

The coll collection is taken, and a new one is returned, using which we can run reduce, and that’s it. We can neither add an element, nor remove one. We cannot even traverse elements. But a reduct-function will be passed through the xf transformer before every reduce run.

(def nums [1 2 3 4 5])
(def nums+1 (reducer nums inc-t))
(reduce + 0 nums)
;; => 15
(reduce + 0 nums+1)
;; => 20

As already mentioned, the reducers library has declared its own variants of map, filter, take-while and others. All of them accept a reducer and return a new one, to which a corresponding transformer is “attached”.

That's how clojure.core.reducers/map could look like (it certainly looks very different):

(def map-r [f coll]
  (reducer coll (map-t f)))

And now, a few more examples of how we can use all of this:

(require '[clojure.core.reducers :as r])
(def nums [1 2 3 4 5 6 7 8])
(type (map inc nums))
;; => clojure.lang.LazySeq
(reduce conj [] (map inc nums))
;; => [2 3 4 5 6 7 8 9]
(type (r/map inc nums))
;; => clojure.core.reducers$folder$reify__1234
;; it’s not a sequence at all
(reduce conj [] (r/map inc nums))
;; => [2 3 4 5 6 7 8 9]
;; but it still can reduce
(reduce conj [] (r/filter odd? nums))
;; => [1 3 5 7]
(reduce + 0 (->> nums (r/map inc) (r/map inc)))
;; => 52
;; ~~ (+ 0 (inc (inc 1)) (inc (inc 2)) ...)
(reduce + 0 (->> nums (r/filter odd?) (r/map inc)))
;; => 20
;; ~~ (+ 0 (inc 1) (inc 3) ...)

Let's Parallel

To be honest, “reducers” is not the best name. It would be better to name them “folders”, as in addition to the CollReduce protocol, a more important, CollFold protocol is declared in the library:

(defprotocol CollFold
  (coll-fold [coll n combinef reducef]))

It’s similar, but there are two reduct functions now, and also some n argument has been added. The idea is that several threads can traverse some collections. In short: divide it into blocks of n elements size, then fold each block with the help of #(reduce reducef (combinef) %), and then fold the list of results (one per block) once more, but now using #(reduce combinef %).

A reducer being able to fold itself in parallel is called a folder.

The CollFold protocol supports 2 standard collections: of a vector and a hash table.

(def v (vec (range 10000000)))
;; linearly, 1 thread
(time (reduce + v))
;; "Elapsed time: 648.897616 msecs"
;; => 49999995000000
;; several threads
(time (r/coll-fold v 512 + +))
;; "Elapsed time: 187.414147 msecs"
;; => 49999995000000

All standard reducers, for which it makes sense, implement CollFold. For example, r/map, r/filter, r/mapcat, r/flatten. On the other hand, r/take, r/take-while, r/drop do not support parallelization. Above is the implementation of r/map. Here’s the updated version:

(def map-r [f coll]
  ;; just replaced `reducer` with `folder`
  (folder coll (map-t f)))

It is not necessary to use coll-fold directly, as there’s a fold wrapper for everyday needs. It has a value set by default for n (the block size): 512. Anyway, the hint is clear: reducers are obviously meant for big collections (>1K elements). Once again: do not use coll-fold directly, call fold for this.

Oh, there’s foldcat. A kind of a speeded-up (due to multithreading) variant of #(reduce conj [] %). This function returns the objects of clojure.core.reducers.Cat that implement Counted, Sequable, and CollFold.

(r/map inc [1 2 3])
;; => # [2 3 4]
;; what’s with the speed...
(def v (vec (range 1000000)))
(time (count (reduce conj [] (r/map inc v))))
;; "Elapsed time: 90.124397 msecs"
;; => 1000000
;; not good, and what if we use `foldcat`
(time (count (r/foldcat (r/map inc v))))
;; "Elapsed time: 25.054988 msecs"
;; => 1000000
(time (count (r/foldcat (r/map inc (r/foldcat (r/map inc v))))))
;; "Elapsed time: 32.054988 msecs"
;; => 1000000
;; the result is `foldcat`, by the way, it is foldable as well (Hello to multithreading)
(satisfies? r/CollFold (r/foldcat []))
;; => true

Now on the scene… Transducers!

Unlike reducers, transducers are not a separate library. It is rather a concept (i.e. an idea) that will be integrated directly into the clojure.core module. Waiting for this in version 1.7 (it won’t take long).

In short, transducers are the same transformers, but called otherwise after the rebranding. Well, almost the same. From now on, reduct-functions can accept not only 0 and 2 arguments, but also 1. So a transducer is a function from the 0-1-2-ary reduct function to the new 0-1-2-ary one.

(def typical-transducer
  (fn [rf]
    (fn ([] ...)         ;; return the initial element
        ([acc] ...)      ;; not clear...
        ([acc c] ...)))  ;; here’s everything important, as before 
;; the new `map-t` version is 33% better than the old 
(defn map-t-improved [f]
  (fn [rf]
    (fn ([] (rf))                    ;; forwarding on
        ([acc] (rf acc))             ;; forwarding on
        ([acc c] (rf acc (f c))))))  ;; replace `c` with `(f c)`

Just like before, the 0-ary function can be invoked, if the initial element is required. The 2-ary variant is used for the reduction. The 1-ary variant is called at the very end of the whole work (on completing reduce). We need it for cases when it is necessary to “write” new elements after the last one.

Example: the dedupe transducer that removes consecutive duplicates from a collection:

(defn my-dedupe []
  (fn [rf]
    ;; be careful, the state!
    (let [prev (atom ::none)]
      (fn  ;; that’s our reduct function
        ([] (rf))
        ([acc] (rf acc))
        ([acc c]
          (let [p @prev]
            (reset! prev c)
            (if (= p c)
              acc
              (rf acc c))))))))
(def rf ((my-dedupe) +))
(reduce rf 0 [1 1, 2 2, 3, 1 1])
;; => 7
(reduce rf 0 [1 1, 2 2, 3, 1 1])
;; => 6
;; oops... `rf` is not empty, we cannot use it twice

It should be noted that our transducer returns a new reduct-function that has the mutable state and can make 3 different things (1 per arity). Just like an object. At the same time, a transducer does not have the state, it just functions as a kind of a factory.

Partition-all is provided as the example of the 1-ary variant of the reduct function. The simplified implementation:

(defn partition-all-t [n]
  (fn [rf]
    (let [buffer (java.util.ArrayList. n)]  ;; the state!
      (fn
        ([] (rf))
        ([acc]
          (if (.isEmpty buffer)
            ;; if the buffer is empty –  forward on
            (rf acc)
            ;; otherwise...
            (let [v (vec (.toArray buffer))  ;; convert the buffer into a vector
                  acc' (rf acc v)]           ;; reset with the help of the 2-ary `rf`
              ;; now we can forward it on
              (rf acc'))))
        ([acc c]
          (.add buffer c)
          (if (= n (.size buffer))
            ;; if the buffer if overfilled - "reset" it
            (let [v (vec (.toArray buffer))]
              (.clear buffer)
              (rf acc v))
            ;; if otherwise – do nothing
            acc))))))
;; use what we’ve made (we did not specify the initial, as (conj) => [])
(reduce ((partition-all-t 3) conj) (range 10))
; >> ClassCastException java.lang.Long cannot be cast to clojure.lang.IPersistentCollection
;; is not working...
;; well, what if we define []... 
(reduce ((partition-all-t 3) conj) [] (range 10))
;; => [[0 1 2] [3 4 5] [6 7 8]]
;; works, but it is incorrect...

Hmm… Neither the 0-ary, not the 1-ary variants of ((partition-all-t 3) conj) have been called – the regular reduce does not know anything about these innovations. It invokes the 0-ary variant only if the collection is empty, and it does not ever invoke the 1-ary variant.

Therefore, a new transduce function has been created. Unlike the “outdated” reduce, it always invokes (rf), if the initial state is not defined explicitly. This function also calls (rf acc), just once. Also, transduce invokes our transducer and hides the mutable reduct function. In other words, all the dirty work is performed “under the hood”.

;; pass the mutable transducer, not the result of its operation
(transduce (partition-all-t 3) conj (range 10))
;; => [[0 1 2] [3 4 5] [6 7 8] [9]]
;; and... it works!
;; the composition of transducers (works again)
(transduce (comp (filter odd?) (partition-all-t 3)) conj (range 10))
;; => [[1 3 5] [7 9]]
And what it we try using transduce instead of reduce? 
 (reduce (identity -) 0 [1 2 3 4])
;; => -10
;; ~~ (- (- (- (- 0 1) 2) 3) 4)
(transduce identity - 0 [1 2 3 4])
;; => 10
;; incorrect!

It turns out that we cannot replace reduce with transducer, as the new requirement of the 1-ary reduct function prevents us from doing this. In our example, transduce invokes (- acc) after the computations, which changes the result sign to the opposite. completing will help to solve this problem:

((completing -) 3 2)
;; => 1
((identity -) 1)
;; => -1
((completing -) 1)
;; => 1
(transduce completing - 0 [1 2 3 4])
;; => -10
;; Now it is correct!

The language core has introduced special functions for working with transformers transducers. A standard set of the transducers has also been added. Not to produce plenty of new functions (there are too many of them already), it has been decided to upgrade the existing map, filter, take, interpose, mapcat and company:

(map inc [1 2 3])
;; => (2 3 4)
(map inc)
;; => #
;; it is a transducer!
;; we can also do it this way
(transduce (map inc) + [1 2 3])
;; => 9
(transduce (comp (map inc) (filter even?)) + [1 2 3])
;; => 6
;; ~~ (+ (inc 1) (inc 3))  => 6

In addition to transduce, there are also several functions for working with transducers:

;; apply the transducer to the collection 
(sequence (map inc) [1 2 3])
;; => (2 3 4)
;; it’s equivalent to the following code
(transduce (map inc) conj [1 2 3])
;; => [2 3 4]
;; But...
;; the `sequence` function performs the transducer *lazily* !
;; this does not work for `transduce` 
(take 5 (sequence (map inc) (range)))
;; => (1 2 3 4 5)
;; the transducer support has also been added to the `into` function
(into [9] (map inc) [1 2 3])
;; => [9 2 3 4]

But eduction is the most interesting function. It returns a proxy object, on which we can invoke seq, reduce, or get a java-iterator. It is expected that this object will simply call transduce or sequnce. It's small, but quite a useful thing.

(def odds (eduction (filter odd?) (range)))
(def evens (eduction (remove odd?) (range)))
;; we can work like with sequential
(take 5 odds)
;; => (1 3 5 7 9)
;; a sequence of the first 100 500 numbers  
;; but there will be no reference to the beginning – the sequence will gather GC
(nth odds 100500)
;; => 2010001
;; reduce will be invoked at once here(no temporary LazyCol)
;; ~= (reduce ((filter even?) ((take 100500) +)) 0 (range))
(transduce (take 100500) + evens)
;; => 10100149500

Stop, stop, stop! It looks suspiciously like clojure.core.reducers/reducer… we could fold that one, while it is allowed to run seq here. Thus, we can throw r/reducer away. But not r/folder, he supports multithreading!

(require '[clojure.core.reducers :as r])
(def v (vec (range 1000000)))
(time (transduce (map inc) + v))
;; "Elapsed time: 120.193971 msecs"
;; => 500000500000
(time (r/fold + (r/folder v (map inc))))
;; "Elapsed time: 37.597224 msecs"
;; => 500000500000
;; but be careful!
(transduce (take 100500) + v)
;; => 5050074750
(r/fold + (r/reducer v (take 100500)))
;; => 5050074750
;; correct
;; reducer is outdated; it is better to use eduction
(r/fold + (eduction (take 100500) v))
;; => 5050074750
(reduce + (r/folder v (take 100500)))
;; => 5050074750
;; correct even in this case
(r/fold + (r/folder v (take 100500)))
;; => 109071345018
;; oops...
;; not every transducer can work in parallel (can be converted to a folder)

When using transducers, greater flexibility/abstraction, as well as greater performance are achieved, as compared to regular map/filter/etc (based on lazy sequences). Note that we are talking about Clojure sequences: in the level of abstraction and speed, transducers are comparable to regular iterators/enumerators/generators (they are usually called in different ways in different languages).

But let’s get back to Clojure. core.async used to have plenty of functions like map>, map<, filter<, filter>, etc. They are removed now (well, not removed, but deprecated). But they have allowed to specify a transformer transducer when creating a channel:

;; do not forget to connect the library to project.clj
(require '[clojure.core.async :as a])
;; just a regular transducer
(def xf (filter odd?))
;; and a channel with the buffer
(def ch (a/chan 10 xf))
;; put the numbers from 0 to 9 in the channel and close 
(a/onto-chan ch (range 10))
;; get numbers out of the channel 
(a/ [1 3 5 7 9]

We can hang a transducer only on buffered channels. Before the element is in the buffer, our transducer processes it. Pipelines also work with transducers.

Summing Up

Various reducers/transducers are the generalization of the fold operation. Therefore, they require a reduct function with 2 arguments for their useful work.

In addition to the 2-ary variant, it is better to define the 0-ary one as well, as we can use it in case the initial folding state is not set. But we cannot use it, if the collection is not empty. In this case, reduce will take its first element. And transduce does not act so meanly: either the initial state is passed to it explicitly, or the 0-ary call of the reduct function is used.

On the other hand, transduce requires more from our reduct function, namely, the 1-ary variant that quite often does not do anything in the general case. Seriously, ([x] x) is the most meaningful implementation in this case. But we are so lazy and don’t want to rewrite the old (0/2-ary) functions. That's why we use the completing wrapper that adds an empty 1-ary variant.

Reducers are based on transformers. A transformer = a function of the rf -> rf type. In fact, a reducer is the collection, to which we have bound the transformer. Every time we invoke reduce for this collection, at first the transformer “damages“ our reduct function.

A transducer ~= a transformer, but it also requires the support for the 1-ary reduct function. So we always define the 1-ary and declare proudly: “I certainly do not use the outdated transformers, only transducers”.

With all of this, transducers are not limited to work with collections. We can attach them to channels, input-output threads, queues, observers, etc.

Summary:

Subscribe to Kukuruku Hub

Or subscribe with RSS

0 comments