Communicating Sequential Processes

12 July 2013

With the arrival of core.async, ClojureScript provides a powerful advantage over other popular compile-to-JavaScript languages like CoffeeScript, Dart, and TypeScript. These languages fail to address the single largest source of incidental complexity for any sizeable client-side application - concurrency. In my experience no amount of simple syntactic sugar, class abstraction, or type annotation can plug this particular geyser of incidental complexity. These languages offer no tools beyond the weak ones already offered by JavaScript libraries or JavaScript itself - promises and generators.

JavaScript promises don't solve the inversion of control problem - callback hell is unnested but it's still callback hell. ECMAScript 6 generators suffer in various ways from being too simplistic: you need to manage coordination by hand or provide your own scheduler. You can combine generators with promises, but now you're managing two abstractions instead of one (Dart appears to suffer from the same dualism, but there it's Future/Stream).

Enough rhetoric, let's see how core.async works in practice.

First let's start off with something dramatic (in fact something that should seem impossible for those familiar with JavaScript). We will coordinate three independent processes running at three different speeds via a fourth process which shows the results of the coordination without any obvious use of mutation - only recursion.

(def c (chan))

(defn render [q]
  (apply str
    (for [p (reverse q)]
      (str "<div class='proc-" p "'>Process " p "</div>"))))

(go (while true (<! (timeout 250)) (>! c 1)))
(go (while true (<! (timeout 1000)) (>! c 2)))
(go (while true (<! (timeout 1500)) (>! c 3)))

(defn peekn
  "Returns vector of (up to) n items from the end of vector v"
  [v n]
  (if (> (count v) n)
    (subvec v (- (count v) n))
    v))

(let [el  (by-id "ex0")
      out (by-id "ex0-out")]
  (go (loop [q []]
        (set-html! out (render q))
        (recur (-> (conj q (<! c)) (peekn 10))))))

As expected we see process 1 more often than process 2, and process 2 more often than process 3. It appears that we have independent processes, we can coordinate them, and there's not a callback in sight.

If you haven't fallen out of your chair, let's look at some simpler examples to build up our intuition.

The code snippets in this post may look familiar to fans of functional reactive programming, but fear not, we'll get to the good stuff eventually.

The following code converts events on a DOM element into a channel we can read from:

(defn listen [el type]
  (let [c (chan)]
    (events/listen el type #(put! c %))
    c))

I've intentionally kept the code as simple as possible to limit the amount of novelty that we need to introduce (nearly everything I will show can look prettier/shorter either with more higher-order channel operations or a touch of sweetening via macro sugar). listen takes a DOM element and an event type. The event listener callback puts the DOM event into the channel; since we're not in a go block, we do this with an async put!.
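
To make the put! point concrete without a browser, here is a minimal sketch that assumes JVM Clojure with core.async on the classpath. fire-events! and listen-fake are hypothetical stand-ins for a real event source and for listen; they are not part of the post's code or of any library. The blocking take <!! exists only on the JVM and is used here just to inspect the result at a REPL.

```clojure
(require '[clojure.core.async :refer [chan put! <!!]])

;; Hypothetical stand-in for a DOM event source: it simply invokes the
;; registered callback once per event.
(defn fire-events! [callback events]
  (doseq [e events]
    (callback e)))

(defn listen-fake
  "Returns a channel that receives every event handed to the callback."
  [events]
  (let [c (chan 10)]                    ; buffered, so each put! completes at once
    (fire-events! #(put! c %) events)   ; put! is allowed outside a go block
    c))

(def events-chan (listen-fake [:a :b :c]))

;; Blocking takes, JVM only; values arrive in the order they were put.
(def received [(<!! events-chan) (<!! events-chan) (<!! events-chan)])
```

The callback never blocks: put! hands the value to the channel and returns immediately, which is why it is the right tool inside an ordinary event listener.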

Channels are like the channels found in Tony Hoare's Communicating Sequential Processes. The Go community is visibly enjoying the benefits of Hoare's abstraction - but of course programming languages have offered its treasures since the 1980s (occam-pi, Concurrent ML, JCSP). Interestingly, both Rob Pike and John Reppy have discussed the applicability of CSP for the coordination of user interfaces - user interfaces are inherently asynchronous and thus concurrent.

Let's see listen in action:

(let [el  (by-id "ex1")
      out (by-id "ex1-mouse")
      c   (listen el "mousemove")]
  (go (while true
        (let [e (<! c)]
          (set-html! out (str (.-offsetX e) ", " (.-offsetY e)))))))

Mouse over the grey box below:

Only in go blocks can we appear to read and write synchronously (via <! and >! respectively) to a channel. This allows us to fully escape callback hell in our coordination code.

Note that the above example shows the position of the mouse relative to the element rather than its absolute position in the document. Let's fix this with our first higher-order channel operation, map, analogous to Clojure's map except that it works on channels, not sequences:

(defn map [f in]
  (let [c (chan)]
    (go (loop []
          (if-let [v (<! in)]
            (do (>! c (f v))
              (recur))
            (close! c))))
    c))

map takes a function f and a channel in and returns a new channel. All the magic happens once again inside the go block: we read values out of the in channel as they appear, apply f, and write the result to the channel we returned. This works just as well for mouse events or asynchronous results from I/O or an XMLHttpRequest.
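
The same shape can be tried at a JVM REPL. This is a sketch under assumptions, not the post's code: it assumes JVM Clojure with core.async available, and it uses the hypothetical name map-chan so that clojure.core/map is not shadowed.

```clojure
(require '[clojure.core.async :as async
           :refer [chan go <! >! <!! put! close!]])

;; map-chan is a hypothetical name for the channel-based map described above.
(defn map-chan [f in]
  (let [out (chan)]
    (go (loop []
          (if-let [v (<! in)]          ; nil means the input channel is closed
            (do (>! out (f v))
                (recur))
            (close! out))))            ; propagate the close downstream
    out))

;; Feed three values into a buffered source channel, then close it.
(def source (chan 3))
(doseq [x [1 2 3]] (put! source x))
(close! source)

;; async/into collects everything from a channel until it closes.
(def mapped (<!! (async/into [] (map-chan inc source))))
```

Here mapped ends up as [2 3 4]. Closing the output when the input closes is what lets downstream consumers such as async/into know the stream has ended.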

Let's use map:

(defn location [el]
  (let [[left top] (cljs.core/map int (offset el))]
    (fn [e]
      {:x (+ (.-offsetX e) left)
       :y (+ (.-offsetY e) top)})))

(let [el  (by-id "ex2")
      out (by-id "ex2-mouse")
      c   (map (location el)
            (listen el "mousemove"))]
  (go (while true
        (let [e (<! c)]
          (set-html! out (str (:x e) ", " (:y e)))))))

Mouse over the grey box below to confirm that this works:

It's important to understand that go blocks paired with loop/recur allow us to create local event loops. Normally when writing client side code you are participating in a global event loop. As with global mutable variables, global event loops defy local reasoning. As JavaScript developers we work around the global event loop by coordinating through mutable locals or mutable object fields or by adding coordination methods to our API. With core.async all these ad-hoc methods disappear because we don't need them.
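
As a small illustration of a local event loop that carries its own state, consider the following sketch. It assumes JVM Clojure with core.async; running-total is a hypothetical name introduced only for this example. The loop binding replaces the mutable local that a callback-based version would typically need.

```clojure
(require '[clojure.core.async :refer [chan go <! <!! put! close!]])

;; Hypothetical example: sum everything that arrives on a channel.
(defn running-total
  "Returns a channel that yields the sum of all values read from in
  once in is closed."
  [in]
  (go (loop [total 0]                  ; state lives in the loop binding
        (if-let [v (<! in)]
          (recur (+ total v))
          total))))                    ; the go block's value goes on its channel

(def numbers (chan 3))
(doseq [n [1 2 3]] (put! numbers n))
(close! numbers)

(def total (<!! (running-total numbers)))
```

With the three values above, total is 6, and no variable was ever mutated to get there.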

To further illustrate, let's see the coordination of two different streams in the same logical local event loop.

Say we want to handle both mouse and key events:

(let [el   (by-id "ex3")
      outm (by-id "ex3-mouse")
      outk (by-id "ex3-key")
      mc   (map (location el)
             (listen el "mousemove"))
      kc   (listen js/window "keyup")]
  (go (while true
        (let [[v c] (alts! [mc kc])]
          (condp = c
            mc (set-html! outm (str (:x v) ", " (:y v)))
            kc (set-html! outk (str (.-keyCode v))))))))

Make sure the window is focused and mouse over the following grey box and type at the same time:

alts! gives us non-deterministic choice over multiple streams. We will read from whichever channel has information. When reading from a channel, alts! returns a tuple: the first element is the value read from the channel and the second element is the channel that was read from. This allows us to conditionally handle results from different channels, as you can see with our use of condp.
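
The tuple returned by alts! can be inspected directly in a minimal sketch, again assuming JVM Clojure with core.async. The channel names and the value 65 are made up for illustration.

```clojure
(require '[clojure.core.async :refer [chan go alts! put! <!!]])

(def mouse-chan (chan 1))
(def key-chan   (chan 1))

;; Only the key channel has a value waiting.
(put! key-chan 65)

(def result
  (<!! (go (let [[v c] (alts! [mouse-chan key-chan])]
             ;; c is the channel that produced v, so we can dispatch on it.
             (condp = c
               mouse-chan [:mouse v]
               key-chan   [:key v])))))
```

Because mouse-chan is empty, alts! completes with the value from key-chan and result is [:key 65].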

Note this is quite different from the usual JavaScript solutions where we tend to smear our asynchronous handling across the code base.

Let's end with a final dramatic example. We present a port of Rob Pike's Go code that demonstrates parallel search with timeouts:

(defn fake-search [kind]
  (fn [c query]
    (go
     (<! (timeout (rand-int 100)))
     (>! c [kind query]))))

(def web1 (fake-search :web1))
(def web2 (fake-search :web2))
(def image1 (fake-search :image1))
(def image2 (fake-search :image2))
(def video1 (fake-search :video1))
(def video2 (fake-search :video2))

(defn fastest [query & replicas]
  (let [c (chan)]
    (doseq [replica replicas]
      (replica c query))
    c))

(defn google [query]
  (let [c (chan)
        t (timeout 80)]
    (go (>! c (<! (fastest query web1 web2))))
    (go (>! c (<! (fastest query image1 image2))))
    (go (>! c (<! (fastest query video1 video2))))
    (go (loop [i 0 ret []]
          (if (= i 3)
            ret
            (recur (inc i) (conj ret (alt! [c t] ([v] v)))))))))

(let [el (by-id "ex4-out")
      c  (listen (by-id "search") "click")]
  (go (while true
        (<! c)
        (set-html! el (pr-str (<! (google "clojure")))))))

Click the search button below multiple times:

We can run 3 pairs of requests in parallel, choosing the fastest of each pair. In addition we set a timeout of 80 milliseconds for the whole process.
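
The timeout mechanics can be isolated in a smaller sketch. It assumes JVM Clojure with core.async, and with-deadline is a hypothetical helper, not something from the post or the library. A timeout channel closes after the given number of milliseconds, so a take from it yields nil at that point. That is also why, in google above, any result still outstanding when t closes shows up as nil in the returned vector.

```clojure
(require '[clojure.core.async :refer [chan go alts! timeout put! <!!]])

;; Hypothetical helper: race a channel against a deadline.
(defn with-deadline
  "Returns a channel yielding the next value from c, or :timed-out
  if nothing arrives within ms milliseconds."
  [c ms]
  (go (let [[v port] (alts! [c (timeout ms)])]
        (if (= port c) v :timed-out))))

(def slow (chan))       ; nothing is ever put on this channel
(def fast (chan 1))
(put! fast :result)     ; a value is already waiting here

(def slow-result (<!! (with-deadline slow 50)))
(def fast-result (<!! (with-deadline fast 50)))
```

slow-result is :timed-out and fast-result is :result. Checking which port alts! returned, rather than checking for nil, keeps the two outcomes unambiguous.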

Just to drive the point home, all of the examples we have covered are running at once, including the original process example at the top of the page.

Still, so far we've only seen what I would consider very trivial, if impressive, examples. In the next post we'll look at an advanced example - a non-toy autocompleter input field. We will see core.async's advantage over traditional object-oriented approaches as well as purely reactive approaches.

You can find and build these examples for yourself here.