Cealaltă jumătate a ingestiei sunt API-urile. Serviciul HTTP al altcuiva are datele de care ai nevoie. Scrii un client Python, tragi pagini, scrii pe disc sau într-o bază de date, repeți mâine. Sună simplu. Prima versiune naivă merge mereu. A doua oară când o rulezi, API-ul are o zi proastă și descoperi că „simplu” are o coadă lungă de moduri de eșec: timeout-uri, 503-uri, 429-uri, token-uri expirate, paginare opacă, JSON care e în secret XML. Lecția asta e trusa de unelte pentru coada aia lungă.
Bibliotecile HTTP în 2026
Trei biblioteci pe care le vei întâlni.
requests: clasicul. Doar sync. Nu s-a schimbat mult de ani; nici nu trebuie. Cel mai simplu, mai lizibil cod HTTP pe care îl poți scrie în Python. Dacă scrii un script unic și nu îți pasă de concurență, requests rămâne o alegere perfect bună în 2026.
import requests
r = requests.get("https://api.example.com/orders", params={"limit": 100}, timeout=10)
r.raise_for_status()
data = r.json()
httpx: modern, sync și async, în mare parte compatibil drop-in cu requests. Suport HTTP/2 din start. Recomandarea standard pentru cod nou în 2026 și cea pe care o aleg implicit.
import httpx
with httpx.Client(timeout=10.0) as client:
r = client.get("https://api.example.com/orders", params={"limit": 100})
r.raise_for_status()
data = r.json()
Versiunea async se citește aproape identic:
import asyncio
import httpx
async def fetch_all(urls: list[str]) -> list[dict]:
async with httpx.AsyncClient(timeout=10.0) as client:
responses = await asyncio.gather(*(client.get(u) for u in urls))
return [r.json() for r in responses]
Când ai 200 de URL-uri de lovit și o cerere individuală durează 200ms, versiunea sync ia 40 de secunde. Versiunea async ia 2. Diferența asta e motivul pentru care există httpx.
aiohttp: doar async, opțiunea async mai veche. Tot excelentă, tot menținută, tot comună în producție. Dacă moștenești o codebase care o folosește, ești în regulă. Pentru cod nou aș alege httpx pentru unificarea sync/async.
Pentru lecția asta vom folosi httpx. Pattern-urile se traduc la requests aproape linie cu linie.
Bazele, făcute corect
Patru lucruri de făcut bine la fiecare cerere:
r = client.get(
"https://api.example.com/orders",
params={"since": "2026-04-01", "limit": 100},
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
timeout=10.0,
)
r.raise_for_status()
data = r.json()
params: parametrii de query string ca dict. Nu concatena string-uri în URL-uri; httpx îi va encoda pentru tine.headers: auth, accept, user-agent. Setează un User-Agent care identifică aplicația ta; unele API-uri resping pe cele goale.timeout: întotdeauna setează-l. Implicitul în unele biblioteci e „așteaptă la nesfârșit”, ceea ce e o cale grozavă să-ți blochezi pipeline-ul în spatele unei conexiuni atârnate. 10 secunde e un punct de plecare rezonabil.raise_for_status(): ridică o excepție pentru răspunsuri 4xx/5xx. Fără el, codul tău continuă vesel cur.json()pe o pagină de eroare 500 și primești o eroare confuză de parsing JSON în loc de o eroare HTTP clară.
Un client care reutilizează conexiunile e semnificativ mai rapid decât a apela httpx.get direct de fiecare dată, fiindcă pune în pool conexiunile TCP:
with httpx.Client(
base_url="https://api.example.com",
headers={"Authorization": f"Bearer {token}"},
timeout=10.0,
) as client:
for endpoint in endpoints:
r = client.get(endpoint)
...
Folosește clientul. Întotdeauna. E o linie în plus și o accelerare vizibilă pentru un API vorbăreț.
Retries cu tenacity
Unele eșecuri sunt tranzitorii. Serverul a sughițat. Conexiunea TCP a fost resetată. Un load balancer a flipuit. Răspunsul corect e „așteaptă o clipă și încearcă din nou”. Răspunsul greșit e „eșuează zgomotos către utilizator și pune o reluare manuală pe lista celui de la on-call”.
Biblioteca e tenacity. Decoratorul e @retry și compune câteva politici mai mici.
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
import httpx
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=1, max=60),
retry=retry_if_exception_type((httpx.TransportError, httpx.HTTPStatusError)),
reraise=True,
)
def fetch(client: httpx.Client, url: str) -> dict:
r = client.get(url)
r.raise_for_status()
return r.json()
Ce zice asta: încearcă până la 5 ori; așteaptă 1s, 2s, 4s, 8s între încercări (limitat la 60s); încearcă din nou la erori de rețea și erori HTTP; dacă tot eșuăm după 5 încercări, ridică din nou ultima excepție ca apelantul să vadă o eroare reală.
Matematica din spatele exponential backoff: fiecare retry dublează așteptarea. Motivul nu e superstiție, e că atunci când un serviciu e supraîncărcat, retry-ul imediat doar adaugă încărcare. Exponential backoff lasă coada să se golească. Adaugă un strop de jitter (variație aleatoare în timpul de așteptare) ca o mie de clienți să nu reîncerce toți în același instant:
from tenacity import wait_random_exponential
wait=wait_random_exponential(multiplier=1, max=60),
Asta e „exponențial până la 60s, cu jitter”. Documentația tenacity numește acesta implicitul recomandat pentru a vorbi cu servicii externe și e ce iau prima dată.
Ce să reîncerci, ce nu
Nu fiecare eroare merită un retry. Împărțirea e cam așa:
Retry: erori 5xx de server, 408 request timeout, 429 too many requests, erori de rețea (ConnectError, ReadTimeout, RemoteProtocolError). Astea sunt tranzitorii.
Nu retry: erori 4xx de client (cu excepția 408/429). 400 bad request înseamnă că cererea ta era malformată: retry-ul nu schimbă nimic. 401 înseamnă că token-ul tău e invalid: retry-ul doar bate la endpoint-ul de auth. 404 înseamnă că resursa nu există. 422 înseamnă că validarea a eșuat.
Tenacity îți dă granularitatea:
def is_retryable(exc: BaseException) -> bool:
if isinstance(exc, httpx.TransportError):
return True
if isinstance(exc, httpx.HTTPStatusError):
status = exc.response.status_code
return status >= 500 or status in (408, 429)
return False
@retry(
stop=stop_after_attempt(5),
wait=wait_random_exponential(multiplier=1, max=60),
retry=retry_if_exception(is_retryable),
reraise=True,
)
def fetch(client: httpx.Client, url: str) -> dict:
r = client.get(url)
r.raise_for_status()
return r.json()
Acela e decoratorul de retry de calitate de producție. Fură-l.
Rate limits: respectă răspunsul
Când un API îți spune să încetinești, încetinește. Majoritatea API-urilor semnalează asta cu HTTP 429 și un header Retry-After:
HTTP/1.1 429 Too Many Requests
Retry-After: 30
Onorează-l:
def fetch(client: httpx.Client, url: str) -> dict:
while True:
r = client.get(url)
if r.status_code == 429:
wait_s = float(r.headers.get("Retry-After", "5"))
log.warning("rate limited, sleeping %ss", wait_s)
time.sleep(wait_s)
continue
r.raise_for_status()
return r.json()
Asta e podeaua: citește header-ul, dormi, reîncearcă. Tavanul e throttling proactiv: limitează-te singur înainte ca API-ul să fie nevoit. Dacă API-ul permite 100 cereri pe minut, rulează clientul tău la 90/minut și nu vei vedea niciodată un 429.
Biblioteca limits îți dă un rate-limiter curat:
from limits import RateLimitItemPerMinute
from limits.storage import MemoryStorage
from limits.strategies import MovingWindowRateLimiter
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
quota = RateLimitItemPerMinute(90)
def fetch(client: httpx.Client, url: str) -> dict:
while not limiter.hit(quota, "api.example.com"):
time.sleep(0.1)
r = client.get(url)
r.raise_for_status()
return r.json()
Sau scrie-ți propriul token bucket: sunt cam 30 de linii. Ideea e gestionarea conștientă a rate-ului. API-urile te limitează fiindcă cineva din echipa lor de ops a fost trezit la 3 dimineața din cauza unui client care se purta urât. Nu fi acel client.
Unde e posibil, batch. Dacă API-ul are un endpoint bulk (POST /orders/lookup care ia 100 de ID-uri), folosește-l în loc de 100 de GET-uri individuale.
Pattern-uri de paginare
Trei aromuri pe care le vei întâlni, în ordinea cam crescătoare a plăcutului.
Offset/limit: ?offset=0&limit=100, apoi ?offset=100&limit=100, etc. Clasicul, dar are un defect: dacă rândurile sunt inserate în timp ce paginezi, poți rata sau duplica rânduri. Acceptabil pentru dataset-uri statice, dubios pentru cele live.
def paginate_offset(client: httpx.Client, url: str, limit: int = 100):
offset = 0
while True:
r = client.get(url, params={"offset": offset, "limit": limit})
r.raise_for_status()
page = r.json()["data"]
if not page:
return
yield from page
offset += len(page)
Cursor-based: API-ul returnează un token opac next_cursor; îl pasezi înapoi ca să obții pagina următoare. Stabil peste scrieri, implicitul modern.
def paginate_cursor(client: httpx.Client, url: str):
cursor: str | None = None
while True:
params = {"cursor": cursor} if cursor else {}
r = client.get(url, params=params)
r.raise_for_status()
body = r.json()
yield from body["data"]
cursor = body.get("next_cursor")
if not cursor:
return
Link header (RFC 5988): API-ul pune URL-ul paginii următoare în header-ul HTTP Link. GitHub folosește asta. Mai puțin comun, dar elegant: nu construiești URL-ul următor, doar îl urmezi.
def paginate_link(client: httpx.Client, url: str):
while url:
r = client.get(url)
r.raise_for_status()
yield from r.json()
# httpx parses Link headers into r.links
next_link = r.links.get("next")
url = next_link["url"] if next_link else None
Citește documentația, alege-l pe cel potrivit, scrie un generator. Generatoarele sunt forma ideală aici: apelantul nu trebuie să știe câte pagini sunt sau să-i pese de granițele paginilor.
Autentificare
Variațiile:
- Chei API în headere: cea mai comună și locul corect pentru ele.
Authorization: Bearer <token>sau un header custom precumX-API-Key. - Chei API în query string-uri: unele API-uri legacy fac asta. Evită unde e posibil: query string-urile ajung în log-urile de access ale serverului și în istoricul browserului.
- OAuth 2.0 client credentials: pentru server-la-server. POST la un endpoint de token, primești înapoi un access token, îl folosești o oră, refresh.
- OAuth 2.0 authorization code: pentru flow-uri „acționează în numele unui utilizator”. Redirecționări de browser, scope-uri, refresh tokens. Nu e ce vrei pentru job-uri batch.
Un mic wrapper de refresh-token pentru flow-ul client-credentials:
import time
from dataclasses import dataclass
@dataclass
class Token:
value: str
expires_at: float
class TokenManager:
def __init__(self, client_id: str, client_secret: str, token_url: str):
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self._token: Token | None = None
def get(self, client: httpx.Client) -> str:
if self._token and self._token.expires_at > time.time() + 60:
return self._token.value
r = client.post(self.token_url, data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
})
r.raise_for_status()
body = r.json()
self._token = Token(body["access_token"], time.time() + body["expires_in"])
return self._token.value
Bufferul de 60 de secunde e intenționat: face refresh puțin înainte de expirare ca să nu primești un 401 la mijlocul unei cereri pentru că ceasul a deviat.
Webhook-uri vs polling
O paranteză rapidă. Dacă API-ul oferă webhook-uri („vom face POST la URL-ul tău când se întâmplă ceva”), aproape întotdeauna sunt mai bune decât polling-ul. Te oprești din a bate un API cu „ceva nou?” de douăsprezece ori pe oră și lași sursa să împingă când chiar sunt noutăți. Costul e să rulezi un mic server HTTP care să le primească. Pentru integrări de volum mare sau latență mică, asta e calea.
Polling-ul e bun pentru job-uri batch de frecvență joasă, sau când nu controlezi infrastructura ca să primești webhook-uri, sau când API-ul nu le suportă.
O notă despre clienți API generați de AI
Asistenții AI sunt excelenți la producerea de boilerplate pentru genul ăsta de cod. „Scrie-mi un client httpx cu retry-uri tenacity care paginate prin cursor și gestionează rate limits” îți dă 80% dintr-un client funcțional în 15 secunde. Decoratoarele de retry, buclele de paginare, flow-urile de auth: toate foarte pattern-shaped, toate lucruri pe care AI le face aproape perfect.
Capcana: asistenții AI uneori inventează nume de endpoint-uri care nu există. Au văzut zece mii de clienți API și au pattern-matched pe al tău cu unul similar și vor produce cu încredere cod care apelează GET /api/v2/users/me/orders când API-ul real are GET /v2/customer/orders. Verifică întotdeauna numele de endpoint-uri, numele de parametri și formele de răspuns față de documentația API reală înainte să livrezi. Tratează output-ul AI-ului ca o schiță a structurii, nu ca sursa adevărului pentru API-ul însuși.
Un exemplu lucrat: pull paginat în Parquet
Punând totul împreună: un script care trage fiecare pagină dintr-un API paginat, reîncearcă la erori, respectă rate limits și scrie în Parquet:
"""pull_orders.py - fetch all orders, write to Parquet."""
from __future__ import annotations
import logging
import time
from pathlib import Path
import httpx
import pyarrow as pa
import pyarrow.parquet as pq
from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_random_exponential,
)
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pull")
API = "https://api.example.com"
TOKEN = "..."
def is_retryable(exc: BaseException) -> bool:
if isinstance(exc, httpx.TransportError):
return True
if isinstance(exc, httpx.HTTPStatusError):
return exc.response.status_code >= 500 or exc.response.status_code in (408, 429)
return False
@retry(
stop=stop_after_attempt(5),
wait=wait_random_exponential(multiplier=1, max=60),
retry=retry_if_exception(is_retryable),
reraise=True,
)
def fetch(client: httpx.Client, url: str, params: dict | None = None) -> dict:
r = client.get(url, params=params)
if r.status_code == 429:
wait_s = float(r.headers.get("Retry-After", "5"))
log.warning("429 rate limited, sleeping %ss", wait_s)
time.sleep(wait_s)
r.raise_for_status() # triggers retry
r.raise_for_status()
return r.json()
def paginate(client: httpx.Client, path: str):
cursor: str | None = None
while True:
params = {"limit": 200, "cursor": cursor} if cursor else {"limit": 200}
body = fetch(client, path, params=params)
yield from body["data"]
cursor = body.get("next_cursor")
if not cursor:
return
def main(out: Path) -> None:
headers = {"Authorization": f"Bearer {TOKEN}", "User-Agent": "narcis-pull/1.0"}
with httpx.Client(base_url=API, headers=headers, timeout=15.0) as client:
rows = list(paginate(client, "/v1/orders"))
log.info("fetched %d rows", len(rows))
table = pa.Table.from_pylist(rows)
pq.write_table(table, out, compression="zstd")
log.info("wrote %s", out)
if __name__ == "__main__":
main(Path("orders.parquet"))
Asta e toată forma: paginează curat, reîncearcă la eșecuri tranzitorii, onorează rate limits, scrie un fișier coloanar tipat la final. Pune-l sub cron sau orchestratorul tău, îndreaptă pipeline-ul de ingestie din lecția 38 către output-ul Parquet și ai cusut împreună două jumătăți dintr-un ETL real.
Lecția 41 e unde înfășurăm un orchestrator în jurul întregului ansamblu, dar asta e o poveste pentru săptămâna viitoare.
Citări
- Documentația httpx, https://www.python-httpx.org/, consultat 2026-05-01.
- Documentația requests, https://requests.readthedocs.io/, consultat 2026-05-01.
- Documentația tenacity, https://tenacity.readthedocs.io/, consultat 2026-05-01.
- RFC 5988 (Web Linking), https://datatracker.ietf.org/doc/html/rfc5988, consultat 2026-05-01.