Threaded workers with AIOHTTP

The async/await syntax in Python (available since 3.5) gives asynchronous applications a more linear style, but requires the programmer to understand which actions block and which do not. A single call to a blocking function in an otherwise asynchronous application can cause widespread problems. For example, in a web server built with AIOHTTP, a call to a blocking function prevents any other requests from being served until it returns.

Sometimes calling slow, blocking functions is unavoidable, for example when calling a CPU-intensive function in an external library via ctypes. If the call is made from the main thread, which runs the asynchronous event loop, no other events are processed until the function returns. Users experience this as the application “hanging”.

Consider the following example, which uses time.sleep to represent a blocking function that takes a long time to return [1]. The function waits for a user-specified number of seconds, then returns a short message.

import time

def blocking_func(seconds: int) -> str:
    time.sleep(seconds)
    return f"Waited for {seconds} second(s)"

A simple HTTP server which calls the blocking function is given in the next snippet. This works well for a single client, but if multiple requests arrive at the same time they are processed one at a time, because each call to blocking_func stalls the event loop. The messages returned will be as expected (based on the input), but the time taken to respond will include any time spent waiting for other requests to finish.

from aiohttp import web

def create_app() -> web.Application:
    app = web.Application()
    app.add_routes([web.get("/", view_page)])
    return app

async def view_page(request: web.Request) -> web.Response:
    # The "seconds" query parameter is optional and defaults to 5.
    seconds = int(request.query.get("seconds", 5))
    # Runs on the event loop thread, so no other request is served until it returns.
    result = blocking_func(seconds)
    return web.Response(text=result)
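The serialising effect can be reproduced without a web server. The following is a minimal sketch using only the standard library (asyncio.run needs Python 3.7 or later); the names handler and time_blocking_handlers are made up for illustration, and the delay is shortened to 0.2 seconds so it runs quickly.

```python
import asyncio
import time


def blocking_func(seconds: float) -> str:
    time.sleep(seconds)
    return f"Waited for {seconds} second(s)"


async def handler(seconds: float) -> str:
    # Hypothetical stand-in for view_page: calls the blocking function directly.
    return blocking_func(seconds)


async def time_blocking_handlers() -> float:
    """Run three handlers "concurrently" and return the total elapsed time."""
    start = time.perf_counter()
    await asyncio.gather(handler(0.2), handler(0.2), handler(0.2))
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = asyncio.run(time_blocking_handlers())
    print(f"Three blocking handlers took {elapsed:.2f}s")
```

Although the three handlers are scheduled together, each one holds the event loop for its full sleep, so the total is roughly the sum of the three delays rather than the longest one.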

In the example below the program is modified to offload the execution of the blocking function to a group of worker threads. This allows the application to continue serving other requests asynchronously. There are two changes to the original:

  • A ThreadPoolExecutor is initialised in create_app with a maximum of three worker threads.

  • The view_page function calls loop.run_in_executor, passing the executor, the blocking function and the number of seconds to sleep. This returns an asyncio.Future, which is awaited to get the actual result.

The execution of view_page will not proceed until blocking_func returns, but it yields to the event loop while waiting so that other coroutines can execute. This allows multiple requests to be served at the same time.

import asyncio
from aiohttp import web
from concurrent.futures import ThreadPoolExecutor

def create_app() -> web.Application:
    app = web.Application()
    app.add_routes([web.get("/", view_page)])
    app["executor"] = ThreadPoolExecutor(max_workers=3)
    return app

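async def view_page(request: web.Request) -> web.Response:
    seconds = int(request.query.get("seconds", 5))
    executor = request.app["executor"]
    # get_running_loop (Python 3.7+) is the preferred call inside a coroutine.
    loop = asyncio.get_running_loop()
    # Run blocking_func on a worker thread; awaiting yields to the event loop.
    result = await loop.run_in_executor(executor, blocking_func, seconds)
    return web.Response(text=result)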
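The effect of the executor can be checked with a short standard-library sketch that leaves out the web server. The names handler and time_offloaded_handlers are made up for illustration, the delay is shortened to 0.2 seconds, and Python 3.7 or later is assumed.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds: float) -> str:
    time.sleep(seconds)
    return f"Waited for {seconds} second(s)"


async def handler(executor: ThreadPoolExecutor, seconds: float) -> str:
    # Hypothetical stand-in for view_page: offloads the call to a worker thread.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_func, seconds)


async def time_offloaded_handlers() -> float:
    """Run three handlers on a three-worker pool and return the elapsed time."""
    with ThreadPoolExecutor(max_workers=3) as executor:
        start = time.perf_counter()
        await asyncio.gather(*(handler(executor, 0.2) for _ in range(3)))
        return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = asyncio.run(time_offloaded_handlers())
    print(f"Three offloaded handlers took {elapsed:.2f}s")
```

With one worker available per request, the three sleeps overlap and the total should be close to a single delay instead of three.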

In this example there are only three worker threads. What happens when a fourth request is received while the three workers are already busy? The request to execute the blocking function is placed in a queue and will not start until a worker becomes available.
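The queueing behaviour can be observed by recording when each call actually starts. This is a rough sketch using only the standard library; timed_blocking_func and start_delays are hypothetical names, and the 0.2 second delay is chosen only to keep the run short.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def timed_blocking_func(seconds: float, origin: float) -> float:
    """Block for `seconds` and return how long after `origin` the call started."""
    started_after = time.perf_counter() - origin
    time.sleep(seconds)
    return started_after


async def start_delays(requests: int = 4, workers: int = 3,
                       seconds: float = 0.2) -> List[float]:
    """Submit `requests` blocking calls to a pool of `workers` threads.

    Returns the start delay of every call, sorted from earliest to latest.
    """
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        origin = time.perf_counter()
        futures = [
            loop.run_in_executor(executor, timed_blocking_func, seconds, origin)
            for _ in range(requests)
        ]
        return sorted(await asyncio.gather(*futures))


if __name__ == "__main__":
    for delay in asyncio.run(start_delays()):
        print(f"call started after {delay:.2f}s")
```

The first three calls should start almost immediately, while the fourth only starts once one of the workers has finished its sleep.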

This places a limit on the amount of work in progress and prevents the server from becoming overloaded. Even if your application can accept 10,000 requests at the same time, you may not want to process them all at once, especially if the function is resource intensive.


  1. There is a non-blocking equivalent to time.sleep available as asyncio.sleep.