Python, de la zero Lecția 39 / 60

Lucrul cu API-uri: requests, retries, rate limits

Trusa de unelte HTTP din Python, pattern-urile de retry care nu inrautatesc lucrurile si tratarea rate limit-urilor care te tine binevenit.

Cealaltă jumătate a ingestiei sunt API-urile. Serviciul HTTP al altcuiva are datele de care ai nevoie. Scrii un client Python, tragi pagini, scrii pe disc sau într-o bază de date, repeți mâine. Sună simplu. Prima versiune naivă merge mereu. A doua oară când o rulezi, API-ul are o zi proastă și descoperi că „simplu” are o coadă lungă de moduri de eșec: timeout-uri, 503-uri, 429-uri, token-uri expirate, paginare opacă, JSON care e în secret XML. Lecția asta e trusa de unelte pentru coada aia lungă.

Bibliotecile HTTP în 2026

Trei biblioteci pe care le vei întâlni.

requests: clasicul. Doar sync. Nu s-a schimbat mult de ani; nici nu trebuie. Cel mai simplu, mai lizibil cod HTTP pe care îl poți scrie în Python. Dacă scrii un script unic și nu îți pasă de concurență, requests rămâne o alegere perfect bună în 2026.

import requests

r = requests.get("https://api.example.com/orders", params={"limit": 100}, timeout=10)
r.raise_for_status()
data = r.json()

httpx: modern, sync și async, în mare parte compatibil drop-in cu requests. Suport HTTP/2 din start. Recomandarea standard pentru cod nou în 2026 și cea pe care o aleg implicit.

import httpx

with httpx.Client(timeout=10.0) as client:
    r = client.get("https://api.example.com/orders", params={"limit": 100})
    r.raise_for_status()
    data = r.json()

Versiunea async se citește aproape identic:

import asyncio
import httpx

async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=10.0) as client:
        responses = await asyncio.gather(*(client.get(u) for u in urls))
    return [r.json() for r in responses]

Când ai 200 de URL-uri de lovit și o cerere individuală durează 200ms, versiunea sync ia 40 de secunde. Versiunea async ia 2. Diferența asta e motivul pentru care există httpx.

aiohttp: doar async, opțiunea async mai veche. Tot excelentă, tot menținută, tot comună în producție. Dacă moștenești o codebase care o folosește, ești în regulă. Pentru cod nou aș alege httpx pentru unificarea sync/async.

Pentru lecția asta vom folosi httpx. Pattern-urile se traduc la requests aproape linie cu linie.

Bazele, făcute corect

Patru lucruri de făcut bine la fiecare cerere:

r = client.get(
    "https://api.example.com/orders",
    params={"since": "2026-04-01", "limit": 100},
    headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
    timeout=10.0,
)
r.raise_for_status()
data = r.json()
  • params: parametrii de query string ca dict. Nu concatena string-uri în URL-uri; httpx îi va encoda pentru tine.
  • headers: auth, accept, user-agent. Setează un User-Agent care identifică aplicația ta; unele API-uri resping pe cele goale.
  • timeout: întotdeauna setează-l. Implicitul în unele biblioteci e „așteaptă la nesfârșit”, ceea ce e o cale grozavă să-ți blochezi pipeline-ul în spatele unei conexiuni atârnate. 10 secunde e un punct de plecare rezonabil.
  • raise_for_status(): ridică o excepție pentru răspunsuri 4xx/5xx. Fără el, codul tău continuă vesel cu r.json() pe o pagină de eroare 500 și primești o eroare confuză de parsing JSON în loc de o eroare HTTP clară.

Un client care reutilizează conexiunile e semnificativ mai rapid decât a apela httpx.get direct de fiecare dată, fiindcă pune în pool conexiunile TCP:

with httpx.Client(
    base_url="https://api.example.com",
    headers={"Authorization": f"Bearer {token}"},
    timeout=10.0,
) as client:
    for endpoint in endpoints:
        r = client.get(endpoint)
        ...

Folosește clientul. Întotdeauna. E o linie în plus și o accelerare vizibilă pentru un API vorbăreț.

Retries cu tenacity

