The new async/await functionality in Python (available since 3.5) gives a more linear style to asynchronous applications, but requires the programmer to have an understanding of blocking/non-blocking actions. A single call to a blocking function in an otherwise asynchronously application can cause widespread problems. For example, in the case of a web server using AIOHTTP a call to a blocking function prevents any other requests being served.
Sometimes calling slow, blocking functions is unavoidable. For example, when calling a CPU-intensive function in an external library via ctypes
. If this call is made from the main thread running the asynchronous event loop then no other events will be processed until the function returns. This is experienced by users as the application “hanging”.
Consider the following example, which uses time.sleep
to represent a blocking function that takes a long time to return1. The function waits for a user-specified number of seconds, then returns a short message.
import time
def blocking_func(seconds: int) -> str:
time.sleep(seconds)
return f"Waited for {seconds} second(s)"
A simple HTTP server which calls the blocking function is given in the next snippet. This works well for a single client, but if multiple requests arrive at the same time then they will be processed synchronously. The messages returned will be as expected (based on the input), but the time taken to respond will include any time spent waiting for other requests to finish.
import asyncio
from aiohttp import web
def create_app() -> web.Application:
app = web.Application()
app.add_routes([web.get("/", view_page)])
return app
async def view_page(request: web.Request) -> web.Response:
seconds = int(request.query.get("seconds", 5))
result = blocking_func(seconds)
return web.Response(text=result)
In the example below the program is modified to offload the execution of the blocking function to a group of worker threads. This allows the application to continue serving other requests asynchronously. The changes to the original are highlighted.
-
A
ThreadPoolExecutor
is initalised increate_app
which creates three worker threads. -
The
view_page
function callsloop.run_in_executor
, passing the blocking function and number of seconds to sleep, which returns anasyncio.Future
which is awaited to get the actual result.
The execution of view_page
will not proceed until blocking_func
returns, but will yield to allow other coroutines to execute. This allows multiple requests to be served at the same time.
import asyncio
from aiohttp import web
from concurrent.futures import ThreadPoolExecutor
def create_app() -> web.Application:
app = web.Application()
app.add_routes([web.get("/", view_page)])
app["executor"] = ThreadPoolExecutor(max_workers=3)
return app
async def view_page(request: web.Request):
seconds = int(request.query.get("seconds", 5))
executor = request.app["executor"]
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, blocking_func, seconds)
return web.Response(text=result)
In this example there are only three worker threads. What happens when a fourth request is received while the three workers are already busy? The request to execute the blocking function is placed in a queue and will not start until a worker becomes available.
This places a limit on the amount of work in progress and prevents the server from becoming overloaded. Even if your application can accept 10,000 requests at the same time you may not want to process them all at once, especially if the function is resource intensive.
-
There is a non-blocking equivalent to
time.sleep
available asasyncio.sleep
↩︎