Eight different ways to implement an asynchronous loop in python
asyncio was first added to the python standard library more than 10 years ago. Asynchronous I/O had already been possible before that, by using libraries such as twisted or gevent. But asyncio was an attempt to bring the community together and standardize on a common solution.
So far this didn’t really work out for me. Each time I have to work with asyncio I get frustrated. I find myself longing for the simplicity of callbacks in JavaScript.
But maybe I just don’t understand asyncio properly yet. I learn best by trying to recreate the thing I want to learn about. So in this post I will retrace the history of asynchronous programming. I will concentrate on python, but I guess much of this translates to other languages. Hopefully this will allow me to better understand and appreciate what asyncio is doing. And hopefully you will enjoy accompanying me on that journey.
If you are interested, all eight implementations are available on github.
Setup
The following script outputs random numbers at random intervals:
#!/bin/bash

while true; do
    sleep $(($RANDOM % 5))
    echo $RANDOM
done
I will run two instances of that script in parallel:
proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
The processes will start immediately and run in parallel to our python code. We have to make sure to stop them when the program exits, e.g. because we press Ctrl-C:
def cleanup():
    proc1.terminate()
    proc2.terminate()
    proc1.wait()
    proc2.wait()
We cannot use proc.stdout.readline() because it blocks until a complete line is available. So here is some code that helps us to get the last complete line we received:
class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
        self.buffer = lines[-1]
reader1 = LineReader(proc1.stdout)
reader2 = LineReader(proc2.stdout)
What we want to do is to always render the latest complete line from each process as well as the current time:
def render():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join([now, reader1.line, reader2.line]))
With these basics out of the way, we can start with the loop itself.
Implementation 1: Blocking Loop
Putting it all together we get our first implementation: The blocking loop:
import datetime
import os
import subprocess


class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
        self.buffer = lines[-1]


def cleanup():
    proc1.terminate()
    proc2.terminate()
    proc1.wait()
    proc2.wait()


def render():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join([now, reader1.line, reader2.line]))


proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)

reader1 = LineReader(proc1.stdout)
reader2 = LineReader(proc2.stdout)

try:
    while True:
        for reader in [reader1, reader2]:
            reader.read_line()
        render()
finally:
    cleanup()
In this version, reader.read_line() will block until data is available. So it will first wait for data from proc1, then wait for data from proc2, then render, repeat. This is not really async yet.
Implementation 2: Busy Loop
import fcntl


def set_nonblock(fd):
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)


set_nonblock(proc1.stdout.fileno())
set_nonblock(proc2.stdout.fileno())

try:
    while True:
        for reader in [reader1, reader2]:
            try:
                reader.read_line()
            except BlockingIOError:
                pass
        render()
finally:
    cleanup()
These are just the parts of the code that changed. I used fcntl to set the file descriptors to non-blocking mode. In this mode, os.read() will raise a BlockingIOError if there is nothing to read. This is great because we cannot get stuck on a blocking read. However, this loop will just keep trying and fully saturate the CPU. This is called a busy loop and is obviously not what we want.
Implementation 3: Sleepy Loop
import time

try:
    while True:
        for reader in [reader1, reader2]:
            try:
                reader.read_line()
            except BlockingIOError:
                pass
        time.sleep(1)
        render()
finally:
    cleanup()
By simply adding a sleep() we get the benefits of both of the first two implementations: We cannot get stuck on a blocking read, but we also do not end up in a busy loop. This is still far from perfect though: If data arrives quickly we introduce a very noticeable delay of 1 second. And if data arrives slowly we wake up much more often than needed. We can adjust the sleep duration to the specific case, but it will never be perfect.
Implementation 4: Select Loop
import selectors

selector = selectors.DefaultSelector()

selector.register(proc1.stdout, selectors.EVENT_READ, reader1)
selector.register(proc2.stdout, selectors.EVENT_READ, reader2)

try:
    while True:
        for key, mask in selector.select(10):
            key.data.read_line()
        render()
finally:
    cleanup()
What we actually want to do is sleep until one of the file descriptors is ready. That is exactly what selectors are for. There are different system calls that can be used to implement a selector. It got its name from select, but nowadays you are more likely to use epoll. The selectors module will automatically pick the best option.
In the code above I also added a timeout of 10 seconds to the select call. This allows us to update the time even if none of the file descriptors become available for some time.
So with this implementation we have our first real async loop, and from here on out we will stick with selectors and only restructure the code surrounding them.
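For comparison, here is a rough sketch of my own (not part of the original post) of what the same wait looks like with the low-level select.select() call that gave the pattern its name. It assumes proc1, proc2, reader1, reader2, render() and cleanup() from the setup above:

import select

try:
    while True:
        # wait until at least one pipe has data, or 10 seconds have passed
        readable, _, _ = select.select([proc1.stdout, proc2.stdout], [], [], 10)
        for file in readable:
            reader = reader1 if file is proc1.stdout else reader2
            reader.read_line()
        render()
finally:
    cleanup()

The selectors module wraps exactly this kind of call, just with a nicer registration API and a better backend where available.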
Implementation 5: Callback Loop
class Loop:
    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.times = []

    def set_timeout(self, callback, timeout):
        now = time.time()
        self.times.append((callback, now + timeout))

    def set_interval(self, callback, timeout):
        def wrapper():
            callback()
            self.set_timeout(wrapper, timeout)
        self.set_timeout(wrapper, 0)

    def register_file(self, file, callback):
        self.selector.register(file, selectors.EVENT_READ, callback)

    def unregister_file(self, file):
        self.selector.unregister(file)

    def run(self):
        while True:
            now = time.time()
            timeout = min((t - now for _, t in self.times), default=None)

            for key, mask in self.selector.select(timeout):
                key.data()

            keep = []
            now = time.time()
            for callback, t in self.times:
                if t < now:
                    callback()
                else:
                    keep.append((callback, t))
            self.times = keep


def callback1():
    try:
        reader1.read_line()
    except ValueError:
        loop.unregister_file(proc1.stdout)
    render()


def callback2():
    try:
        reader2.read_line()
    except ValueError:
        loop.unregister_file(proc2.stdout)
    render()


loop = Loop()

loop.register_file(proc1.stdout, callback1)
loop.register_file(proc2.stdout, callback2)
loop.set_interval(render, 10)

try:
    loop.run()
finally:
    cleanup()
This implementation improves on the previous one by being much more modular. You can register files with callbacks that will be executed whenever the file is ready. There is also a much more sophisticated system for timeouts and intervals, similar to what you might know from JavaScript.
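To illustrate that (this snippet is my own and not part of the implementation above), using the Loop class feels a lot like setTimeout() and setInterval() in JavaScript:

loop = Loop()

# run once, roughly 2 seconds from now
loop.set_timeout(lambda: print('two seconds passed'), 2)

# run roughly every 5 seconds, starting on the first pass through the loop
loop.set_interval(lambda: print('tick'), 5)

loop.run()  # blocks forever and fires the callbacks from inside the loop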
Aside: Everything is a File
So far our loops can react to files and timeouts, but is that enough? My first hunch is that in unix, “everything is a file”, so this should get us pretty far. But let’s take a closer look.
I was surprised to learn that processes have not been files in unix for the longest time. So much for that saying. There have been some hacky workarounds. But to my knowledge, the first proper solution is pidfd_open, which was first included in Linux 5.3 in 2019.

Signals can interrupt your program at any time, e.g. in the middle of writing bits to stdout. If you are not careful, this can mess up your state. Luckily there is a simple solution you can use to integrate signals into your select loop: The self-pipe trick:
import signal


def register_signal(sig, callback):
    def on_signal(*args):
        os.write(pipe_w, b'.')

    def wrapper():
        os.read(pipe_r, 1)
        callback()

    pipe_r, pipe_w = os.pipe()
    signal.signal(sig, on_signal)
    loop.register_file(pipe_r, wrapper)
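As a usage sketch of my own (assuming the Loop from implementation 5 is running), this is how a signal could be hooked into the loop:

def on_sigchld():
    # hypothetical handler: react to a child process exiting from inside the loop
    print('a child process exited')

register_signal(signal.SIGCHLD, on_sigchld)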
Network connections use sockets which can be used with select. Unfortunately, most libraries that implement specific network protocols (e.g. HTTP) are not really reusable because they do not expose the underlying socket. Some years ago there was a push to create more reusable protocol implementations which produced the hyper project. Unfortunately it didn’t really gain traction.
Another issue with reusing existing code is that python likes to buffer a lot. This can have surprising effects when the selector tells you that the underlying file descriptor is empty, but there is still data available in the python buffer.
Implementation 6: Generator Loop
We are getting closer to asyncio, but there is still a lot of conceptual ground to cover. Before we get to async/await, we have to talk about generators.
Motivation
As I said in the introduction, I personally really like the callback approach. It is simple, just a selector and some callback functions. Compared to that I find asyncio with its coroutines and tasks and futures and awaitables and transports and protocols and async iterators and executors just confusing.
But I recently read Nathaniel J. Smith’s posts on trio, an alternative async loop for python, and I must admit that there are some solid arguments for async/await there. It boils down to this:
Splitting asynchronous execution into a setup and a callback phase does more harm than good.
Let’s look at an example:
In all the code samples so far I created subprocesses and made sure that they are terminated when the process exits:
proc = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)

try:
    do_something()
finally:
    proc.terminate()
    proc.wait()
Now let’s say this is not the whole program, but just one function. And let’s further say that do_something() is asynchronous:
def foo():
    proc = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)

    def callback():
        proc.terminate()
        proc.wait()

    loop.set_timeout(callback, 10)
We have a couple of issues here:
- callback() gets executed directly by loop.run(), so if it raises an exception the stack trace will not contain foo().
- We need to share state (in this case proc) between setup and callback. In this case I did that by using a closure, but that’s not very elegant.
- There is no way to use a try … finally here, so if there is an exception in the gap between setup and callback, the cleanup code is never executed. This is the big one.
Functions are a curious concept that allows us to get stack traces, easily share state, and do cleanup. The callback approach tries to get by without these benefits. The async/await approach instead tries to keep them by allowing us to pause the execution of functions.
The yield expression
The yield expression has been part of python since PEP 255 (2001) and got extended considerably in PEP 342 (2005). It allows us to pause execution of a function and hand control back to the caller.
In its simplest form it can be used in a for loop:
def foo():
    print('yielding 1')
    yield 1
    print('yielding 2')
    yield 2


for x in foo():
    print(x)

# yielding 1
# 1
# yielding 2
# 2
Another common use is to define context managers:
from contextlib import contextmanager


@contextmanager
def bar():
    proc = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
    try:
        yield proc
    finally:
        proc.terminate()
        proc.wait()


with bar() as proc:
    do_something()
A function that uses yield is called a generator function. Instead of a normal value it returns a generator. The first example can be rewritten roughly like this:
class FooGenerator:
    def __init__(self):
        self.state = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.state == 0:
            self.state += 1
            return 1
        elif self.state == 1:
            self.state += 1
            return 2
        else:
            raise StopIteration()


gen = iter(FooGenerator())
while True:
    try:
        x = next(gen)
        print(x)
    except StopIteration:
        break
It is important to distinguish these two conceptual layers. For example, raising StopIteration only makes sense in a generator, not in a generator function.
There are a few more things you can do with generators:
- Returning from a generator sets StopIteration.value:

    def foo():
        yield
        return 2

    generator = foo()
    next(generator)  # execute up to the first yield
    try:
        next(generator)
    except StopIteration as e:
        print(e.value)  # 2

- generator.send(data) passes control back to the generator, but it also gives the yield expression a value:

    def foo():
        data = yield
        print(data)

    generator = foo()
    next(generator)  # execute up to the first yield
    generator.send('test1')  # prints 'test1' and raises StopIteration

- generator.throw(exc) passes control back to the generator, but it makes the yield expression raise an exception:

    def foo():
        while True:
            try:
                yield
            except TypeError:
                print('type error')

    generator = foo()
    next(generator)  # execute up to the first yield
    generator.throw(TypeError)  # prints 'type error'
    generator.throw(ValueError)  # raises ValueError

- generator.close() is like generator.throw(GeneratorExit)

- yield from foo is like for item in foo: yield item
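The last two points have no example above, so here is a small sketch of my own showing how close() triggers the generator's cleanup code:

def foo():
    try:
        yield 1
        yield 2
    finally:
        print('cleaning up')

generator = foo()
next(generator)    # execute up to the first yield
generator.close()  # raises GeneratorExit inside foo(), which runs the finally block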
For a more in-depth discussion of generators I can recommend the introduction to async/await by Brett Cannon.
The Loop
import datetime
import os
import selectors
import subprocess
import time

selector = selectors.DefaultSelector()
data = ['', '', '']


class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
        self.buffer = lines[-1]


class Task:
    def __init__(self, gen):
        self.gen = gen
        self.files = set()
        self.times = set()
        self.done = False
        self.result = None

    def set_result(self, result):
        self.done = True
        self.result = result

    def init(self):
        try:
            self.files, self.times = next(self.gen)
        except StopIteration as e:
            self.set_result(e.value)

    def wakeup(self, files, now):
        try:
            if self.done:
                return
            elif any(t < now for t in self.times) or files & self.files:
                self.files, self.times = self.gen.send((files, now))
        except StopIteration as e:
            self.set_result(e.value)

    def close(self):
        self.gen.close()


def run(gen):
    task = Task(gen)
    try:
        task.init()
        while not task.done:
            now = time.time()
            timeout = min((t - now for t in task.times), default=None)
            files = {key.fileobj for key, mask in selector.select(timeout)}
            task.wakeup(files, time.time())
        return task.result
    finally:
        task.close()


def sleep(t):
    yield set(), {time.time() + t}


def gather(*generators):
    subtasks = [Task(gen) for gen in generators]
    try:
        for task in subtasks:
            task.init()
        while True:
            wait_files = set().union(
                *[t.files for t in subtasks if not t.done]
            )
            wait_times = set().union(
                *[t.times for t in subtasks if not t.done]
            )
            files, now = yield wait_files, wait_times
            for task in subtasks:
                task.wakeup(files, now)
            if all(task.done for task in subtasks):
                return [task.result for task in subtasks]
    finally:
        for task in subtasks:
            task.close()


def render():
    data[0] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join(data))


def popen(cmd, i):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    reader = LineReader(proc.stdout)

    selector.register(proc.stdout, selectors.EVENT_READ)
    try:
        while True:
            yield {proc.stdout}, set()
            reader.read_line()
            data[i] = reader.line
            render()
    except ValueError:
        pass
    finally:
        selector.unregister(proc.stdout)
        proc.terminate()
        proc.wait()


def clock():
    while True:
        yield from sleep(10)
        render()


def amain():
    yield from gather(
        popen(['./random.sh'], 1),
        popen(['./random.sh'], 2),
        clock(),
    )


run(amain())
This is the complete code and not just the changed bits because there are so many changes all over the place. I hope with all the buildup it doesn’t seem too crazy. Still there are some new concepts that I will try to expand on.
First, note how all the setup and teardown for each individual subprocess is now bundled in popen(). This is exactly what I was talking about before: We can now trivially do cleanup. This is made possible by the little yield expression that pokes a hole in this function somewhere in the middle.

The terminals on the one end of the communication are expressions like yield {file}, set() or yield set(), {timeout}. This means “pause this generator until this condition is met”.

On the other end of the communication there is run(), which will figure out which files are available and send that up the chain.

In between there is gather(), which takes the information from both ends and figures out which of its subtasks should be unpaused. Most other functions just pass the messages through by using yield from.

All this is mediated by Task objects, which keep track of the conditions and state of generators.
Implementation 7: async/await Loop
From here it is a small step to async/await. Generators that are used for asynchronous execution have already been called “coroutines” in PEP 342. PEP 492 (2015) deprecated that approach in favor of “native coroutines” and async/await.
Native coroutines are not really different from generator-based coroutines. You can still get the underlying generator by calling coro.__await__(). New syntax was introduced to keep the two concepts apart: Iterators use yield, coroutines use async/await. These two code snippets are more or less identical:
async def foo():
    await sleep(10)


class FooCoroutine:
    def __await__(self):
        return (yield from sleep(10).__await__())
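To make that relationship concrete, here is a small experiment of my own that drives a native coroutine by hand through the generator returned by __await__(). It borrows the AYield helper from the implementation below:

async def foo():
    await AYield('hello')
    return 42

gen = foo().__await__()
print(next(gen))        # 'hello' -- the value AYield passed down
try:
    gen.send(None)      # resume the coroutine after the await
except StopIteration as e:
    print(e.value)      # 42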
This is a minor change, but still it changes the syntax all over the place. So here is the full async/await implementation:
import datetime
import os
import selectors
import subprocess
import time

selector = selectors.DefaultSelector()
data = ['', '', '']


class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
        self.buffer = lines[-1]
        return self.line


class AYield:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        return (yield self.value)


class Task:
    def __init__(self, coro):
        self.gen = coro.__await__()
        self.files = set()
        self.times = set()
        self.done = False
        self.result = None

    def set_result(self, result):
        self.done = True
        self.result = result

    def init(self):
        try:
            self.files, self.times = next(self.gen)
        except StopIteration as e:
            self.set_result(e.value)

    def wakeup(self, files, now):
        try:
            if self.done:
                return
            elif any(t < now for t in self.times) or files & self.files:
                self.files, self.times = self.gen.send((files, now))
        except StopIteration as e:
            self.set_result(e.value)

    def close(self):
        self.gen.close()


def run(coro):
    task = Task(coro)
    try:
        task.init()
        while not task.done:
            now = time.time()
            timeout = min((t - now for t in task.times), default=None)
            files = {key.fileobj for key, mask in selector.select(timeout)}
            task.wakeup(files, time.time())
        return task.result
    finally:
        task.close()


async def sleep(t):
    await AYield((set(), {time.time() + t}))


async def gather(*coros):
    subtasks = [Task(coro) for coro in coros]
    try:
        for task in subtasks:
            task.init()
        while True:
            wait_files = set().union(
                *[t.files for t in subtasks if not t.done]
            )
            wait_times = set().union(
                *[t.times for t in subtasks if not t.done]
            )
            files, now = await AYield((wait_files, wait_times))
            for task in subtasks:
                task.wakeup(files, now)
            if all(task.done for task in subtasks):
                return [task.result for task in subtasks]
    finally:
        for task in subtasks:
            task.close()


def render():
    data[0] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join(data))


async def popen(cmd, i):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    reader = LineReader(proc.stdout)

    selector.register(proc.stdout, selectors.EVENT_READ)
    try:
        while True:
            await AYield(({proc.stdout}, set()))
            reader.read_line()
            data[i] = reader.line
            render()
    except ValueError:
        pass
    finally:
        selector.unregister(proc.stdout)
        proc.terminate()
        proc.wait()


async def clock():
    while True:
        await sleep(10)
        render()


async def amain():
    await gather(
        popen(['./random.sh'], 1),
        popen(['./random.sh'], 2),
        clock(),
    )


run(amain())
Implementation 8: asyncio
So which kind of loop does asyncio use? After reading PEP 3156 I would say: That’s complicated.
At the core, asyncio is a simple callback loop. The relevant functions are called add_reader(file, callback) and call_later(delay, callback).
But then asyncio adds a second layer using async/await. A simplified version looks roughly like this:
import asyncio


class Future:
    def __init__(self):
        self.callbacks = []
        self.result = None
        self.exception = None
        self.done = False

    def _set_done(self):
        self.done = True
        for callback in self.callbacks:
            callback(self)

    def set_result(self, result):
        self.result = result
        self._set_done()

    def set_exception(self, exception):
        self.exception = exception
        self._set_done()

    def add_done_callback(self, callback):
        self.callbacks.append(callback)

    def __await__(self):
        yield self


class Task:
    def __init__(self, coro):
        self.gen = coro.__await__()

    def wakeup(self, future=None):
        try:
            if future and future.exception:
                new_future = self.gen.throw(future.exception)
            else:
                new_future = next(self.gen)
            new_future.add_done_callback(self.wakeup)
        except StopIteration:
            pass


async def sleep(t):
    future = Future()
    loop.call_later(t, future.set_result, None)
    await future


async def amain():
    print('start')
    try:
        await sleep(5)
        loop.stop()
    finally:
        print('finish')


loop = asyncio.new_event_loop()
task = Task(amain())

task.wakeup()
loop.run_forever()
When we call task.wakeup(), the coroutine amain() starts executing. It prints 'start', creates a future, and tells the loop to resolve that future in 5 seconds. Then it yields that future back down to wakeup(), which registers itself as a callback on the future. Now the loop starts running, waits for 5 seconds, and then resolves the future. Because wakeup() was added as a callback, it is now called again and passes control back into amain(), which stops the loop, prints 'finish', and raises StopIteration.
In the earlier coroutine examples, I yielded files and timeouts as conditions. Since this version is hosted on a callback loop, it instead yields futures that wrap loop callbacks.
This approach works reasonably well. But I also see some issues with it.
Limited support for files
You may have noticed that I did not implement the full subprocess example this time. This is because asyncio’s coroutine layer doesn’t really support files.
Futures represent actions that are completed when the callback is called. File callbacks are called every time data is available for reading. This disconnect can probably be bridged somehow, but this post is already long enough and I didn’t want to go down yet another rabbit hole.
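For example, one way to bridge the two worlds (a sketch of my own, not an asyncio API) is a helper that arms add_reader() once and resolves a one-shot future on the first readability event:

async def wait_readable(loop, fd):
    # resolve a one-shot future the first time fd becomes readable
    future = loop.create_future()

    def on_readable():
        loop.remove_reader(fd)
        future.set_result(None)

    loop.add_reader(fd, on_readable)
    await future

A coroutine could then await wait_readable(loop, proc.stdout.fileno()) before each non-blocking read, similar to what popen() did in the generator loop.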
Futures are not a monad
If you know some JavaScript you have probably come across Promises. Promises are basically the JavaScript equivalent of Futures. However, they have a much nicer API. They are basically a monad, and every Haskell fan can give you an impromptu lecture about the awesomeness of monads. Consider the following snippets, which do virtually the same thing:
Promise.resolve(1)
.then(x => x + 1)
.finally(() => console.log('done'));
import asyncio


def increment(future):
    try:
        future2.set_result(future.result() + 1)
    except Exception as e:
        future2.set_exception(e)


def print_done(future):
    print('done')


loop = asyncio.new_event_loop()

future1 = loop.create_future()
future1.add_done_callback(increment)
future1.set_result(1)

future2 = loop.create_future()
future2.add_done_callback(print_done)

loop.run_until_complete(future2)
Naming Confusion
So far we have “Coroutines”, “Futures”, and “Tasks”. The asyncio documentation also uses the term “Awaitables” for anything that implements __await__(), so both Coroutines and Futures are Awaitables.

What really makes this complicated is that Task inherits from Future. So in some places, Coroutines and Futures can be used interchangeably because they are both Awaitables – and in other places, Coroutines and Futures can be used interchangeably because Coroutines can automatically be wrapped in Tasks, which makes them Futures.
I wonder whether it would have been better to call Tasks “CoroutineFutures” instead. Probably not. That makes them sound like they are a simple wrapper, when in fact they are the thing that is actually driving most of the coroutine layer.
In any case I believe the asyncio documentation could benefit from a clear separation of layers. First should be a description of the high level coroutine API, including sleep() and gather(). The second part could be about the callback layer, including call_later() and add_reader(). The third and final part could explain the low level plumbing for those people who want to dive deep. This is the only part that needs to mention terms like “Awaitable”, “Task”, or “Future”.
Conclusion
These were eight different versions of asynchronous loops. I have certainly learned something. A bit about async primitives on linux and a lot about generators and coroutines in python. I hope this post serves as a helpful reference for future endeavors.
The big question remains: Which approach is better? The simple cleanup in the coroutine approach is a huge advantage, but it comes at the cost of significant complexity compared to callbacks. The thought that we have to limit ourselves to one of them is not great. So here’s to hoping we will someday find an approach that combines the benefits of both.