Unele eșecuri sunt tranzitorii. Serverul a sughițat. Conexiunea TCP a fost resetată. Un load balancer a flipuit. Răspunsul corect e „așteaptă o clipă și încearcă din nou”. Răspunsul greșit e „eșuează zgomotos către utilizator și pune o reluare manuală pe lista celui de la on-call”.

Biblioteca e tenacity. Decoratorul e @retry și compune câteva politici mai mici.

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)
import httpx

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type((httpx.TransportError, httpx.HTTPStatusError)),
    reraise=True,
)
def fetch(client: httpx.Client, url: str) -> dict:
    r = client.get(url)
    r.raise_for_status()
    return r.json()

Ce zice asta: încearcă până la 5 ori; așteaptă 1s, 2s, 4s, 8s între încercări (limitat la 60s); încearcă din nou la erori de rețea și erori HTTP; dacă tot eșuăm după 5 încercări, ridică din nou ultima excepție ca apelantul să vadă o eroare reală.

Matematica din spatele exponential backoff: fiecare retry dublează așteptarea. Motivul nu e superstiție, e că atunci când un serviciu e supraîncărcat, retry-ul imediat doar adaugă încărcare. Exponential backoff lasă coada să se golească. Adaugă un strop de jitter (variație aleatoare în timpul de așteptare) ca o mie de clienți să nu reîncerce toți în același instant:

from tenacity import wait_random_exponential

wait=wait_random_exponential(multiplier=1, max=60),

Asta e „exponențial până la 60s, cu jitter”. Documentația tenacity numește acesta implicitul recomandat pentru a vorbi cu servicii externe și e ce iau prima dată.

Ce să reîncerci, ce nu

Nu fiecare eroare merită un retry. Împărțirea e cam așa:

Retry: erori 5xx de server, 408 request timeout, 429 too many requests, erori de rețea (ConnectError, ReadTimeout, RemoteProtocolError). Astea sunt tranzitorii.

Nu retry: erori 4xx de client (cu excepția 408/429). 400 bad request înseamnă că cererea ta era malformată: retry-ul nu schimbă nimic. 401 înseamnă că token-ul tău e invalid: retry-ul doar bate la endpoint-ul de auth. 404 înseamnă că resursa nu există. 422 înseamnă că validarea a eșuat.

Tenacity îți dă granularitatea:

def is_retryable(exc: BaseException) -> bool:
    if isinstance(exc, httpx.TransportError):
        return True
    if isinstance(exc, httpx.HTTPStatusError):
        status = exc.response.status_code
        return status >= 500 or status in (408, 429)
    return False

@retry(
    stop=stop_after_attempt(5),
    wait=wait_random_exponential(multiplier=1, max=60),
    retry=retry_if_exception(is_retryable),
    reraise=True,
)
def fetch(client: httpx.Client, url: str) -> dict:
    r = client.get(url)
    r.raise_for_status()
    return r.json()

Acela e decoratorul de retry de calitate de producție. Fură-l.

Rate limits: respectă răspunsul

Când un API îți spune să încetinești, încetinește. Majoritatea API-urilor semnalează asta cu HTTP 429 și un header Retry-After:

HTTP/1.1 429 Too Many Requests
Retry-After: 30

Onorează-l:

def fetch(client: httpx.Client, url: str) -> dict:
    while True:
        r = client.get(url)
        if r.status_code == 429:
            wait_s = float(r.headers.get("Retry-After", "5"))
            log.warning("rate limited, sleeping %ss", wait_s)
            time.sleep(wait_s)
            continue
        r.raise_for_status()
        return r.json()

Asta e podeaua: citește header-ul, dormi, reîncearcă. Tavanul e throttling proactiv: limitează-te singur înainte ca API-ul să fie nevoit. Dacă API-ul permite 100 cereri pe minut, rulează clientul tău la 90/minut și nu vei vedea niciodată un 429.

Biblioteca limits îți dă un rate-limiter curat:

from limits import RateLimitItemPerMinute
from limits.storage import MemoryStorage
from limits.strategies import MovingWindowRateLimiter

storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
quota = RateLimitItemPerMinute(90)

def fetch(client: httpx.Client, url: str) -> dict:
    while not limiter.hit(quota, "api.example.com"):
        time.sleep(0.1)
    r = client.get(url)
    r.raise_for_status()
    return r.json()

