Python, dalle fondamenta Lezione 40 / 60

asyncio: quando l'I/O async paga davvero

Cosa ti compra asyncio, i pattern che funzionano, e la trappola di mischiare codice sync e async.

Python async ha la fama di essere difficile, e gran parte di quella fama viene guadagnata nelle situazioni sbagliate. La gente prende asyncio perché suona veloce, sbatte contro un muro quando il loro script CPU-bound non accelera, e conclude che tutta la cosa sia una trappola. Non è una trappola, è uno strumento con una lama stretta. Oggi capiamo dove la lama taglia davvero: lavoro I/O-bound, e in particolare lavoro I/O-bound dove hai molte cose in attesa sulla rete contemporaneamente.

Questa è la prima di tre lezioni che chiudono il Modulo 7, data engineering. Dopo questa guardiamo gli orchestrator (Airflow, Prefect, Dagster), e poi costruiamo una pipeline reale.

Il riassunto in una frase

asyncio è per la concorrenza I/O: molte operazioni che per la maggior parte del tempo stanno ferme ad aspettare che qualcosa torni indietro su un socket. Fetchare 100 URL, leggere da un pool Postgres, parlare con Redis, scrapare un’API lenta. Se il tuo collo di bottiglia è aspettare, async aiuta. Se il tuo collo di bottiglia è calcolare, un loop stretto che moltiplica numeri, parsing di JSON in un hot path, image processing, async non aiuta, e vuoi multiprocessing o, nella 3.13, free-threaded Python (ci torniamo).

Il motivo per cui async aiuta per l’I/O è meccanico. Quando il codice fa requests.get(url), il thread del SO sta bloccato finché la risposta non arriva. Con async, await client.get(url) rilascia l’event loop mentre il kernel aspetta sul socket; il loop fa girare altre coroutine finché qualcosa ha dati. Un thread, un processo, centinaia di richieste in volo.

Le basi: async def, await, asyncio.run

import asyncio
import httpx

async def fetch(url: str) -> int:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.status_code

async def main() -> None:
    code = await fetch("https://example.com")
    print(code)

asyncio.run(main())

Tre cose da notare. async def dichiara una coroutine; chiamarla restituisce un oggetto coroutine, non gira ancora. await è quello che la fa girare davvero, ed è il punto di sospensione dove l’event loop può passare a un altro task. asyncio.run(main()) è il punto d’ingresso: crea un event loop, fa girare main fino al completamento, e smonta il loop. In Python 3.13 questa è la via canonica; non crei quasi mai un loop manualmente.

L’esempio sopra è async ma non concorrente, c’è un solo fetch. La vittoria arriva quando ne hai molti.

asyncio.gather: I/O parallelo

async def fetch_many(urls: list[str]) -> list[int]:
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [r.status_code for r in responses]

gather schedula tutte le coroutine, le aspetta tutte, e ritorna i loro risultati in ordine. Se hai 100 URL, sparerai 100 request grosso modo in una volta sola, ognuna in attesa sul proprio socket, e l’intera cosa finisce all’incirca nel tempo della singola request più lenta, non nella loro somma.

Questo è il momento in cui la gente si innamora di async. È anche il momento in cui DDoS-ano accidentalmente un’API. Il che ci porta al pattern successivo.

asyncio.Semaphore: limita la tua concorrenza

Quasi ogni API ha un rate limit. Quasi ogni database ha un soffitto sul connection pool. “Spara 1000 request in una volta sola” è un ottimo modo per essere bannato, throttlato, o per esaurire i file descriptor. Usa un semaforo.

async def fetch_with_limit(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str) -> int:
    async with sem:
        response = await client.get(url)
        return response.status_code

async def fetch_many(urls: list[str], concurrency: int = 10) -> list[int]:
    sem = asyncio.Semaphore(concurrency)
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [fetch_with_limit(client, sem, url) for url in urls]
        return await asyncio.gather(*tasks)

