Python, de la zero Lecția 40 / 60

asyncio: cand async I/O chiar merita

Ce iti aduce asyncio, pattern-urile care functioneaza si capcana amestecarii codului sync cu async.

Async Python are reputația de a fi greu, iar cea mai mare parte a acelei reputații e câștigată în situațiile greșite. Oamenii apucă asyncio fiindcă sună rapid, lovesc un perete când scriptul lor CPU-bound nu se accelerează și concluzionează că tot lucrul e o capcană. Nu e o capcană, e o unealtă cu o lamă îngustă. Astăzi aflăm unde taie efectiv lama: muncă I/O-bound și, mai exact, muncă I/O-bound în care ai multe lucruri care așteaptă pe rețea în același timp.

Aceasta e prima dintre cele trei lecții care închid Modulul 7, data engineering. După asta ne uităm la orchestratori (Airflow, Prefect, Dagster), apoi construim un pipeline real.

Rezumatul într-o singură frază

asyncio e pentru concurență I/O: multe operații care în mare parte stau așteptând să se întoarcă ceva pe un socket. Fetch la 100 de URL-uri, citire dintr-un pool Postgres, vorbit cu Redis, scraping la un API lent. Dacă bottleneck-ul tău e așteptarea, async ajută. Dacă bottleneck-ul tău e calculul, o buclă strânsă care înmulțește numere, parsing JSON pe hot path, procesare de imagini, async nu ajută și vrei multiprocessing sau, în 3.13, free-threaded Python (revenim la asta).

Motivul pentru care async ajută la I/O e mecanic. Când codul face requests.get(url), threadul OS stă blocat până ajunge răspunsul. Cu async, await client.get(url) eliberează event loop-ul în timp ce kernelul așteaptă pe socket; loop-ul rulează alte coroutine până când ceva are date. Un thread, un proces, sute de cereri în zbor.

Bazele: async def, await, asyncio.run

import asyncio
import httpx

async def fetch(url: str) -> int:
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.status_code

async def main() -> None:
    code = await fetch("https://example.com")
    print(code)

asyncio.run(main())

Trei lucruri de observat. async def declară o coroutine; apelarea ei returnează un obiect coroutine, încă nu rulează. await e ce o rulează efectiv și e punctul de suspendare unde event loop-ul poate comuta la altă sarcină. asyncio.run(main()) e punctul de intrare: creează un event loop, rulează main până la finalizare și demontează loop-ul. În Python 3.13 aceasta e calea canonică; aproape niciodată nu mai creezi manual un loop.

Exemplul de mai sus e async, dar nu concurent: e un singur fetch. Câștigul vine când ai multe.

asyncio.gather: I/O paralel

async def fetch_many(urls: list[str]) -> list[int]:
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [r.status_code for r in responses]

gather programează toate coroutine-le, le așteaptă pe toate și returnează rezultatele lor în ordine. Dacă ai 100 de URL-uri, vei trage 100 de cereri aproape simultan, fiecare așteptând pe propriul socket, iar întreg lucrul se termină aproximativ în timpul celei mai lente cereri singulare, nu în suma lor.

Acesta e momentul în care oamenii se îndrăgostesc de async. E și momentul în care, accidental, fac DDoS pe un API. Ceea ce ne aduce la pattern-ul următor.

asyncio.Semaphore: limitează-ți concurența

Aproape fiecare API are un rate limit. Aproape fiecare bază de date are un plafon de pool de conexiuni. „Trage 1000 de cereri odată” e o cale grozavă să fii banat, throttled sau să epuizezi file descriptors. Folosește un semafor.

async def fetch_with_limit(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str) -> int:
    async with sem:
        response = await client.get(url)
        return response.status_code

async def fetch_many(urls: list[str], concurrency: int = 10) -> list[int]:
    sem = asyncio.Semaphore(concurrency)
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [fetch_with_limit(client, sem, url) for url in urls]
        return await asyncio.gather(*tasks)