Sau scrie-ți propriul token bucket: sunt cam 30 de linii. Ideea e gestionarea conștientă a rate-ului. API-urile te limitează fiindcă cineva din echipa lor de ops a fost trezit la 3 dimineața din cauza unui client care se purta urât. Nu fi acel client.

Unde e posibil, batch. Dacă API-ul are un endpoint bulk (POST /orders/lookup care ia 100 de ID-uri), folosește-l în loc de 100 de GET-uri individuale.

Pattern-uri de paginare

Trei aromuri pe care le vei întâlni, în ordinea cam crescătoare a plăcutului.

Offset/limit: ?offset=0&limit=100, apoi ?offset=100&limit=100, etc. Clasicul, dar are un defect: dacă rândurile sunt inserate în timp ce paginezi, poți rata sau duplica rânduri. Acceptabil pentru dataset-uri statice, dubios pentru cele live.

def paginate_offset(client: httpx.Client, url: str, limit: int = 100):
    offset = 0
    while True:
        r = client.get(url, params={"offset": offset, "limit": limit})
        r.raise_for_status()
        page = r.json()["data"]
        if not page:
            return
        yield from page
        offset += len(page)

Cursor-based: API-ul returnează un token opac next_cursor; îl pasezi înapoi ca să obții pagina următoare. Stabil peste scrieri, implicitul modern.

def paginate_cursor(client: httpx.Client, url: str):
    cursor: str | None = None
    while True:
        params = {"cursor": cursor} if cursor else {}
        r = client.get(url, params=params)
        r.raise_for_status()
        body = r.json()
        yield from body["data"]
        cursor = body.get("next_cursor")
        if not cursor:
            return

Link header (RFC 5988): API-ul pune URL-ul paginii următoare în header-ul HTTP Link. GitHub folosește asta. Mai puțin comun, dar elegant: nu construiești URL-ul următor, doar îl urmezi.

def paginate_link(client: httpx.Client, url: str):
    while url:
        r = client.get(url)
        r.raise_for_status()
        yield from r.json()
        # httpx parses Link headers into r.links
        next_link = r.links.get("next")
        url = next_link["url"] if next_link else None

Citește documentația, alege-l pe cel potrivit, scrie un generator. Generatoarele sunt forma ideală aici: apelantul nu trebuie să știe câte pagini sunt sau să-i pese de granițele paginilor.

Autentificare

Variațiile:

  • Chei API în headere: cea mai comună și locul corect pentru ele. Authorization: Bearer <token> sau un header custom precum X-API-Key.
  • Chei API în query string-uri: unele API-uri legacy fac asta. Evită unde e posibil: query string-urile ajung în log-urile de access ale serverului și în istoricul browserului.
  • OAuth 2.0 client credentials: pentru server-la-server. POST la un endpoint de token, primești înapoi un access token, îl folosești o oră, refresh.
  • OAuth 2.0 authorization code: pentru flow-uri „acționează în numele unui utilizator”. Redirecționări de browser, scope-uri, refresh tokens. Nu e ce vrei pentru job-uri batch.

Un mic wrapper de refresh-token pentru flow-ul client-credentials:

import time
from dataclasses import dataclass

@dataclass
class Token:
    value: str
    expires_at: float