async with sem lascia entrare al massimo concurrency coroutine nel blocco protetto in una volta sola; le altre aspettano. Adesso puoi dargli 1000 URL e solo 10 sono in volo in qualsiasi momento. Tara concurrency su qualsiasi cosa il servizio target tolleri: 10 è conservativo, 50 è ragionevole per un’API amichevole, 100+ se sai davvero cosa c’è dall’altra parte.

asyncio.Queue: producer / consumer

Quando il lavoro non è una lista fissa, uno stream di job che arrivano nel tempo, o lavoro generato da altro lavoro, una queue calza meglio di gather.

async def producer(queue: asyncio.Queue, urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
    for _ in range(NUM_WORKERS):
        await queue.put(None)  # sentinel per worker

async def consumer(queue: asyncio.Queue, client: httpx.AsyncClient, results: list) -> None:
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()
            break
        response = await client.get(url)
        results.append((url, response.status_code))
        queue.task_done()

NUM_WORKERS = 10

async def main(urls: list[str]) -> list[tuple[str, int]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    results: list = []
    async with httpx.AsyncClient() as client:
        workers = [asyncio.create_task(consumer(queue, client, results)) for _ in range(NUM_WORKERS)]
        await producer(queue, urls)
        await asyncio.gather(*workers)
    return results

Il pattern: un producer mette il lavoro su una queue limitata; N consumer la tirano. Limitata così il producer non può riempire la memoria più velocemente di quanto i consumer la drenano. I sentinelli None sono il modo più semplice per dire ai consumer di fermarsi; in produzione useresti la cancellation, ma il pattern del sentinel va bene per gli script.

Async context manager e async iterator

La lezione 6 ha coperto with e gli iteratori. Async ha gli equivalenti. Tutto ciò che fa I/O all’ingresso o all’uscita diventa async with:

async with httpx.AsyncClient() as client:
    ...
async with asyncpg.create_pool(dsn) as pool:
    ...

Tutto ciò che fa streaming di dati pigramente diventa async for:

async with httpx.AsyncClient() as client:
    async with client.stream("GET", url) as response:
        async for chunk in response.aiter_bytes():
            process(chunk)

Non sono concetti separati, sono le versioni async-flavored di pattern che già conosci. La keyword async davanti significa “questo può sospendere il loop mentre stiamo entrando / uscendo / iterando”.

Le librerie che contano

Non puoi await requests.get(...), requests è sync. Tutto il punto di async è che ogni cosa nella catena sia non-bloccante. Quindi l’ecosistema ha un set parallelo di librerie:

  • HTTP: httpx (sync e async in una libreria sola, il default moderno), aiohttp (async-first, più vecchio, ancora ottimo per i server).
  • Postgres: asyncpg è il più veloce, punto. Oppure psycopg 3 con la API async.
  • Redis: redis-py ha l’async incorporato dalla 4.x.
  • MongoDB: motor, il driver async ufficiale.
  • S3 / cloud: aioboto3, aiobotocore.
  • File: aiofiles, ma l’I/O su disco di solito non è dove vuoi async, vedi la sezione successiva.

Se una libreria che ti serve non ha una versione async, hai due opzioni: far girare la versione sync in un worker thread, o saltare async del tutto.

La trappola del mischiare sync / async

Qui è dove la gente si fa male. Due regole.

Regola uno: non chiamare codice sync bloccante da dentro una coroutine. Se la tua funzione async chiama time.sleep(5) o requests.get(url), l’intero event loop si ferma per quei 5 secondi. Ogni altro task in volo si mette in pausa. Tutta l’illusione collassa.

# Male, blocca l'event loop
async def fetch(url):
    return requests.get(url).text

# Bene, usa un client async
async def fetch(url):
    async with httpx.AsyncClient() as client:
        return (await client.get(url)).text

Regola due: se devi assolutamente chiamare codice sync, spingilo in un thread.

import asyncio

def expensive_sync_thing(path: str) -> bytes:
    with open(path, "rb") as f:  # I/O su file bloccante
        return f.read()

async def main():
    data = await asyncio.to_thread(expensive_sync_thing, "big.bin")

asyncio.to_thread fa girare la funzione nel thread pool di default e ti restituisce un awaitable. L’event loop continua a far girare altri task mentre il thread fa il suo lavoro bloccante. Usalo per: operazioni sul filesystem, driver di database sync che non puoi sostituire, lavoro CPU che è piccolo (lavoro CPU più grosso dovrebbe andare in multiprocessing o un process pool).

Andando nella direzione opposta, chiamare async da sync, serve asyncio.run, ma solo se nessun loop è già in esecuzione. Dentro un loop esistente, non chiami asyncio.run di nuovo; fai await direttamente. La confusione qui è responsabile di metà delle domande “RuntimeError: This event loop is already running” su Stack Overflow.

asyncio vs threading vs multiprocessing vs no-GIL

Per il 2026, il quadro è:

  • asyncio: concorrenza I/O, single-threaded. Migliore quando hai molte operazioni di rete concorrenti e uno stack interamente async.
  • threading: concorrenza I/O senza la colorazione async; funziona con librerie sync. Il GIL significa che non aiuta il codice CPU-bound nello CPython standard.
  • multiprocessing: vero parallelismo per lavoro CPU-bound, al costo dell’overhead di processo e della serializzazione.
  • Free-threaded Python (3.13+, opt-in): una build no-GIL. I thread possono far girare bytecode Python in parallelo davvero. Ancora sperimentale, ha ancora trade-off di performance sul percorso single-threaded, ma sta atterrando. Per il lavoro CPU-bound nel 2026, questo è il futuro; per la concorrenza I/O, non cambia molto i conti, async è ancora più pulito.

Regola del pollice: I/O-bound e tanto, async. CPU-bound, processi (o no-GIL quando stabile). I/O-bound e piccolo, scrivi codice sync e smettila di preoccuparti.

Quando NON usare asyncio

La maggior parte degli script Python. Un batch job notturno che colpisce un’API e scrive un CSV non ha bisogno di async. Uno scraper web che fetcha 5 pagine non ha bisogno di async. Una pipeline di dati che legge un file Parquet e scrive su Postgres non ha bisogno di async.

La tassa async è reale: colora le firme delle tue funzioni, complica il testing, rende gli stack trace più difficili da leggere, e costringe ogni libreria che usi a essere o async o ponteggiabile ad async. Pagala quando la concorrenza è il punto. Saltala quando non lo è.

Un esempio reale: 1000 endpoint, educatamente

Per chiudere, lo script canonico:

import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str) -> dict:
    async with sem:
        try:
            response = await client.get(url, timeout=10)
            return {"url": url, "status": response.status_code, "len": len(response.content)}
        except httpx.HTTPError as e:
            return {"url": url, "status": None, "error": str(e)}

async def main(urls: list[str]) -> list[dict]:
    sem = asyncio.Semaphore(20)
    async with httpx.AsyncClient(http2=True) as client:
        tasks = [fetch(client, sem, url) for url in urls]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = [f"https://api.example.com/items/{i}" for i in range(1000)]
    results = asyncio.run(main(urls))
    ok = sum(1 for r in results if r["status"] == 200)
    print(f"{ok}/{len(results)} succeeded")

Quella è la forma. Semaforo per l’educazione, client singolo per il riuso della connessione, eccezioni catturate per task così un singolo fallimento non affonda l’intero batch, risultati aggregati alla fine. Fallo girare su 1000 URL e vedrai qualcosa come 30 secondi totali su un job che, in sequenza, ci metterebbe 20 minuti.

Lezione successiva: quando anche uno script async astuto non basta, e ti serve un orchestrator vero. Airflow, Prefect, Dagster, e come scegliere.


Citations: asyncio docs, httpx async docs, PEP 703 — making the GIL optional. Retrieved 2026-05-01.

Cerca