Token Buckets with core.async
Summary: Token Bucket is a simple algorithm for rate limiting a resource. It's easy to understand because you can reason about it in terms of real-world objects. core.async makes this algorithm very clear and easy.
Token + Bucket^1
You know you've got a good abstraction when you find lots of algorithms that are easy and clear to write using it. One algorithm that I really like, because it's practical and simple, is called the Token Bucket. It's used for rate limiting a resource, and it's very easy to write in core.async.
Let me digress a little. Here in the US, we have a complicated voting system. Not only do your presidential votes count only indirectly, but the counting is done with machines that vary from state to state. Some of those systems are so complicated. It's a little scary because it's hard to know how those machines work.
I mean, relatively hard. It's obviously possible to know, if you study enough. But compare it to papers in a box. I know there's all sorts of room for corruption in the papers in a box system. But it's understandable by children. Well, as long as they can count.
The token bucket is similarly simple. There's a bucket. You put tokens in it at a certain rate (for instance, one per hour). If you have a token, you can take an action (ride the subway, send a packet, read a file), and then you lose the token (it's good for one ride). If you don't have a token, you wait. Everything slows down to the rate of tokens falling into the bucket.
It's so simple. It so easily corresponds to a real-world situation you can imagine. I love that type of algorithm.
Let's build Token Bucket in core.async.
First, we need a bucket. For that, we'll use a core.async channel with a buffer. Let's just start with size 10.
(require '[clojure.core.async :refer [chan go <! >! timeout close!]])

(def bucket (chan 10))
Bucket is done. Now, we need something to add tokens to the bucket at a given rate.
(go
  (while true
    (>! bucket :token)
    (<! (timeout 1000))))
That will add one token to the bucket every second.
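To see the pacing for yourself, here is a minimal sketch that takes three tokens and times it. The names (`demo-bucket`) and the 100 ms interval are my own choices for the demo, picked so you don't have to wait three seconds. It uses `<!!`, the blocking take, because it runs outside a go block.

```clojure
(require '[clojure.core.async :refer [chan go >! <! <!! timeout]])

;; Hypothetical demo bucket; 100 ms per token instead of 1000 ms.
(def demo-bucket (chan 10))

(go
  (while true
    (>! demo-bucket :token)
    (<! (timeout 100))))

(let [start (System/currentTimeMillis)]
  (dotimes [_ 3]
    (<!! demo-bucket)) ; blocks this thread until a token is available
  (println "3 tokens took about" (- (System/currentTimeMillis) start) "ms"))
```

The first token lands in the bucket right away, so three tokens should take roughly two intervals, not three.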
We can rate limit an existing channel by forcing it to take a token before values get through.
(defn limit-rate [c]
  (let [out (chan)]
    (go
      (loop []
        (let [v (<! c)]
          (if (nil? v)       ;; c is closed
            (close! out)
            (do
              (<! bucket)    ;; wait for a token
              (>! out v)
              (recur))))))
    out))
Ok, it's not that simple. There are two corner cases.
- What happens when nobody takes a token out of the bucket? Do you keep putting tokens in?
The answer is yes. In the next hour, there are two tokens, so two can come through. But then . . .
- What do you do when the bucket gets really full and a whole bunch of people take tokens out at the same time?
Well, you let them all through. One token, one ride. There's no other coordination. And that means it's really important to choose the size of your bucket.
The number of tokens your bucket can hold is called the burstiness. It's called that because when the bucket is full, you could get a rampage of people trying to get on the subway. How many people should be allowed through at that point? The burstiness is the maximum that should come through at a time.
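It can help to watch a burst happen on paper first. Here is a minimal sketch of the bucket as plain data, with no core.async at all. Everything in it (`step`, `simulate`, the idea of a "tick" as one token interval) is my own naming for the illustration. Each tick adds one token, up to the capacity, then lets through as many waiting riders as there are tokens.

```clojure
;; Hypothetical pure model: one tick = one token interval.
(defn step
  "Adds one token (capped at capacity), then admits waiting riders, one per token."
  [capacity {:keys [tokens waiting]} arrivals]
  (let [tokens   (min capacity (inc tokens))
        waiting  (+ waiting arrivals)
        admitted (min tokens waiting)]
    {:tokens   (- tokens admitted)
     :waiting  (- waiting admitted)
     :admitted admitted}))

(defn simulate
  "Returns how many riders get through on each tick."
  [capacity arrivals]
  (->> arrivals
       (reductions (partial step capacity) {:tokens 0 :waiting 0 :admitted 0})
       rest
       (mapv :admitted)))

;; Nobody shows up for five ticks, then ten riders arrive at once.
(simulate 3 [0 0 0 0 0 10 0 0])
;; => [0 0 0 0 0 3 1 1]
```

With a capacity of 3, the quiet period only saves up three tokens. Three riders get through in a burst, and the rest trickle through at the token rate.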
We have our two parameters: the rate and the burstiness. Let's incorporate all of that.
(defn limit-rate [c r b]
  (let [bucket (chan b)   ;; burstiness: the bucket holds at most b tokens
        out (chan)]
    (go
      (while true
        (>! bucket :token)
        (<! (timeout (int (/ 1000 r))))))   ;; rate: r tokens per second
    (go
      (loop []
        (let [v (<! c)]
          (if (nil? v)       ;; channel is closed
            (close! out)
            (do
              (<! bucket)    ;; wait for a token
              (>! out v)
              (recur))))))
    out))
The burstiness is taken care of by the size of the buffer. The buffer will fill up if no one takes a token. Since a fixed-size buffer blocks puts once it is full, putting a token into a full bucket will wait until something takes a token, so the buffer never holds more than b tokens, as the algorithm describes.
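You can check the buffer's cap directly with `offer!`, the core.async function that tries a put without waiting and tells you whether it worked. This is a small sketch with made-up names (`demo-burstiness`, `full-bucket`), not part of the rate limiter itself.

```clojure
(require '[clojure.core.async :refer [chan offer!]])

;; Hypothetical demo values.
(def demo-burstiness 3)
(def full-bucket (chan demo-burstiness))

;; Try to drop five tokens into a bucket that holds three.
(def results
  (mapv (fn [_] (boolean (offer! full-bucket :token)))
        (range 5)))

results
;; => [true true true false false]
```

The first three tokens fit and the last two are refused. In the rate limiter I use `>!` instead, so the extra token waits for room rather than being dropped.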
Well, that's it. It's easy. And now we have a way to limit the rate of a channel. We can use it to limit the rates of other things, too, like a function call or access to a database. I use it to rate limit an API.
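For a function call, one way to do it is to make the caller take a token before the call goes through. Here is a minimal sketch under my own assumptions: `token-bucket` and `rate-limited` are hypothetical helper names, and the wrapper uses the blocking take `<!!`, so it is meant to be called from ordinary threads, not from inside a go block.

```clojure
(require '[clojure.core.async :refer [chan go >! <! <!! timeout]])

(defn token-bucket
  "Hypothetical helper: a channel that receives r tokens per second and holds at most b."
  [r b]
  (let [bucket (chan b)]
    (go
      (while true
        (>! bucket :token)
        (<! (timeout (int (/ 1000 r))))))
    bucket))

(defn rate-limited
  "Hypothetical helper: wraps f so every call first waits for a token."
  [f bucket]
  (fn [& args]
    (<!! bucket) ; blocks the calling thread until a token is available
    (apply f args)))

;; At most 10 calls per second, with bursts of up to 2.
(def limited-inc (rate-limited inc (token-bucket 10 2)))

(limited-inc 41)
;; => 42
```

The same wrapper would work around a database query or an API request. Only the function you pass in changes.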
core.async makes this algorithm nice and easy to use. There's a library that does this for you in a very convenient package. It's called Throttler. Bruno Vecchi has done the work of making this work well as a library. If you'd like to learn core.async, I recommend my LispCast Clojure core.async videos. It's a gentle and fun introduction to a great topic. You will learn everything you need to write Token Bucket and more!