async with sem lasă cel mult concurrency coroutine-uri în blocul protejat odată; restul așteaptă. Acum poți să-i dai 1000 de URL-uri și doar 10 sunt în zbor în orice moment. Reglează concurrency la cât tolerează serviciul tău țintă: 10 e conservator, 50 e rezonabil pentru un API prietenos, 100+ dacă chiar știi ce e de cealaltă parte.

asyncio.Queue: producător / consumator

Când munca nu e o listă fixă, un flux de joburi care sosesc în timp, sau muncă generată de altă muncă, o coadă se potrivește mai bine decât gather.

async def producer(queue: asyncio.Queue, urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
    for _ in range(NUM_WORKERS):
        await queue.put(None)  # sentinel per worker

async def consumer(queue: asyncio.Queue, client: httpx.AsyncClient, results: list) -> None:
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()
            break
        response = await client.get(url)
        results.append((url, response.status_code))
        queue.task_done()

NUM_WORKERS = 10

async def main(urls: list[str]) -> list[tuple[str, int]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    results: list = []
    async with httpx.AsyncClient() as client:
        workers = [asyncio.create_task(consumer(queue, client, results)) for _ in range(NUM_WORKERS)]
        await producer(queue, urls)
        await asyncio.gather(*workers)
    return results

Pattern-ul: un producător pune muncă într-o coadă mărginită; N consumatori trag din ea. Mărginită ca producătorul să nu poată umple memoria mai repede decât consumatorii o golesc. Sentinelele None sunt cea mai simplă cale de a spune consumatorilor să se oprească; în producție ai folosi anulare, dar pattern-ul cu sentinel e bun pentru scripturi.

Async context managers și async iterators

Lecția 6 a acoperit with și iteratorii. Async are echivalentele. Orice face I/O la intrare sau ieșire devine async with:

async with httpx.AsyncClient() as client:
    ...
async with asyncpg.create_pool(dsn) as pool:
    ...

Orice transmite date leneș devine async for:

async with httpx.AsyncClient() as client:
    async with client.stream("GET", url) as response:
        async for chunk in response.aiter_bytes():
            process(chunk)

Astea nu sunt concepte separate, sunt versiunile cu aromă async ale pattern-urilor pe care deja le cunoști. Cuvântul cheie async în față înseamnă „asta poate suspenda loop-ul în timp ce intrăm / ieșim / iterăm”.

Bibliotecile care contează

Nu poți face await requests.get(...), requests e sync. Tot rostul async-ului e ca tot ce e în lanț să fie non-blocking. Așa că ecosistemul are un set paralel de biblioteci:

  • HTTP: httpx (sync și async într-o singură bibliotecă, implicitul modern), aiohttp (async-first, mai vechi, încă excelent pentru servere).
  • Postgres: asyncpg e cel mai rapid, punct. Sau psycopg 3 cu API-ul async.
  • Redis: redis-py are async încorporat din 4.x.
  • MongoDB: motor, driverul async oficial.
  • S3 / cloud: aioboto3, aiobotocore.
  • Fișiere: aiofiles, dar I/O pe disc de obicei nu e unde vrei async, vezi secțiunea următoare.

Dacă o bibliotecă de care ai nevoie nu are versiune async, ai două opțiuni: rulează versiunea sync într-un thread de worker, sau renunță complet la async.

Capcana amestecării sync / async

Aici se rănesc oamenii. Două reguli.

Regula unu: nu apela cod sync care blochează din interiorul unei coroutine. Dacă funcția ta async apelează time.sleep(5) sau requests.get(url), întregul event loop se oprește pentru acele 5 secunde. Orice altă sarcină în zbor se oprește. Toată iluzia se prăbușește.

# Rau, blocheaza event loop-ul
async def fetch(url):
    return requests.get(url).text

# Bun, foloseste un client async
async def fetch(url):
    async with httpx.AsyncClient() as client:
        return (await client.get(url)).text

Regula doi: dacă chiar trebuie să apelezi cod sync, împinge-l într-un thread.

import asyncio

def expensive_sync_thing(path: str) -> bytes:
    with open(path, "rb") as f:  # blocking file I/O
        return f.read()

async def main():
    data = await asyncio.to_thread(expensive_sync_thing, "big.bin")

asyncio.to_thread rulează funcția în pool-ul implicit de threaduri și îți returnează un awaitable. Event loop-ul continuă să ruleze alte sarcini în timp ce threadul își face munca care blochează. Folosește asta pentru: operații de filesystem, drivere sync de baze de date pe care nu le poți înlocui, muncă CPU care e mică (munca CPU mai mare ar trebui să meargă la multiprocessing sau un process pool).

În direcția cealaltă, apelarea async din sync, ai nevoie de asyncio.run, dar doar dacă nu rulează deja un loop. În interiorul unui loop existent, nu apelezi asyncio.run din nou; faci await direct. Confuzia aici e responsabilă pentru jumătate din întrebările „RuntimeError: This event loop is already running” de pe Stack Overflow.

asyncio vs threading vs multiprocessing vs no-GIL

Pentru 2026, tabloul e:

  • asyncio: concurență I/O, single-threaded. Cel mai bun când ai multe operații de rețea concurente și un stack complet async.
  • threading: concurență I/O fără colorarea async; merge cu biblioteci sync. GIL-ul înseamnă că nu ajută codul CPU-bound în CPython standard.
  • multiprocessing: paralelism real pentru muncă CPU-bound, cu costul overhead-ului de proces și al serializării.
  • Free-threaded Python (3.13+, opt-in): un build fără GIL. Threadurile pot rula bytecode Python în paralel pe bune. Încă experimental, încă are compromisuri de performanță pe calea single-threaded, dar aterizează. Pentru muncă CPU-bound în 2026, acesta e viitorul; pentru concurență I/O, nu schimbă mult calculul, async rămâne mai curat.

Regula degetului: I/O-bound și mult, async. CPU-bound, procese (sau no-GIL când e stabil). I/O-bound și mic, scrie cod sync și nu te mai îngrijora.

Când să NU folosești asyncio

Majoritatea scripturilor Python. Un job batch nocturn care lovește un API și scrie un CSV nu are nevoie de async. Un web scraper care aduce 5 pagini nu are nevoie de async. Un pipeline de date care citește un fișier Parquet și scrie în Postgres nu are nevoie de async.

Taxa async e reală: îți colorează semnăturile funcțiilor, complică testarea, face stack traces mai greu de citit și forțează fiecare bibliotecă pe care o folosești să fie ori async, ori bridgeable la async. Plătește taxa aia când concurența e ideea. Sări peste când nu e.

Un exemplu real: 1000 de endpoint-uri, politicos

Pentru încheiere, scriptul canonic:

import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str) -> dict:
    async with sem:
        try:
            response = await client.get(url, timeout=10)
            return {"url": url, "status": response.status_code, "len": len(response.content)}
        except httpx.HTTPError as e:
            return {"url": url, "status": None, "error": str(e)}

async def main(urls: list[str]) -> list[dict]:
    sem = asyncio.Semaphore(20)
    async with httpx.AsyncClient(http2=True) as client:
        tasks = [fetch(client, sem, url) for url in urls]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    urls = [f"https://api.example.com/items/{i}" for i in range(1000)]
    results = asyncio.run(main(urls))
    ok = sum(1 for r in results if r["status"] == 200)
    print(f"{ok}/{len(results)} succeeded")

Asta e forma. Semafor pentru politețe, un singur client pentru reutilizarea conexiunilor, excepții prinse per-task ca un eșec să nu scufunde tot batch-ul, rezultate agregate la final. Rulează asta pe 1000 de URL-uri și vei vedea ceva ca 30 de secunde timp total la un job care, secvențial, ar lua 20 de minute.

Lecția următoare: când nici măcar un script async deștept nu e suficient și ai nevoie de un orchestrator real. Airflow, Prefect, Dagster și cum să alegi.


Citări: docs asyncio, docs httpx async, PEP 703, making the GIL optional. Consultat 2026-05-01.

Caută