class TokenManager:
    def __init__(self, client_id: str, client_secret: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._token: Token | None = None

    def get(self, client: httpx.Client) -> str:
        if self._token and self._token.expires_at > time.time() + 60:
            return self._token.value
        r = client.post(self.token_url, data={
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        })
        r.raise_for_status()
        body = r.json()
        self._token = Token(body["access_token"], time.time() + body["expires_in"])
        return self._token.value

Bufferul de 60 de secunde e intenționat: face refresh puțin înainte de expirare ca să nu primești un 401 la mijlocul unei cereri pentru că ceasul a deviat.

Webhook-uri vs polling

O paranteză rapidă. Dacă API-ul oferă webhook-uri („vom face POST la URL-ul tău când se întâmplă ceva”), aproape întotdeauna sunt mai bune decât polling-ul. Te oprești din a bate un API cu „ceva nou?” de douăsprezece ori pe oră și lași sursa să împingă când chiar sunt noutăți. Costul e să rulezi un mic server HTTP care să le primească. Pentru integrări de volum mare sau latență mică, asta e calea.

Polling-ul e bun pentru job-uri batch de frecvență joasă, sau când nu controlezi infrastructura ca să primești webhook-uri, sau când API-ul nu le suportă.

O notă despre clienți API generați de AI

Asistenții AI sunt excelenți la producerea de boilerplate pentru genul ăsta de cod. „Scrie-mi un client httpx cu retry-uri tenacity care paginate prin cursor și gestionează rate limits” îți dă 80% dintr-un client funcțional în 15 secunde. Decoratoarele de retry, buclele de paginare, flow-urile de auth: toate foarte pattern-shaped, toate lucruri pe care AI le face aproape perfect.

Capcana: asistenții AI uneori inventează nume de endpoint-uri care nu există. Au văzut zece mii de clienți API și au pattern-matched pe al tău cu unul similar și vor produce cu încredere cod care apelează GET /api/v2/users/me/orders când API-ul real are GET /v2/customer/orders. Verifică întotdeauna numele de endpoint-uri, numele de parametri și formele de răspuns față de documentația API reală înainte să livrezi. Tratează output-ul AI-ului ca o schiță a structurii, nu ca sursa adevărului pentru API-ul însuși.

Un exemplu lucrat: pull paginat în Parquet

Punând totul împreună: un script care trage fiecare pagină dintr-un API paginat, reîncearcă la erori, respectă rate limits și scrie în Parquet:

"""pull_orders.py - fetch all orders, write to Parquet."""
from __future__ import annotations
import logging
import time
from pathlib import Path

import httpx
import pyarrow as pa
import pyarrow.parquet as pq
from tenacity import (
    retry,
    retry_if_exception,
    stop_after_attempt,
    wait_random_exponential,
)

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pull")

API = "https://api.example.com"
TOKEN = "..."


def is_retryable(exc: BaseException) -> bool:
    if isinstance(exc, httpx.TransportError):
        return True
    if isinstance(exc, httpx.HTTPStatusError):
        return exc.response.status_code >= 500 or exc.response.status_code in (408, 429)
    return False


@retry(
    stop=stop_after_attempt(5),
    wait=wait_random_exponential(multiplier=1, max=60),
    retry=retry_if_exception(is_retryable),
    reraise=True,
)
def fetch(client: httpx.Client, url: str, params: dict | None = None) -> dict:
    r = client.get(url, params=params)
    if r.status_code == 429:
        wait_s = float(r.headers.get("Retry-After", "5"))
        log.warning("429 rate limited, sleeping %ss", wait_s)
        time.sleep(wait_s)
        r.raise_for_status()  # triggers retry
    r.raise_for_status()
    return r.json()


def paginate(client: httpx.Client, path: str):
    cursor: str | None = None
    while True:
        params = {"limit": 200, "cursor": cursor} if cursor else {"limit": 200}
        body = fetch(client, path, params=params)
        yield from body["data"]
        cursor = body.get("next_cursor")
        if not cursor:
            return


def main(out: Path) -> None:
    headers = {"Authorization": f"Bearer {TOKEN}", "User-Agent": "narcis-pull/1.0"}
    with httpx.Client(base_url=API, headers=headers, timeout=15.0) as client:
        rows = list(paginate(client, "/v1/orders"))
    log.info("fetched %d rows", len(rows))
    table = pa.Table.from_pylist(rows)
    pq.write_table(table, out, compression="zstd")
    log.info("wrote %s", out)


if __name__ == "__main__":
    main(Path("orders.parquet"))

Asta e toată forma: paginează curat, reîncearcă la eșecuri tranzitorii, onorează rate limits, scrie un fișier coloanar tipat la final. Pune-l sub cron sau orchestratorul tău, îndreaptă pipeline-ul de ingestie din lecția 38 către output-ul Parquet și ai cusut împreună două jumătăți dintr-un ETL real.

Lecția 41 e unde înfășurăm un orchestrator în jurul întregului ansamblu, dar asta e o poveste pentru săptămâna viitoare.

Citări

Caută