Clojure Concurrency Tutorial
Do you want concurrency tools to rock your cores in Clojure?
By the end of this guide, you'll have the tools you need to dramatically improve the concurrency of your software. After reading this, if you can't find a way to correctly share resources and make better use of the entire capacity of the computer, you can ask me for help. Look at the end of the guide for how.
Table of Contents
- Catalog of Primitives
  - Delay - I may not need to calculate this
  - Promise - I'll check back here for the answer
  - Future - Please calculate this in another thread
  - A word about deref - What's in the box?
  - Atom - Keeping a single value consistent over time
  - Ref - Keep multiple values in a consistent relationship
  - Var - Redefine stuff as you program
  - Agent - Queue up work to aggregate a value
  - Volatile - Lightweight mutable state
  - Lock - Keep others out while you're in
  - core.async - Workers with conveyor belts
  - Threads - Use your cores
  - Executor Service - Run lots of tasks in a thread pool
  - Manifold - Abstractions for data sources
  - pmap - Simple parallelism with one more letter
  - Reducers - Transformation pipelines in parallel
  - Fork/Join - Break your work into chunks, do the chunks on your cores, put them back together
- Concurrency vs parallelism
- JVM Thread model
- Immutable Data Structures
- Pure Functions
- Independence
Concurrency Primitive Catalog
So let's go over a bunch of different primitives with examples.
Delay
Wouldn't it be great if you could write some code that calculates a value, but never run the code if no one wants the value? And wouldn't it also be great if the value was only calculated once, no matter how many threads need the value?
There's a Clojure function called delay. It creates an object called a Delay. A Delay ensures that some code is run either zero times or one time. It's run zero times if the Delay is never derefed. And it's run exactly once if it is derefed, regardless of how many times it's derefed.
Here's an example:
;; make a delay that does some calculation
(def the-answer (delay (* 100 (long-calculation 45))))
;; that long calculation has not been done at this point
(when need-the-answer?
;; the thread will block until the answer is calculated
(println (deref the-answer)))
;; if you don't need the answer, it was never run
Now, that's a nice way of avoiding work in a single thread if you're going to use the work some of the time but not all of the time. Delays are also useful for a shared resource between threads.
;; make a delay that initializes a shared resource
;; if we don't put it in a Delay, the resource will be initialized
;; as soon as the namespace is loaded (including during compilation)
(def resource (delay (initialize-shared-resource)))

;; start 100 threads that use the resource
(defn -main []
  (dotimes [x 100]
    (doto (Thread. (fn []
                     (let [resource @resource]
                       ;; use the resource here ...
                       )))
      (.start))))
The magic is that only one of those threads will initialize it. The rest of the threads will block waiting for it.
Thanks to Dan Lebrero for explaining how Delay is useful for concurrency.
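If you want to see the run-once behavior at the REPL, here's a tiny sketch (the println is only there so you can see when the body actually runs):

;; the body only runs once, no matter how many derefs
(def d (delay (println "calculating...") 42))

@d ;; prints "calculating..." and returns 42
@d ;; returns 42 immediately; nothing is printed the second time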
Promise
Clojure has a function called promise. A Promise is like a coupon for a back rub that you get from your significant other. It's not a present itself, but a promise to give you a present later.

Calling promise will make a coupon for a value you promise to deliver sometime in the future. Usually, you will calculate that value in another thread.

Here's how you would use it.
;; make a Promise
(def the-answer (promise))

;; start working on the promise in a new thread
(doto (Thread. (fn []
                 ;; do a lot of work
                 ;; ...
                 ;; then deliver on your promise
                 (deliver the-answer 42)))
  (.start))

;; in the original thread, do your own work
;; ...

;; then get the answer
;; (this will block if it's not there)
(println (deref the-answer))
Promises are a simple way to communicate between threads. One Thread calculates the answer. The other waits for it. (We'll look at how to create Threads later.)
You can create as many Promises as you want. But each one can only get one answer. Once the answer is delivered, it can't be delivered again.
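Here's a small sketch of that single-delivery rule:

(def p (promise))

(deliver p 1) ;; the first delivery wins
(deliver p 2) ;; this one is ignored; a Promise can only be delivered once

@p ;; => 1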
Future
A Future is similar to a Promise. The main difference is that a Future will evaluate an expression for you in another Thread. You don't have to set the thread up yourself. Here's how the same thing works using future.
(def the-answer (future
;; do a lot of work in a new thread
;; ...
;; then deliver the answer
;; the value of the last expression is delivered
42))
;; in the main thread, do some work
;; ...
;; then get the answer (which blocks until the answer is done)
(println (deref the-answer))
One thing that trips up people new to Futures is that they swallow exceptions. If the code you run in the Future throws an exception, you won't hear about it until you deref it. When you deref it, the exception will be thrown again in the current thread.
Let's look at some code that demonstrates this:
;; the Exception gets thrown but stored in the Future
(def f (future (throw (Exception. "Hello from the future!"))))
(deref f) ;; this will throw the Exception
A word (or two) about deref
We've seen deref a couple of times now. And we're going to see it a bunch more times. So it's worth spending a minute or two more on it.

What does deref mean?

It stands for dereference. You see, both Promises and Futures are types of references. They're not the values themselves. They're pointers to the values. They're like boxes where the answer will be when the calculation is finished. You call deref on a reference and it gets whatever is in the box. We can also abbreviate deref by prepending @ to the reference, like this:

@reference ;; same as (deref reference)
Promises and Futures are references that might not be complete yet. In Clojure speak, when a Promise has been delivered, we say it is realized. There's even a function called realized? to check if it has an answer. You can check if there's something in the box before you block getting the answer.
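For example, here's a quick sketch of realized? on a Promise (it works the same way on Futures and Delays):

(def the-answer (promise))

(realized? the-answer) ;; => false, nothing in the box yet

(deliver the-answer 42)

(realized? the-answer) ;; => true, safe to deref without blocking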
Whenever you block, it's sometimes important to limit the amount of time you wait. Like in real life, you may want to say "if I don't hear from you by noon, I'm going to lunch by myself". You can use a variant of deref that takes a timeout.
;; wait four seconds (4,000 ms)
;; if we don't have an answer by then, deref will return :cheeseburger
(deref the-answer 4000 :cheeseburger)
Only the blocking reference types can use the timeout version. Atoms, for instance, which we'll see real soon (I promise), always have a value, so there's no need to wait, ever.
Ok, let me deliver on that promise. ;)
Atom
Atoms are maybe the most popular concurrency primitive in Clojure right now. Don't quote me on that, because I don't have any data. But I find that I use them quite a lot more than other things.
Why do I use them so much?
Because they capture the essence of sharing a single piece of information.
Let's say I'm working with nine other people. We are collecting donations from people and sharing out how much we've collected. To work together, we keep a running sum on a chalkboard. John's job is to tweet out every five minutes how much money we have. He glances over at the chalkboard, sees the number "42", and tweets out this:
"We have collected a total of $42!!"
Except that's wrong. Jane was in the middle of writing out the number 42,332. She had to glance down to remember the last three digits, just when John was checking the board.
It sounds dumb when people do it, but this is how computers work. They're dumb. If you say "check the board and tweet out what you see", that's what you get.
But Atoms are a solution.
What's the real problem? The deep root of the problem is that you can write partial answers on the board. If you could write the whole number at once, this never would have happened. Atoms ensure that you can only write consistent values.
An Atom holds a current value. That value has to be valid. You then send it a pure function which calculates the next value. The Atom guarantees that anybody derefing the Atom always gets either the old value or the new value, and never anything in between.
Here's another problem. It's one you're probably familiar with:
Let's say Jane and Jim both collect $10 at about the same time. They both run back to the chalkboard and see this value:
$70,400
Great! They both pull out their calculators, type in the number, then add 10. Jane finishes first, erases the chalkboard, and writes $70,410. She runs back out to get more donations. Then Jim erases the board. And writes $70,410. The same number!
They both were following a correct algorithm. Get the number, add your additions, then write it on the board. But when two people are involved, they need to coordinate a little better. Again, this is dumb when people are doing it, but it's what happens when threads share memory without coordination.
Let's see an example of using Atoms to prevent this problem.
;; create an Atom with initially no money
(def donation-count (atom 0))

;; start 9 people collecting money (9 threads)
(dotimes [_ 9]
  (doto (Thread. (fn []
                   ;; wait three seconds
                   (Thread/sleep 3000)
                   ;; go collect $1
                   (swap! donation-count inc)
                   ;; do it again
                   (recur)))
    (.start)))

;; start one person tweeting
(doto (Thread. (fn []
                 ;; wait 100 seconds
                 (Thread/sleep 100000)
                 (tweet (str "We collected $" @donation-count " total!"))))
  (.start))
There's our friend deref again. An Atom is a reference to a value, and derefing it gives you the value. But what's swap!?

swap! is the function for modifying the current value of the Atom. Let's look at the arguments:

(swap!
  donation-count ;; the Atom
  inc            ;; the function
  )
swap! takes the current value of the Atom, calls the function on it (in this case, inc), and sets the new value. However, just before setting the new value, it checks to make sure the old value is still in there. If it's different, it starts over. It calls the function again with the new value it found. It keeps doing this until it finally writes the value. Because it can check the value and write it in one go, it's an atomic operation, hence the name.
That means the function can get called multiple times, so it needs to be a pure function. Another thing is that you can't control the order of the function calls. If multiple threads are swapping an Atom at the same time, order goes out the window. So make sure your functions are independent of order, as we discuss in the Independence section.

For instance, incrementing (inc) is independent of order. After all the incs are in, you'll have the same answer, regardless of order.
So, with all of those requirements, what benefits do you get from an Atom?
Let me explain.
Atoms give you a very important guarantee: you can look at the current value of the Atom (with deref) at any time and know that, at that time, the value was current. It was valid. It might not stay current forever, but it was a valid answer. For instance, in our donations example, the value in the Atom is always the sum of some subset of donations. Any time you check the Atom, it was a valid count of money.

Let's look a bit more at the swap! function. We were calling inc, which adds one. What if you want to add more than just $1 each time?
(swap! donation-count (fn [x] (+ x 10)))
Okay, see, we can add 10, or whatever number you want. But this is such a common case that there's a shortcut. See, you're calling + with the current value of the Atom (here called x) and a second argument (10). Here's the same thing but using the shortcut:
(swap! donation-count + 10)
This shortcut trips people up. The arguments are in a funny order, but we can take it step by step:
(swap!
  donation-count ;; the Atom
  +              ;; the function
                 ;; <- the current value of the Atom goes here!
  10             ;; the second argument
                 ;; the third argument
                 ;; the fourth argument, etc.
  )
I've used semicolons up there as placeholders for arguments. But we can see what's going on there: the current value of the Atom gets put as the first argument.
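The same extra-argument pattern works no matter what kind of value the Atom holds. Here's a sketch with a map, which is probably the most common case in real programs (the keys here are just made up for illustration):

;; an Atom holding a map
(def stats (atom {:total 0 :donors #{}}))

;; update functions compose nicely as the swap! function
(swap! stats update :total + 25)
(swap! stats update :donors conj "Jane")

@stats ;; => {:total 25, :donors #{"Jane"}}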
If you want to learn more about atoms, their interface, and examples of their use, check out Clojure Atom: A Complete Guide.
Okay, I'm going to wipe the sweat off my brow. Because next up we're looking at the biggest, most powerful reference type in Clojure.
Ref
Listen, back in 2008/2009, everybody was talking about Refs. Why? Because they were Software Transactional Memory (STM), which was so hot in Clojure and one of the things that made it special. After a few years, it turns out that Atoms, which are much simpler, are good enough for most purposes. But the STM is still in there and people do use it.
When should you use it?
If we look at an Atom, it only holds one value. That value can be complex (a nested collection, for instance), but it's just one value. If you have two Atoms, there's no way to make sure they stay in a consistent relationship. For example, you can't make one Atom always be twice the value of another. Why? Because some thread could read the Atoms after you change one but before you change the other.
Refs solve this problem. You can read from and modify multiple Refs inside of a transaction. Observers on the outside of the transaction cannot see the intermediate values. Nice!
Basically, Refs let you read and write to two chalkboards without any time passing between the first chalkboard and the second.
Let's look at some code:
(def total-donations (ref 0))
(def count-donations (ref 0))

;; start 9 people collecting money
(dotimes [_ 9]
  (doto (Thread. (fn []
                   ;; go collect $10
                   ;; ...
                   (dosync
                     ;; record $10
                     (alter total-donations + 10)
                     ;; record one donation
                     (alter count-donations inc))
                   ;; do it again
                   (recur)))
    (.start)))

;; start one person tweeting the total
(doto (Thread. (fn []
                 ;; wait 100 seconds
                 (Thread/sleep 100000)
                 (tweet (str "We collected $" @total-donations " total!"))))
  (.start))
;; start one person tweeting the average
(doto (Thread. (fn []
                 ;; wait 100 seconds
                 (Thread/sleep 100000)
                 ;; read both Refs in one transaction for a consistent snapshot
                 (let [[total cnt] (dosync [@total-donations @count-donations])]
                   (when (pos? cnt)
                     (tweet (str "Average donation: $" (double (/ total cnt))))))))
  (.start))
dosync means "do synchronized". It means that everything in there runs within a transaction. Transactions give you some guarantees: if any funny business happens in that transaction (an exception is thrown, for example), the transaction will be aborted like it never happened. Inside of that transaction, you have a consistent view of all of the Refs you dereference. Any changes you make are visible to you from the inside, but not to others outside of the transaction, until the transaction completes. It's like you're in a little time bubble.

As a cost for these guarantees, you have to obey a few rules. First, no side effects in a transaction. The transaction can be run multiple times. Also notice that, like with Atoms, you can't guarantee the order.
Also notice that instead of using swap!, you use alter. But it's very similar. You can even pass extra arguments, like we did above.
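Besides alter, Refs have two more write operations worth knowing about: ref-set overwrites the value outright, and commute is like alter for functions where order doesn't matter (like +), which lets the STM commit without retrying when another transaction has touched the same Ref. A quick sketch, reusing the Refs from above:

(dosync
  ;; ref-set overwrites a Ref's value outright,
  ;; e.g. resetting both counters at the start of a new day
  (ref-set total-donations 0)
  (ref-set count-donations 0))

(dosync
  ;; commute works like alter, but only use it when order doesn't matter
  (commute total-donations + 10)
  (commute count-donations inc))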
Var
We need to talk about Vars a little. But before I do, let me say this: you almost never use Vars directly as a concurrency primitive. So I'm not going to go deep into them. But they're important. We have to talk about them.
Whenever you define a variable using def or defn, you create a Var. Vars are references, like Refs and Atoms. That means they're mutable. You can change the value of a Var. And we use that all the time while we're doing interactive development. We define a function, we realize it's not quite right, so we redefine it. That redefinition changes what's called the root value of the Var.
In addition to the root value, Vars can have a different value per-thread. This lets different threads use different dynamic scopes with the same Var.
Vars are one of those things that recede into the background. You rarely use them explicitly. They are there only to support interactive development and dynamic scope.
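If you do want the per-thread behavior, the Var has to be declared dynamic, and you give it a thread-local value with binding. A small sketch (*log-level* is just a made-up example name):

;; a dynamic Var with a root value
(def ^:dynamic *log-level* :info)

;; binding gives the current thread its own value for the Var
(binding [*log-level* :debug]
  (println *log-level*)) ;; prints :debug

(println *log-level*) ;; prints :info -- the root value is untouched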
Agent
For some reason, whenever you mention Agents, people think of Actors, like the ones found in Erlang. They're definitely not the same. An Actor receives messages, does some work based on the message it receives, then listens for more messages. Agents, on the other hand, hold state like an Atom or a Ref.
It's best to compare Agents to Atoms. Like Atoms, Agents are uncoordinated. You can't modify two Agents with any kind of guarantees. The difference is all about which thread does the work. When you swap! an Atom, the processing happens in the current thread. The thread keeps retrying the computation until it successfully saves to the Atom (or throws an exception). But everything you do to an Atom happens in the current thread.

Calling send on an Agent, on the other hand, runs the computation on another thread. The call to send returns immediately after adding a job to a work queue that will be processed by a thread pool. So it's like an Atom, but stuff happens on another thread.
Let's look at the interface to Agents. send is the Agent equivalent of swap! and alter. Like I said, send will process the computation in a thread pool. That pool is meant for quick tasks, like adding numbers or string concatenation. If your task will take a long time---a long computation or I/O---you could quickly overwhelm all of the threads in the pool and keep them too busy to take on more work. If your function does do lots of work, Clojure gives you a function called send-off, which runs tasks on a separate, unbounded thread pool that grows as needed. Use that if you're doing I/O or a lot of computation.
Let's write a simple summing function that stores the answer in an Agent.
(def sum (agent 0)) ;; create an agent initialized to 0
(def numbers [0 9 3 4 5 5 4 44 4 2 5 6 7 775 ...])
(doseq [x numbers]
(send sum + x)) ;; add x to the current value
(await sum) ;; wait until all sent actions are done
(println @sum) ;; should have the answer
It might be easy to think that this happens in parallel. Even though it's happening on multiple threads, it's not parallel. Each Agent has its own queue of tasks, and they are done in the order they are received, one at a time. So it's not parallel. If you want parallelism with Agents, you have to have many Agents.
How can we make this sum parallel? Easy. Just make multiple Agents in a loop.
;; make 10 agents initialized to zero
(def sums (mapv agent (repeat 10 0)))

(def numbers (range 1000000)) ;; one million numbers

;; loop through all numbers and round-robin the agents
(doseq [[x agent] (map vector numbers (cycle sums))]
  (send agent + x))

;; wait at most 10 seconds
(apply await-for 10000 sums)

;; sum up the answers in all ten agents
(println (apply + (map deref sums)))
Of course, summing is just an example. It's probably not worth queuing up a task that just adds (addition is faster than queuing). But this shows how to do any kind of work in parallel.
In this example, all agents get the same number of tasks. What happens if some tasks take longer than others? That means that some Agents will be idle while others are still working. How can you prevent that?
The answer, predictably, is to add another level of indirection.
Instead of round-robin, we should add our numbers to a queue that the agents pull from when they're ready for more work.
;; make 10 agents initialized to zero
(def sums (mapv agent (repeat 10 0)))

(def numbers (agent (range 1000000))) ;; one million numbers in an agent

(defn dequeue-and-add [sum-agent]
  (letfn [(add [current-sum x]
            ;; do the addition
            (let [new-sum (+ current-sum x)]
              ;; when we're done, schedule the next dequeue
              (send numbers dequeue)
              ;; return the new value of the Agent
              new-sum))
          (dequeue [xs]
            ;; check if there's more to do
            (when (seq xs)
              ;; send the first number to the summing Agent
              (send sum-agent add (first xs)))
            ;; return the rest of the numbers for the next dequeue
            (rest xs))]
    (send numbers dequeue)))

;; start all 10 Agents working
(doseq [sum-agent sums]
  (dequeue-and-add sum-agent))
;; wait for all the numbers to be cleared from the queue
(loop []
  (when (seq @numbers)
    (Thread/sleep 1000)
    (recur)))

;; the last additions may still be in flight, so wait for them too
(apply await sums)

;; sum up the answers in all ten agents
(println (apply + (map deref sums)))
We're using a hand-off pattern. In dequeue-and-add above, add triggers dequeue and dequeue triggers add. dequeue takes a number from the numbers Agent and add adds it to the summing Agent.

We can't rely on await alone for this one because the Agents use a hand-off pattern. There will be times when there are no tasks queued on an Agent because it is waiting for another task to finish on another Agent. We have to be clever and wait for the queue to drain, and then await the final additions. You should probably add a timeout to that, in case the queue never does empty.
What about errors?
Well, that's really interesting. If your task throws an exception, it gets stored inside the Agent (not as the Agent's state). Once there's an exception, no more tasks will be processed. You can check if an Agent has failed by calling agent-error on it. It will return the exception (or nil if there isn't one). Then you can clear it with restart-agent.

You can have the Agent automatically handle its own errors with set-error-handler!. You pass it a function of two arguments (the agent and the exception) and you can handle the error.
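Here's a rough sketch of how those pieces fit together (set-error-mode! isn't mentioned above, but you'll usually want it alongside an error handler so the Agent keeps accepting work):

(def a (agent 0))

;; this action blows up; the exception is stored in the Agent
(send a (fn [_] (throw (Exception. "boom"))))
(Thread/sleep 100) ;; give the action a moment to run

(agent-error a) ;; => the Exception (or nil if the Agent is healthy)

;; clear the error and give the Agent a fresh state
(restart-agent a 0)

;; or install a handler and tell the Agent to keep going after errors
(set-error-handler! a (fn [the-agent ex]
                        (println "Agent failed:" (.getMessage ex))))
(set-error-mode! a :continue)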
Agents are really flexible, but as you can see, the code can get kind of complicated. For doing stuff in parallel, I'd probably use other options, like reducers (for pure functions) and ExecutorService (for blocking tasks).
Agents are nice for things where you have a value that needs to be accumulated over time and you want to do the calculation on another thread. They can also perform side effects in the function you send them, because the actions are queued up instead of competing.
Volatile
Clojure 1.7 introduced a new type of mutable reference called a volatile. Volatiles are used primarily as a way to hold mutable state inside of a stateful transducer. They are, however, useful in certain limited cases outside of transducers.

We'll get to that. But first, what are they? Volatiles are mutable references, like Atoms, Refs, and Agents. However, they do not impose any transactional discipline the way the others do. Their only concession to concurrency is that they force every thread to read a fresh value every time.
That requires a bit of background. Please excuse the Java. I'll keep it easy.
Let's say I have 2 cores on my machine, and two threads running. I also have some flag variable that is shared between them.
Shared Flag
boolean keepRunning = true;
Thread 1
void run() {
Thread.sleep(10000); // sleep for 10 seconds
keepRunning = false;
}
Thread 2
void run() {
while(keepRunning) { // stop looping when flag is false
Thread.sleep(1000); // sleep for 1 second
println("Still running!");
}
}
One thread is supposed to communicate with the other thread. After 10 seconds, Thread 1 will tell Thread 2 to stop by setting the flag to false.
Here's the thing: this may or may not work.
Modern CPUs share main memory between the cores, but each core has its own local cache. A core is allowed to copy data it will use from main memory into its cache. If it does that, it may never check main memory for changes again. Thread 2 could loop forever.
So, depending on how the cache is used, the cores involved, and even how the JIT has inlined methods and cached values locally, etc, this could have different behavior. Ouch.
Java's solution is the keyword volatile, which is where this primitive gets its name. You put volatile in front of the variable, and it forces all reads to bypass the cache and go to main memory. It will be slower, but it will be correct.
Shared Flag
volatile boolean keepRunning = true;
Back to Clojure. Under the hood, a Clojure Volatile is just a single mutable field marked volatile. That makes sure reads are never served from a stale cache.
As such, volatiles are helpful in communicating something from one thread (the writer) to many threads (the readers). They can be used for a single thread to hold and mutate state for later. Or for one thread to tell other threads something, like when to stop looping.
Let's look at our Java example translated to Clojure.
(def keep-running? (volatile! true))

(def thread1 (doto (Thread. (fn []
                              (Thread/sleep 10000)
                              (vreset! keep-running? false)))
               (.start)))

(def thread2 (doto (Thread. (fn []
                              (while @keep-running?
                                (Thread/sleep 1000)
                                (println "Still running!"))))
               (.start)))
Or how about as a mute switch?
(def mute? (volatile! false))

(defn mute! []
  (vreset! mute? true))

(defn unmute! []
  (vreset! mute? false))

;; only call this from the UI thread!!
(defn toggle-mute! []
  (vswap! mute? not))

;; start 10 threads
(doseq [id (range 10)]
  (doto (Thread. (fn []
                   (while true ;; run forever
                     (Thread/sleep 1000)
                     (when-not @mute? ;; only print while unmuted
                       (println "Ping!" id)))))
    (.start)))
When to use them
In general, volatiles are a lot like Atoms. However, no atomicity is enforced like you would get with Atoms and swap!. They are much faster, and more dangerous to use.
There are three scenarios where you can use volatiles safely.
1. Single thread writing and reading
In the simplest case, you're not doing any concurrency since you're not sharing the resource. In this case, volatiles are fine. However, if you're doing this, there is probably a more functional way to write your code that will be just as fast. I don't recommend this option for most use cases.
Regardless, here's an example for clarity (but don't do this):
;; sum the numbers 0 through 999,999
(let [sum (volatile! 0)]
  (dotimes [n 1000000]
    (vswap! sum + n))
  @sum)
2. n>1 threads writing, m>=1 threads reading
If you've got a different thread (or multiple threads) reading the values in the volatile, you are sharing it so you have to be careful. Further, if you've got multiple writers, you've got to be extra careful. Writes are not atomic. Different threads can overwrite each other.
In this case, the way to stay safe is to never read the volatile in the writing threads. That means never use deref (or @) and also never use vswap!. vswap! does an implicit read to calculate the next value. The writing threads have to use vreset!.
If you do need to read from the volatile or you can't follow this rule, use an atom instead.
Here's an example. Let's say you've got ten threads checking ten humidity sensors in your factory. Each sensor gets a new reading about every 5 seconds. You want to have a place to store the latest reading, regardless of which sensor it came from. Another thread can poll that place every second. This example is contrived, which may imply that this case isn't that common.
(def latest-humidity (volatile! nil))

;; writing threads
(dotimes [sensor-id 10]
  (doto (Thread. (fn []
                   (while true
                     (Thread/sleep 5000) ;; sleep 5 seconds
                     (let [r (read-sensor sensor-id)]
                       (vreset! latest-humidity r)))))
    (.start)))

;; polling thread (can have multiple reader threads)
(doto (Thread. (fn []
                 (while true
                   (Thread/sleep 1000)
                   (let [r @latest-humidity]
                     (println "Latest humidity reading" r)))))
  (.start))
3. 1 thread writing, m>=1 threads reading
If you only have one thread writing, it's much easier. You can read and write from the same thread. That means you can use the full spectrum of deref, vswap!, and vreset!.
Let's expand on the humidity sensor example. This time, it will be much more useful. We still have ten sensors. This time, we want to record the highest humidity recorded that day. If the humidity gets too high, the goods in the factory will be ruined. To use a volatile, we have to do all of the writing in a single thread.
(def highest-humidity (volatile! Long/MIN_VALUE))

;; single writing thread
(doto (Thread. (fn []
                 (while true
                   (Thread/sleep 5000) ;; sleep 5 seconds
                   ;; loop through all sensors
                   (dotimes [sensor-id 10]
                     (let [r (read-sensor sensor-id)]
                       ;; we can use vswap! to keep track of the max
                       (vswap! highest-humidity max r))))))
  (.start))

;; polling thread (can have multiple reader threads)
(doto (Thread. (fn []
                 (while true
                   (Thread/sleep 1000)
                   (let [r @highest-humidity]
                     (println "Highest humidity reading" r)))))
  (.start))
Those examples should work, but I'd like to reiterate: I would probably use an atom unless performance was critical and it was clear that using atoms was a bottleneck. Atoms are easier to work with, and they have a similar interface. Their only downside compared to volatiles is that atoms are slower.
Lock
Locks are the traditional, old-school way of coordinating access to resources. Like a lock on your bathroom, software locks make sure only one person is using that resource (the bathroom) at the same time. We call that mutual exclusion: if I'm in the bathroom, you can't be in here. I exclude you and you exclude me. While a thread has a lock, it can act like it's the only thread that has that resource. It's a very low-level way of coordinating, but sometimes that's exactly what you want.
All JVM objects have a built-in lock. You usually don't notice, but it's how Java implements the synchronized keyword. So all you need to create a lock is to create an Object.
Let's solve an actual problem using locks. If you have many threads all printing to the console at the same time, very often you'll see that the lines are mixed up. Two threads that print at exactly the same time will send their characters at the same time, and the line is just messed up.
(defn log [& args]
  (apply println args))

;; Thread 1
(log "INFO 2017-4-29: Starting database connection.")

;; Thread 2
(log "WARNING 2017-4-29: Cannot find configuration file, using defaults.")
Instead of the following, which is what you want:
INFO 2017-4-29: Starting database connection.
WARNING 2017-4-29: Cannot find configuration file, using defaults.
You get something like this:
INFO 20WARN17-4-29: StartingING 2017-4-29: Cannot find configuration file, database connection.
The characters from the two printlns got mixed up. They were being sent to standard out at the same time from two different Threads.
What's the solution?
Lock an object so that only one Thread can be in some code at the same time.
;; construct an Object just for its lock
(def log-lock (Object.))

(defn log [& args]
  (locking log-lock
    (apply println args)))
Now you can call log from many different Threads. The lines will always come out okay.

Like I said, locking is a low-level tool. I don't know if I've ever used it in production. But if you have some resource shared by multiple Threads, this is a simple way to let them share it safely.
core.async
If you've written Clojure, you've probably heard of core.async. core.async is a great library for doing parallel processing. The reason is simple: it's a very lightweight way to break down tasks and communicate between the tasks.
You create a task by using the go macro. All of the code inside of a go block will be executed in a Process. Processes are lighter weight than Threads and so are better for breaking stuff up even more than you normally would with Threads.
Processes can communicate using lightweight queues called Channels. The patterns are endless, but just as an example, you could create one Process that puts values onto a Channel and another Process that consumes them. Channels ensure that Processes wait for each other and that each value on the Channel is delivered only once.
Let's look at some code:
(require '[clojure.core.async :as async])

;; create a channel with a buffer of up to 100 values
(def number-chan (async/chan 100))

;; Atom where we keep the sum
(def sum (atom 0))

;; start 100 go processes taking numbers from number-chan
;; and adding them to the sum Atom
(dotimes [_ 100]
  (async/go
    (loop []
      (let [number (async/<! number-chan)]
        (swap! sum + number))
      (recur))))

;; create a go process that puts the numbers 0 through 999,999
;; onto the channel
(async/go
  (doseq [x (range 1000000)]
    (async/>! number-chan x)))
You take from a Channel with the function <! and put to a Channel with >!. If you try to take and there is no value, your Process will "park" and wait for a value. If you put and there is no room for new values, your Process will "park" and wait for room on the Channel. Because things wait, it means you can coordinate. You can say "Let's each go do some work and meet back here when we're done."
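One more thing that helps when experimenting: a go block returns a channel that will eventually carry the value of its body, and from an ordinary thread you can wait on any channel with the blocking take <!!. A quick sketch, using the async alias from above:

;; a go block returns a channel carrying the value of its body
(def result-chan (async/go (+ 1 2)))

;; from a regular (non-go) thread, use the blocking take
(println (async/<!! result-chan)) ;; prints 3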
core.async uses Communicating Sequential Processes (CSP), which is an algebra for building concurrent systems. It's the same abstraction that the Go programming language uses.
This is just a very short introduction to a very big topic. You can learn more about core.async from these resources:
- Clojure core.async: a presentation by Rich Hickey at Strange Loop.
- Clojure core.async: my course on the topic.
- core.async Patterns: an advanced course where you learn interesting patterns for using core.async.
- Mastering Concurrent Processes with core.async: The excellent chapter from Clojure for the Brave and True.
Threads
You can create Threads in Java very easily by constructing the java.lang.Thread class. Java has the concept of a Runnable, which is an interface with a method called run() that returns nothing. If you want to run something in another thread, you pass a Runnable to the constructor of a Thread, then start it. Luckily, Clojure has thought of this. You can use a function of zero arguments directly as a Runnable.
;; create a Thread
(def thread (Thread. (fn [] ;; 0 arguments
                       ;; this will run in a new Thread
                       (println 1 2 3))))

;; the thread won't run until you start it
(.start thread)
Threads are easy to use in Java, but there are some things to keep in mind. First of all, Threads cannot be stopped from the outside. The only way to stop a Thread is for the function you pass it to reach the end of execution. That means if you create an infinite loop, that Thread will run forever. Just be aware!
So how do you stop a JVM Thread? You have to code it to watch for a signal. Maybe you set up a Promise that it checks. When the Promise is delivered, the Thread will stop executing.
Which brings us to the second point: there is no built-in way to communicate with a Thread. The Threads share memory, so they have access to all of the objects in scope. So you can use something like a Promise or a core.async channel.
Finally, there's no way to communicate out of the Thread. The return value of your function is discarded. If you do need to get a value out, you can put the value in a Promise, or otherwise store it in a Ref or Atom. For heavy-duty communication between Threads, consider core.async.
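Here's a sketch of that pattern, using a Promise to get a value back out of a Thread:

(def result (promise))

(doto (Thread. (fn []
                 ;; do some work in the new Thread,
                 ;; then hand the result back through the Promise
                 (deliver result (+ 1 2))))
  (.start))

@result ;; blocks until the Thread delivers, then => 3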
Wait. There's one more thing:

The JVM won't exit until all (non-daemon) Threads are finished. When you're running Threads and you want to shut down the JVM, you have to call System.exit(). In Clojure, that looks like this:

(System/exit 0) ;; 0 means success; it's the Unix exit code
- Defining and Starting a Thread: A tutorial from Oracle.
- Java Threads: A lesson in my JVM course about starting and stopping threads.
Executor Service
The Java standard library contains something called an ExecutorService. If you have a bunch of similar tasks that need to be run in parallel, ExecutorService is your friend.

What does it do?

You set up a thread pool and a queue feeding that thread pool. Those make up your ExecutorService. You hand the ExecutorService "tasks". The tasks get queued up and pulled off by the threads in the pool, which execute them in parallel.

Tasks are just instances of Callable. Since in Clojure, functions are instances of Callable, it's super easy to just pass it functions of no arguments.
Let's see some code:
(import 'java.util.concurrent.ExecutorService)
(import 'java.util.concurrent.Executors)

;; create a thread pool with 4 threads
(def service (Executors/newFixedThreadPool 4))

;; submit a task and save the Future
(def f (.submit ^ExecutorService service ;; we need the hints to tell it
                ^Callable (fn []         ;; to use the Callable version
                            ;; do some work
                            ...)))

;; block on the Future
(println @f)
Clojure Futures are run in a similar way under the hood. If you need custom control of the thread pool, you can manage it yourself with an ExecutorService.
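Here's a slightly bigger sketch: submitting a batch of tasks, collecting their Futures, and shutting the pool down when you're done (if you don't, the pool's threads can keep the JVM alive):

(import 'java.util.concurrent.ExecutorService)
(import 'java.util.concurrent.Executors)

(def service (Executors/newFixedThreadPool 4))

;; submit a batch of tasks and hold on to the Futures
(def futures
  (doall
    (for [n (range 10)]
      (.submit ^ExecutorService service
               ^Callable (fn [] (* n n))))))

;; deref each Future for its result
(println (map deref futures))

;; shut the pool down so its threads don't keep the JVM running
(.shutdown ^ExecutorService service)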
Manifold
Manifold is an interesting library from Zach Tellman. It is used as part of the pervasive asynchrony found in Aleph, the high-throughput, asynchronous networking library.
The interesting thing about Manifold is that it provides another level of indirection which captures the essence of Futures, Promises, core.async Channels, and even RxJava and ReactiveStreams.
If you're going to be doing a lot of asynchronous communication between your threads, Manifold could be for you.
pmap
I had to include clojure.core/pmap here, just for completeness. It's a parallel implementation of clojure.core/map. However, you should note that it's very naïve. It is still lazy, like regular map, so it won't begin executing until a value is needed, and it only stays a little ahead of consumption. However, the function you pass it does get run in other Threads. If it is computationally intensive, you may want to try pmap. Add one more letter and it makes your code parallel.
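Here's a sketch you can try at the REPL to see the difference (slow-inc is just a made-up function that simulates a computationally heavy task with a sleep):

;; a deliberately slow function
(defn slow-inc [n]
  (Thread/sleep 100)
  (inc n))

;; map runs the calls one at a time
(time (doall (map slow-inc (range 32))))   ;; roughly 3200 ms

;; pmap farms the calls out to a pool of threads
(time (doall (pmap slow-inc (range 32))))  ;; much less, depending on your cores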
Reducers
The Clojure reducers library is great for executing things in parallel. Under the hood, it uses Fork/Join. It's also custom-tuned to work with Clojure's built-in data structures.
To execute things in parallel with reducers, use clojure.core.reducers/fold. It is like reduce except it takes two functions: one for reducing and one for combining. The collection you pass it will be broken into chunks. Each chunk is reduced with the reducing function. The chunk results are then combined using the combining function.
(require '[clojure.core.reducers :as r])
(def numbers (vec (range 1000000)))
;; sum numbers in parallel
;; when combining function and reducing function are the same,
;; you can use this arity
(r/fold + numbers)
;; sum only even numbers
;; r/filter does not create intermediate lists
(r/fold + (r/filter even? numbers))
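And here's a sketch of the two-function arity mentioned above, where the reducing function and the combining function are different. The reducing function counts evens within a chunk, and + combines the per-chunk counts (calling + with no arguments also supplies the identity, 0, that seeds each chunk):

;; count the even numbers in parallel
(r/fold
  +                                          ;; combine the chunk results
  (fn [cnt x] (if (even? x) (inc cnt) cnt))  ;; reduce within a chunk
  numbers)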
The reducers library comes with Clojure. The main benefit is that fold can run using Fork/Join, and so efficiently use many cores. In addition, the standard sequence operations like map, filter, drop, take, and cat (concatenate) do not create intermediate sequences as they do with the clojure.core versions. They are specifically built to work in parallel using Fork/Join.
Fork/Join
Another system built into the Java standard library is called Fork/Join. Fork/Join is a way to break up a job into tiny tasks that get distributed to all of your cores. Fork/Join then reassembles the pieces into an answer.
Let's write our own summation system again.
(import 'java.util.concurrent.RecursiveTask)
(import 'java.util.concurrent.ForkJoinPool)

(def pool (ForkJoinPool.))

(defn summation [numbers]
  (proxy [RecursiveTask] []
    (compute []
      (if (<= (count numbers) 512)
        ;; if the vector is small enough,
        ;; we just reduce over it
        (reduce + 0 numbers)
        ;; otherwise, we split the vector roughly in two
        ;; and recursively run two more tasks
        (let [half (quot (count numbers) 2)
              f1   (summation (subvec numbers 0 half))
              f2   (summation (subvec numbers half))]
          ;; do half the work in a new thread
          (.fork f2)
          ;; do the other half in this thread and combine
          (+ (.compute f1) (.join f2)))))))

(defn sum [numbers]
  (.invoke pool (summation (vec numbers))))

(def answer (sum (range 1000000)))
If we sum a great big list of numbers, it will get split into halves until each piece is 512 items or fewer. When we split, we call .fork on one half, which queues up that task for another thread. Meanwhile, since we still have the current thread, we continue computing the first half. .join blocks until the second half, executing in another thread, is done.

I know this isn't the best way to sum numbers, since addition doesn't really care about order. But it is a good way to split up work for any associative operation. You will need to tune the size of the work done in one thread to make the forking overhead worth it. In general, you probably will want to use Clojure reducers (see the Reducers section above) for this kind of work.
The secret to understanding concurrency and parallelism
A common question is "What is the difference between concurrency and parallelism?". If it's confusing you, don't worry. They're almost the same and the distinction is mostly academic. However, you're probably already used to thinking about concurrency and parallelism. We actually encounter the difference all the time in the real world.
Let's look at an example: bank tellers.
Imagine you walk into a bank. You see a row of tellers and a line of people waiting. How many bank transactions can happen at the same time? Easy. It's the same as the number of tellers. The parallelism of the bank is how many things can happen at the exact same time. If there are four tellers, four things can happen at the same time.
Even though there are only four tellers, all of those clients waiting in line will be helped before the end of the day. They're basically competing over the scarce resource of the teller's time. But because of the concurrency system set up (the queue), they all know that their business will be handled eventually.
Definitions
concurrency: ability to share resources (such as CPUs, RAM, databases, etc.) safely and efficiently
parallelism: adding more resources to get the answer faster
Thanks to Brian Goetz for these definitions.
Want another example?
How about a bathroom shared among roommates. How many people can use it at once? One. But somehow using a lock, the eight roommates can share it.
We can increase the parallelism by adding bathrooms. If you've got ten bathrooms but only eight roommates, we've actually got more parallelism than concurrency. The work of using the bathroom can't really be broken down any more to take advantage of those two extra rooms. You can't go to the bathroom faster by using two bathrooms.
Let's go back down to one bathroom.
With eight roommates sharing one bathroom, can you imagine maybe one roommate doesn't get their fair share of bathroom time? Maybe they are slower running to the bathroom when it's vacant. Those roommates could use a better system for sharing that bathroom than a simple lock. This is where concurrency primitives come in. Concurrency primitives are simple tools that you can use to build concurrent systems that have the properties you need to solve your problem.
Want some real-world examples of concurrency primitives?
- Locks
- Queues
- Schedules
- Conveyor belts
We cataloged the ones that you get with Clojure earlier in this guide.
But first, we saw how to increase parallelism. But how do you increase concurrency? We already know you can't increase concurrency in the bathroom example. The "unit of work" can't be split up any more. One person can't use two bathrooms at the same time!
But you can do it in the bank example.
Let's say you have 100 bank tellers (100 parallelism) and 10 clients in line. Obviously, no one will have to wait---except for those 90 tellers who are not busy! Is there anything we can do to keep them busy to make the client's bank experience go faster?
Yes. Because if you ask those clients, each one has multiple transactions. One person has four checks to deposit. Another wants to do three separate transfers, etc. The bank has set up their work so that each transaction is entirely independent. You can hand each teller a check to deposit (with the deposit slip) and all of your work can be done at the same time. One way to increase concurrency is to break down the work to be done into independent chunks. You'd be surprised how much can be broken up without much work.
For instance:
Let's say you have to fetch 20 web pages. Each GET request is independent of the others, so you can do them all at the same time. No problem.
That's an easy example of independent work. We'll get deeper into what it means to be "independent" later, because there are different types of independence.
But that's for later.
Let's dive into the computer world and see the basics that Clojure and the JVM give us.
Threads and the JVM
In one computer, the parallelism is the number of cores. Each core can execute instructions independently and run at full speed. The JVM does not give you direct access to these cores. You have to go through the operating system, which gives you processes and threads. These are both concurrency mechanisms so that different programs can share the limited number of cores.
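If you're curious, you can ask the JVM how many cores (hardware threads) it sees:

(.availableProcessors (Runtime/getRuntime)) ;; e.g. 8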
The JVM lets you create native operating system threads. All of the threads share memory.
So let's look at this more closely:
As far as the JVM is concerned, you can have unlimited threads. And they're all able to read and write to the same memory. Obviously, all of those threads will need some help working together.
So let's give them some help!
Let's get into the concurrency tools we've got in Clojure.
Immutable data structures
The first one is kind of not that obvious because it's everywhere in Clojure. But it's one of those important things that makes all of the other concurrency stuff possible.
What is it?
The immutable data structures. Clojure uses immutable data by default. Why is this important? Let's imagine a bank where you could reuse checks. Let's write all the information in pencil. That could work, with a lot of discipline. Or you could make checks one-time-use (you create one, use it, then throw it away).
Or what if we wrote the amount of money in your account in chalk on a chalkboard. Whenever you deposit a check, we just erase the old number and write the new number. Sounds great, right?
No! It sounds terrible. Change banks immediately!
Or how about a bathroom shared by eight people with no lock! You can do it. Just be disciplined. Knock every time. What could go wrong? Start looking for a new place to live (or run to the hardware store to buy a lock).
It sounds ridiculous, but that's basically what we do all the time when we use mutable data structures. You can still write concurrent systems, but your job is harder and your success depends on discipline instead of easy rules like "wait your turn in line".
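In Clojure, the data structures default to the safe option. A tiny sketch:

(def balance [100 250])

(conj balance 42) ;; => [100 250 42], a brand new vector

balance           ;; => [100 250], the original is untouched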
So let's all just take a moment and appreciate the calming and concurrent nature of immutable data structures.
Now we can move on to the next tool that really helps us.
Pure functions
The next thing that really helps with concurrency is using pure functions. Clojure doesn't help much with this, except that it makes it easy to make pure functions. But you don't get pure functions by default. And Clojure can't tell you whether a function is pure.
What is a pure function?
A pure function gives you the same return value for the same inputs all the time. You can call it whenever you want as many times as you want and you'll always get the same return value.
Wait! What things can make a function return different things?
What if the function reads a global mutable variable?
What if the function asks for user input?
What if the function fetches something from the network?
What if the function does different things depending on the time?
Or what if the function generates a different random number each time?
Basically, they can't depend on anything that changes. The time changes. The network changes. The user changes.
There's one more thing about pure functions you have to know about: not only can they not depend on changing stuff, pure functions can't change anything themselves.
Let's look at some examples of changing stuff:
Printing to the screen.
Posting to a web server.
Writing to disk.
Changing a global variable.
Sending an email.
And the classic: launching a missile. 🚀
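Here's a small sketch of the difference, reusing the donation-count Atom from earlier (add-donation and add-donation! are hypothetical names, just to show the kinds of effects to watch for):

;; pure: same inputs, same output, no effects
(defn add-donation [total amount]
  (+ total amount))

;; impure: touches the outside world
(defn add-donation! [amount]
  (println "Adding" amount)          ;; prints to the screen
  (swap! donation-count + amount))   ;; reads and writes shared state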
Why are pure functions important?
Because they're easy to understand. They don't depend on the history of the program. They always act the same. They're like a rock solid friend---you can always trust them.
But there's another reason.
Many concurrency primitives will sometimes call your function multiple times. They need to. Why?
Let's look at an example.
Imagine you're working with a stock market agent. You've got some money and want to invest. The agent comes to you, tells you how much money you have and the current stock price. You do some mental arithmetic and write down your order. The agent runs back to the market but there's so much competition, the price changes before the order gets put in. So the agent comes back.
And what do they ask?
The same question, but with different arguments. Here's your budget now, here's the price now. You plug them into the same mental function, and get a new order.
Sometimes it happens. Things change between when you calculate a thing and when the thing needs to happen. So you recalculate. But it's better to give that agent an order first in case it gets in and recalculate if it doesn't. That's called "optimistic".
What would happen if your function wasn't pure? It would get called many times and have more than one effect. It would print out something many times instead of one. Or it might launch two missiles instead of one. Oops. Keep it pure, folks.
Okay, there's just one more idea before we get to the catalog of primitives. It's an important idea, but I understand if you want to skip ahead. In fact, skip around as much as you want!
Independence
We talked before about being able to break up tasks into smaller tasks so you can get more concurrency and take advantage of more cores. How small do things need to be?
The short answer is "as small as possible". Why?
Let's say we have twenty people who have to carry 100 rocks of different sizes up a hill. They've each got a backpack. How do you distribute the rocks among the twenty backpacks so you can carry them all? It's like a game of Tetris. You could spend all day trying to figure out how to fit them all optimally. Most of the time, you end up with some rocks that just won't fit. So you move them around a little, trying to find some extra space somewhere. But it's a lot of work and most of the time you can't fit them all in. This is a well-known problem called the Knapsack Problem.
What's the solution?
Break the rocks up. If you break them into gravel, you can literally pour them into the backpacks. And it's really easy. You don't have to think about it. But then you get to the end, and some still don't fit.
What's the solution?
Break them up even more! If you break them into sand, there's less empty space between them. And you can fill them up even more.
What does this have to do with concurrency?
It turns out that the same thing happens with tasks on your cores. If you have big tasks, you have to spend a lot of time figuring out which core would run which task to optimize the execution. But with tiny tasks, you don't think at all. You just pour them in wherever they fit. And more will fit! So break up your jobs into independent tasks and you will be able to run on more cores.
There's one caveat, though.
Sometimes you can break your task up into such small pieces that it's not worth the overhead of sending the work to another core. For instance, adding two numbers is really fast on a computer. You might as well do it in the current thread instead of handing it to another thread. When you hand it off, you have to make a message, put it in a queue, and wait for the queue to process. It's not worth it! Many parallel execution examples use simple operations like addition, just to make them easy to understand, even if you wouldn't really parallelize that in real life. In real-world calculations, you often have the opposite problem: the tasks are too big. And those could be broken up more.
So what does independent mean?
There are different types of independence. We've already looked at one type of independence. Pure functions are independent of time. It doesn't matter how many times you call them or when you call them. They always give the same answer. So that's one independence.
What's another?
Well, another one we could look at is order independence. For instance, if I need to fetch 100 different websites, it doesn't really matter what order I do them in. It does matter when I do the requests, because I could make the request when the server is down, or the page changes over time.
But the order doesn't matter. The answer from server A does not depend on the answer from server B. I could do them in any order.
And that's really important.
Let's look at an example. What if you're shopping at a grocery store with ten checkout lines? If you finish shopping before another person, does that mean you're going to finish checking out before them? No! If you're anything like me, you tend to choose the wrong line. That person who finished after you gets lucky and you watch them finish before you even started.
Ugh. It sucks.
But let's look at it analytically: each checkout is independent of the order. You still get the same answer. Everyone got their food. Everyone got paid. You got the same result. But what wasn't independent? You can't checkout before you shop!
Order independence is really strong. You can't always guarantee it, but when you can, you have a lot of options for breaking up that work. In this case, there are ten checkout centers, each with their own queue. The queues can move independently.
Mathematicians call order independence commutativity. Here's an example:
a + b + c = b + a + c
That is, addition is commutative. Why does commutativity matter? Because you can't always guarantee the order. If you divide up tasks among four cores, you can't guarantee they'll finish in the same order. If you need order, you won't be able to divide the tasks up.
Another type of independence?
Grouping. Grouping is a little hard to explain, so I'll need a nice example. I hope I won't disappoint.
Let's say I want to build the longest horizontal lego tower in the known universe. So to get started, I lay out all the legos in the order I want them. The color pattern, that is, the order is important. However, if I keep the order of the pieces, does it matter which ones I connect first? Not really. I'll get the same tower. Mathematicians call this "grouping". It's like in this equation:
a + (b + c) = (a + b) + c
The grouping is the parentheses. The letters are in the same order, but the operations happen differently. It's a subtle but important difference. Mathematicians call this associativity.
Why is this important?
Well, if I'm really going to make the longest lego tower in the known universe, I need to get some help. After laying out the legos, I can get some friends to help. All I have to do is to roughly divide up sections of legos among my friends, we can all connect up our legos (maintaining order!), then connect up the sections when we're done.
Because I can group them however I want and get the same answer, it lets me be flexible dividing up the work. I know I'll be able to put the pieces back together and get the same answer.
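In code, associativity is exactly what lets you split a reduce into chunks and combine the partial results. Here's a sketch:

(def numbers (range 1000))

;; one big sum
(reduce + numbers) ;; => 499500

;; sum each chunk (the chunks could be handled by different cores),
;; then combine the chunk sums; associativity guarantees the same answer
(->> (partition-all 100 numbers)
     (map #(reduce + %))
     (reduce +)) ;; => 499500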
There are more types of independence, like idempotence. Idempotence means that if something happens twice (or more), it's the same as it happening once. So, for instance, if you add the same value to a set twice, it's the same as it happening once. Why is this important?
Well, here's another example. You're probably familiar with it.
Have you ever been on a site where you buy something and after you click "Buy" it yells at you:
"Don't click the buy button twice! It will charge you twice!"
Wouldn't the site be better with less yelling and more idempotence? Yes. The "submit payment" operation should be idempotent so you don't have to yell at the nice people trying to give you money. If you accidentally click twice, it should only charge you once.
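In code, that set example from above looks like this:

(conj #{:a :b} :a) ;; => #{:a :b} -- adding :a a second time changes nothing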
Conclusion and further help
Concurrency is a skill that takes time to learn. If you have a problem that you can't solve with this guide, I want to help you! You can ask any question during my office hours. Or send me a question and I'll answer it.