Aceasta este lecția de încheiere a Modulului 7 și, ca și proiectul de analiză de la finalul Modulului 6, e una practică. Construim un pipeline complet de date: extragem zilnic de la un API public, transformăm cu pandas, încărcăm în Postgres, programăm cu cron, logăm fiecare pas, alertăm la eșec și îl proiectăm astfel încât să poți re-rula orice zi din trecut în siguranță. La final vei avea un director pe care l-ai putea comite realist într-un repo.
Pipeline-ul extrage observații meteo zilnice pentru o listă de orașe de la API-ul Open-Meteo (gratuit, fără cheie, bine comportat), transformă JSON-ul în rânduri ordonate și le încarcă într-un tabel Postgres. Același tipar funcționează pentru orice API public; Open-Meteo doar ne scutește de dansul cu credențialele.
Forma
Înainte de orice cod, layout-ul. Un pipeline real nu e un singur script, ci un director cu preocupări separate:
weather_pipeline/
├── pyproject.toml
├── src/
│ └── weather/
│ ├── __init__.py
│ ├── extract.py # API → raw JSON files
│ ├── transform.py # raw JSON → Parquet
│ ├── load.py # Parquet → Postgres
│ ├── pipeline.py # orchestrates the three above
│ ├── config.py # cities, paths, DSNs
│ └── notify.py # error alerting
├── data/
│ ├── raw/ # JSON, one file per (date, city)
│ └── processed/ # Parquet, one file per date
├── logs/
└── README.md
Trei straturi: extract, transform, load. Fiecare își scrie output-ul pe disc înainte ca următorul să-l citească. Acea separare e cea care face pipeline-ul debug-abil, restartabil și idempotent. Dacă transform-ul se strică, datele brute sunt deja pe disc. Dacă load-ul se strică, datele procesate sunt deja pe disc. Nu re-extragi de la API ca să repari un bug de parsing.
Setup cu uv
uv init weather_pipeline
cd weather_pipeline
uv add httpx tenacity pandas pyarrow psycopg[binary] python-dotenv structlog sentry-sdk
uv add --dev pytest ruff mypy
Asta ne dă: httpx pentru API, tenacity pentru retry-uri, pandas și pyarrow pentru transform, psycopg 3 pentru Postgres, python-dotenv pentru secrete, structlog pentru logging structurat, sentry-sdk pentru alertare. Valori implicite moderne și sensibile pentru 2026.
Config
# src/weather/config.py
from pathlib import Path
import os
from dotenv import load_dotenv
load_dotenv()
ROOT = Path(__file__).parent.parent.parent
RAW_DIR = ROOT / "data" / "raw"
PROCESSED_DIR = ROOT / "data" / "processed"
LOG_DIR = ROOT / "logs"
for d in (RAW_DIR, PROCESSED_DIR, LOG_DIR):
d.mkdir(parents=True, exist_ok=True)
CITIES = [
{"name": "Milan", "lat": 45.46, "lon": 9.19},
{"name": "Rome", "lat": 41.90, "lon": 12.50},
{"name": "Naples", "lat": 40.85, "lon": 14.27},
{"name": "Turin", "lat": 45.07, "lon": 7.69},
{"name": "Florence", "lat": 43.77, "lon": 11.25},
]
PG_DSN = os.environ["WEATHER_PG_DSN"]
SENTRY_DSN = os.environ.get("SENTRY_DSN")
Observă: niciun secret în cod. DSN-ul vine din .env (gitignored) în dev și din variabile de mediu reale în producție.
Extract
# src/weather/extract.py
import json
from datetime import date
from pathlib import Path
import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from weather.config import CITIES, RAW_DIR
log = structlog.get_logger()
API = "https://archive-api.open-meteo.com/v1/archive"
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, max=60),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True,
)
def fetch_city(client: httpx.Client, city: dict, run_date: date) -> dict:
log.info("fetch_city.start", city=city["name"], date=str(run_date))
response = client.get(
API,
params={
"latitude": city["lat"],
"longitude": city["lon"],
"start_date": run_date.isoformat(),
"end_date": run_date.isoformat(),
"hourly": "temperature_2m,precipitation,wind_speed_10m",
},
timeout=30,
)
response.raise_for_status()
return response.json()
def extract(run_date: date) -> Path:
out_dir = RAW_DIR / run_date.isoformat()
out_dir.mkdir(parents=True, exist_ok=True)
with httpx.Client(http2=True) as client:
for city in CITIES:
out_file = out_dir / f"{city['name'].lower()}.json"
if out_file.exists():
log.info("extract.skip_existing", city=city["name"])
continue
data = fetch_city(client, city, run_date)
data["_meta"] = {"city": city["name"], "run_date": run_date.isoformat()}
out_file.write_text(json.dumps(data))
log.info("fetch_city.done", city=city["name"], bytes=len(out_file.read_bytes()))
return out_dir
Trei lucruri de remarcat. Tenacity învelește apelul HTTP: 5 încercări, backoff exponențial plafonat la 60 de secunde, retry doar la erori de rețea (nu la 4xx, care nu se vor îmbunătăți). Idempotență la nivel de fișier: dacă JSON-ul pentru un oraș există deja, sare peste el. Re-rularea extract-ului pentru o dată e sigură și gratuită. Logging structurat prin structlog: fiecare eveniment e o linie JSON cu câmpuri numite, ușor de făcut grep, parsat, trimis către un agregator de loguri.
Transform
# src/weather/transform.py
import json
from datetime import date
from pathlib import Path
import pandas as pd
import structlog
from weather.config import RAW_DIR, PROCESSED_DIR
log = structlog.get_logger()
def transform(run_date: date) -> Path:
in_dir = RAW_DIR / run_date.isoformat()
rows = []
for path in sorted(in_dir.glob("*.json")):
payload = json.loads(path.read_text())
city = payload["_meta"]["city"]
hourly = payload["hourly"]
df = pd.DataFrame(hourly)
df["timestamp"] = pd.to_datetime(df["time"])
df["city"] = city
df["run_date"] = pd.Timestamp(run_date)
df = df.drop(columns=["time"])
rows.append(df)
if not rows:
raise RuntimeError(f"no raw files for {run_date}")
combined = pd.concat(rows, ignore_index=True)
# Validation — fail loudly on impossible values.
if combined["temperature_2m"].dropna().between(-60, 60).all() is False:
bad = combined[~combined["temperature_2m"].between(-60, 60, inclusive="both")]
raise ValueError(f"impossible temperatures: {bad}")
if combined.duplicated(subset=["city", "timestamp"]).any():
raise ValueError("duplicate (city, timestamp) rows after combine")
out_file = PROCESSED_DIR / f"{run_date.isoformat()}.parquet"
combined.to_parquet(out_file, index=False)
log.info("transform.done", rows=len(combined), out=str(out_file))
return out_file
Transform-ul citește fiecare JSON brut pentru data respectivă, denormalizează dicționarul hourly imbricat în rânduri, adaugă coloana cu orașul (API-ul nu o returnează) și validează. Validarea aparține aici, nu în load. Prinderea valorilor imposibile înainte de baza de date e mai curată: Postgres va accepta orice îi dai; tu vrei eroarea umană înainte de asta.
Output-ul Parquet e contractul cu pasul de load. Parquet în loc de CSV pentru că păstrează dtype-urile, comprimă bine și se citește rapid.
Load
# src/weather/load.py
from datetime import date
from pathlib import Path
import pandas as pd
import psycopg
import structlog
from weather.config import PG_DSN, PROCESSED_DIR
log = structlog.get_logger()
DDL = """
CREATE TABLE IF NOT EXISTS weather_hourly (
city TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
temperature_2m DOUBLE PRECISION,
precipitation DOUBLE PRECISION,
wind_speed_10m DOUBLE PRECISION,
run_date DATE NOT NULL,
PRIMARY KEY (city, timestamp)
);
"""
UPSERT = """
INSERT INTO weather_hourly (city, timestamp, temperature_2m, precipitation, wind_speed_10m, run_date)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (city, timestamp) DO UPDATE SET
temperature_2m = EXCLUDED.temperature_2m,
precipitation = EXCLUDED.precipitation,
wind_speed_10m = EXCLUDED.wind_speed_10m,
run_date = EXCLUDED.run_date;
"""
def load(run_date: date) -> int:
in_file = PROCESSED_DIR / f"{run_date.isoformat()}.parquet"
df = pd.read_parquet(in_file)
rows = list(df[["city", "timestamp", "temperature_2m", "precipitation", "wind_speed_10m", "run_date"]].itertuples(index=False, name=None))
with psycopg.connect(PG_DSN) as conn:
with conn.cursor() as cur:
cur.execute(DDL)
cur.executemany(UPSERT, rows)
conn.commit()
log.info("load.done", rows=len(rows), date=str(run_date))
return len(rows)
Pasul de load folosește INSERT ... ON CONFLICT ... DO UPDATE, upsert-ul Postgres. Cheia primară e (city, timestamp). Re-rularea load-ului pentru aceeași dată suprascrie rândurile existente cu cele noi; dacă o valoare s-a schimbat (pentru că sursa a corectat-o), baza de date se aliniază. Asta face pipeline-ul idempotent de la un capăt la altul: re-rularea pentru o dată trecută nu duplică, nu intră în conflict, nu necesită curățenie manuală.
Notificare
# src/weather/notify.py
import os
import sentry_sdk
def init():
dsn = os.environ.get("SENTRY_DSN")
if dsn:
sentry_sdk.init(dsn=dsn, traces_sample_rate=0.0)
În producție inițializezi Sentry la pornire și orice excepție ne-prinsă e raportată, cu traceback, locale și context. Dacă nu vrei dependența Sentry, un fallback SMTP de 20 de linii e bun — important e ca eșecurile să ajungă la un om.
Orchestratorul
# src/weather/pipeline.py
import sys
from datetime import date, timedelta
import structlog
from weather import notify
from weather.extract import extract
from weather.transform import transform
from weather.load import load
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
)
log = structlog.get_logger()
def run(run_date: date) -> None:
notify.init()
log.info("pipeline.start", date=str(run_date))
try:
extract(run_date)
transform(run_date)
n = load(run_date)
log.info("pipeline.done", date=str(run_date), rows=n)
except Exception:
log.exception("pipeline.failed", date=str(run_date))
raise
if __name__ == "__main__":
if len(sys.argv) > 1:
target = date.fromisoformat(sys.argv[1])
else:
target = date.today() - timedelta(days=1)
run(target)
Trei observații. Default-ul e ieri, nu azi, fiindcă datele complete pentru ziua curentă nu sunt încă gata: o greșeală frecventă în pipeline-urile programate. Un argument CLI cu data îți permite să faci backfill pentru orice zi din trecut cu python -m weather.pipeline 2026-03-15. Eșecurile re-aruncă după logare, deci cron vede ieșirea non-zero și Sentry vede excepția.
Programează-l
Pentru v1, cron:
# crontab -e
0 2 * * * cd /opt/weather_pipeline && /opt/weather_pipeline/.venv/bin/python -m weather.pipeline >> logs/cron.log 2>&1
Zilnic la 2 noaptea, cu logurile adăugate la coadă. Asta e tot. Pentru dimensiunea acestui pipeline, cron e sincer suficient. Motivele să avansezi la Prefect sau Airflow (lecția 41) sunt: mai multe pipeline-uri, dependențe între ele, un UI pentru stakeholderi non-tehnici, logică de retry mai inteligentă sau nevoia de a face backfill pentru intervale de date prin click-uri în loc de bucle shell.
Când avansezi, munca e mecanică. Versiunea Prefect a aceluiași pipeline e, în mare:
from prefect import flow, task
from datetime import date
@task(retries=3)
def t_extract(run_date): return extract(run_date)
@task
def t_transform(run_date): return transform(run_date)
@task
def t_load(run_date): return load(run_date)
@flow
def weather_flow(run_date: date | None = None):
run_date = run_date or (date.today() - timedelta(days=1))
t_extract(run_date)
t_transform(run_date)
t_load(run_date)
if __name__ == "__main__":
weather_flow.serve(name="weather-daily", cron="0 2 * * *")
Aceeași logică, învelită în decoratoare flow / task, programată de Prefect Cloud, observabilă într-un UI real. Munca pe care ai făcut-o ca să faci extract / transform / load separabile e cea care a făcut migrarea ieftină.
Backfills
Decizia se plătește în ziua în care cineva spune „am pierdut o săptămână de date, poți să rulezi din nou?” Cu acest design:
for d in 2026-03-01 2026-03-02 2026-03-03 2026-03-04 2026-03-05; do
python -m weather.pipeline $d
done
Gata. Fiecare zi e independentă, idempotentă, re-rulabilă în siguranță. Niciun „mod special de backfill”, nicio ștergere manuală, nicio surpriză cu duplicate. Proiectarea pentru backfill din ziua unu e singurul obicei cu cel mai mare leverage din data engineering.
Unde ajută asistenții AI și unde nu
E un moment bun pentru o notă onestă. Asistenții de cod (Claude, Copilot, Cursor) își câștigă pâinea pe părțile de lipici dintr-un pipeline. Decoratoare de retry cu backoff, boilerplate de logging structurat, „scrie-mi upsert-ul SQL pentru schema asta de dataframe”, „adaugă un argument CLI pentru dată” — cod plictisitor, structurat, predispus la erori, unde consistența contează și creativitatea nu. Îl scriu repede și bine.
Unde ajută cel mai puțin e arhitectura: unde să pui granița dintre extract și transform, ce să validezi în ce stadiu, cum să modelezi graful de active, care e contractul tău de idempotență. Acele decizii depind de context pe care modelul nu îl are: ciudățeniile surselor tale de date, convențiile echipei tale, toleranțele consumatorilor de mai jos. Ia-le tu; lasă asistentul să umple schela din jurul lor.
Ce lipsește
Aceasta este cea mai mică formă realistă, nu e gata de producție. Producția adaugă:
- Teste: teste unitare pentru transform, teste de integrare contra unui container de test Postgres, teste de contract contra unui răspuns API înregistrat.
- Infrastructură: containerizare, deploy pe un VM mic sau pe un runner gestionat, gestionarea secretelor prin platforma ta.
- Alertare: peste Sentry, o alertă „pipeline-ul nu a rulat” (o verificare de heartbeat lipsă, adesea via un serviciu precum Healthchecks.io).
- CI: lint, type-check și rulare de teste la fiecare PR.
- Observabilitate: trimiterea logurilor către ELK / Loki / Datadog ca să poți căuta între rulări și metrici pe numărul de rânduri ca să prinzi sub-extragerea silențioasă.
- Management de schemă: migrări via Alembic când tabelul evoluează, în loc de un
CREATE IF NOT EXISTS.
Dar versiunea pe care am construit-o aici acoperă esența: date care curg fiabil de la sursă la destinație, monitorizate, idempotente, programabile, debug-abile. Acesta e standardul, și e suficient de înalt încât majoritatea pipeline-urilor din sălbăticie nu îl ating.
Modulul 7 în recapitulare și ce urmează
Am început Modulul 7 cu o introducere în data engineering ca disciplină. Am acoperit ETL versus ELT, formate moderne (Parquet, Iceberg, Delta), object stores, dbt, asyncio, orchestratoare și acum pipeline-uri. Ai văzut trusa de unelte pe care un junior data engineer e așteptat să o navigheze în 2026.
Modulul 8, Python numeric, întoarce camera invers. Acolo unde Modulele 6 și 7 erau despre lupta cu datele și mișcarea lor, Modulul 8 e despre calculul pe ele. NumPy în profunzime, vectorizare, broadcasting, ecosistemul SciPy, un tur al locurilor unde Python atinge munca numerică pe care pandas n-o atinge. Mai puțină instalație, mai multă matematică. Ne vedem acolo.
Citații: Open-Meteo archive API, tenacity docs, psycopg 3 docs, structlog docs, Prefect docs. Consultat 2026-05-01.