Python, de la zero Lecția 42 / 60

Proiect de data engineering: construieste un pipeline real

De la sursa la destinatie, cu monitorizare, idempotenta si planificare. Lectiile Modulului 7 facute tangibile.

Aceasta este lecția de încheiere a Modulului 7 și, ca și proiectul de analiză de la finalul Modulului 6, e una practică. Construim un pipeline complet de date: extragem zilnic de la un API public, transformăm cu pandas, încărcăm în Postgres, programăm cu cron, logăm fiecare pas, alertăm la eșec și îl proiectăm astfel încât să poți re-rula orice zi din trecut în siguranță. La final vei avea un director pe care l-ai putea comite realist într-un repo.

Pipeline-ul extrage observații meteo zilnice pentru o listă de orașe de la API-ul Open-Meteo (gratuit, fără cheie, bine comportat), transformă JSON-ul în rânduri ordonate și le încarcă într-un tabel Postgres. Același tipar funcționează pentru orice API public; Open-Meteo doar ne scutește de dansul cu credențialele.

Forma

Înainte de orice cod, layout-ul. Un pipeline real nu e un singur script, ci un director cu preocupări separate:

weather_pipeline/
├── pyproject.toml
├── src/
│   └── weather/
│       ├── __init__.py
│       ├── extract.py     # API → raw JSON files
│       ├── transform.py   # raw JSON → Parquet
│       ├── load.py        # Parquet → Postgres
│       ├── pipeline.py    # orchestrates the three above
│       ├── config.py      # cities, paths, DSNs
│       └── notify.py      # error alerting
├── data/
│   ├── raw/               # JSON, one file per (date, city)
│   └── processed/         # Parquet, one file per date
├── logs/
└── README.md

Trei straturi: extract, transform, load. Fiecare își scrie output-ul pe disc înainte ca următorul să-l citească. Acea separare e cea care face pipeline-ul debug-abil, restartabil și idempotent. Dacă transform-ul se strică, datele brute sunt deja pe disc. Dacă load-ul se strică, datele procesate sunt deja pe disc. Nu re-extragi de la API ca să repari un bug de parsing.

Setup cu uv

uv init weather_pipeline
cd weather_pipeline
uv add httpx tenacity pandas pyarrow psycopg[binary] python-dotenv structlog sentry-sdk
uv add --dev pytest ruff mypy

Asta ne dă: httpx pentru API, tenacity pentru retry-uri, pandas și pyarrow pentru transform, psycopg 3 pentru Postgres, python-dotenv pentru secrete, structlog pentru logging structurat, sentry-sdk pentru alertare. Valori implicite moderne și sensibile pentru 2026.

Config

# src/weather/config.py
from pathlib import Path
import os
from dotenv import load_dotenv

load_dotenv()

ROOT = Path(__file__).parent.parent.parent
RAW_DIR = ROOT / "data" / "raw"
PROCESSED_DIR = ROOT / "data" / "processed"
LOG_DIR = ROOT / "logs"

for d in (RAW_DIR, PROCESSED_DIR, LOG_DIR):
    d.mkdir(parents=True, exist_ok=True)

CITIES = [
    {"name": "Milan", "lat": 45.46, "lon": 9.19},
    {"name": "Rome", "lat": 41.90, "lon": 12.50},
    {"name": "Naples", "lat": 40.85, "lon": 14.27},
    {"name": "Turin", "lat": 45.07, "lon": 7.69},
    {"name": "Florence", "lat": 43.77, "lon": 11.25},
]

PG_DSN = os.environ["WEATHER_PG_DSN"]
SENTRY_DSN = os.environ.get("SENTRY_DSN")

Observă: niciun secret în cod. DSN-ul vine din .env (gitignored) în dev și din variabile de mediu reale în producție.

Extract

# src/weather/extract.py
import json
from datetime import date
from pathlib import Path

import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from weather.config import CITIES, RAW_DIR

log = structlog.get_logger()
API = "https://archive-api.open-meteo.com/v1/archive"

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, max=60),
    retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
    reraise=True,
)
def fetch_city(client: httpx.Client, city: dict, run_date: date) -> dict:
    log.info("fetch_city.start", city=city["name"], date=str(run_date))
    response = client.get(
        API,
        params={
            "latitude": city["lat"],
            "longitude": city["lon"],
            "start_date": run_date.isoformat(),
            "end_date": run_date.isoformat(),
            "hourly": "temperature_2m,precipitation,wind_speed_10m",
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()

def extract(run_date: date) -> Path:
    out_dir = RAW_DIR / run_date.isoformat()
    out_dir.mkdir(parents=True, exist_ok=True)
    with httpx.Client(http2=True) as client:
        for city in CITIES:
            out_file = out_dir / f"{city['name'].lower()}.json"
            if out_file.exists():
                log.info("extract.skip_existing", city=city["name"])
                continue
            data = fetch_city(client, city, run_date)
            data["_meta"] = {"city": city["name"], "run_date": run_date.isoformat()}
            out_file.write_text(json.dumps(data))
            log.info("fetch_city.done", city=city["name"], bytes=len(out_file.read_bytes()))
    return out_dir

Trei lucruri de remarcat. Tenacity învelește apelul HTTP: 5 încercări, backoff exponențial plafonat la 60 de secunde, retry doar la erori de rețea (nu la 4xx, care nu se vor îmbunătăți). Idempotență la nivel de fișier: dacă JSON-ul pentru un oraș există deja, sare peste el. Re-rularea extract-ului pentru o dată e sigură și gratuită. Logging structurat prin structlog: fiecare eveniment e o linie JSON cu câmpuri numite, ușor de făcut grep, parsat, trimis către un agregator de loguri.

Transform

# src/weather/transform.py
import json
from datetime import date
from pathlib import Path

import pandas as pd
import structlog

from weather.config import RAW_DIR, PROCESSED_DIR

log = structlog.get_logger()

def transform(run_date: date) -> Path:
    in_dir = RAW_DIR / run_date.isoformat()
    rows = []
    for path in sorted(in_dir.glob("*.json")):
        payload = json.loads(path.read_text())
        city = payload["_meta"]["city"]
        hourly = payload["hourly"]
        df = pd.DataFrame(hourly)
        df["timestamp"] = pd.to_datetime(df["time"])
        df["city"] = city
        df["run_date"] = pd.Timestamp(run_date)
        df = df.drop(columns=["time"])
        rows.append(df)

    if not rows:
        raise RuntimeError(f"no raw files for {run_date}")

    combined = pd.concat(rows, ignore_index=True)

    # Validation — fail loudly on impossible values.
    if combined["temperature_2m"].dropna().between(-60, 60).all() is False:
        bad = combined[~combined["temperature_2m"].between(-60, 60, inclusive="both")]
        raise ValueError(f"impossible temperatures: {bad}")
    if combined.duplicated(subset=["city", "timestamp"]).any():
        raise ValueError("duplicate (city, timestamp) rows after combine")

    out_file = PROCESSED_DIR / f"{run_date.isoformat()}.parquet"
    combined.to_parquet(out_file, index=False)
    log.info("transform.done", rows=len(combined), out=str(out_file))
    return out_file

Transform-ul citește fiecare JSON brut pentru data respectivă, denormalizează dicționarul hourly imbricat în rânduri, adaugă coloana cu orașul (API-ul nu o returnează) și validează. Validarea aparține aici, nu în load. Prinderea valorilor imposibile înainte de baza de date e mai curată: Postgres va accepta orice îi dai; tu vrei eroarea umană înainte de asta.

Output-ul Parquet e contractul cu pasul de load. Parquet în loc de CSV pentru că păstrează dtype-urile, comprimă bine și se citește rapid.

Load

# src/weather/load.py
from datetime import date
from pathlib import Path

import pandas as pd
import psycopg
import structlog

from weather.config import PG_DSN, PROCESSED_DIR

log = structlog.get_logger()

DDL = """
CREATE TABLE IF NOT EXISTS weather_hourly (
    city TEXT NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    temperature_2m DOUBLE PRECISION,
    precipitation DOUBLE PRECISION,
    wind_speed_10m DOUBLE PRECISION,
    run_date DATE NOT NULL,
    PRIMARY KEY (city, timestamp)
);
"""

UPSERT = """
INSERT INTO weather_hourly (city, timestamp, temperature_2m, precipitation, wind_speed_10m, run_date)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (city, timestamp) DO UPDATE SET
    temperature_2m = EXCLUDED.temperature_2m,
    precipitation = EXCLUDED.precipitation,
    wind_speed_10m = EXCLUDED.wind_speed_10m,
    run_date = EXCLUDED.run_date;
"""

def load(run_date: date) -> int:
    in_file = PROCESSED_DIR / f"{run_date.isoformat()}.parquet"
    df = pd.read_parquet(in_file)
    rows = list(df[["city", "timestamp", "temperature_2m", "precipitation", "wind_speed_10m", "run_date"]].itertuples(index=False, name=None))

    with psycopg.connect(PG_DSN) as conn:
        with conn.cursor() as cur:
            cur.execute(DDL)
            cur.executemany(UPSERT, rows)
        conn.commit()

    log.info("load.done", rows=len(rows), date=str(run_date))
    return len(rows)

Pasul de load folosește INSERT ... ON CONFLICT ... DO UPDATE, upsert-ul Postgres. Cheia primară e (city, timestamp). Re-rularea load-ului pentru aceeași dată suprascrie rândurile existente cu cele noi; dacă o valoare s-a schimbat (pentru că sursa a corectat-o), baza de date se aliniază. Asta face pipeline-ul idempotent de la un capăt la altul: re-rularea pentru o dată trecută nu duplică, nu intră în conflict, nu necesită curățenie manuală.

Notificare

# src/weather/notify.py
import os
import sentry_sdk

def init():
    dsn = os.environ.get("SENTRY_DSN")
    if dsn:
        sentry_sdk.init(dsn=dsn, traces_sample_rate=0.0)

În producție inițializezi Sentry la pornire și orice excepție ne-prinsă e raportată, cu traceback, locale și context. Dacă nu vrei dependența Sentry, un fallback SMTP de 20 de linii e bun — important e ca eșecurile să ajungă la un om.

Orchestratorul

# src/weather/pipeline.py
import sys
from datetime import date, timedelta

import structlog

from weather import notify
from weather.extract import extract
from weather.transform import transform
from weather.load import load

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)
log = structlog.get_logger()

def run(run_date: date) -> None:
    notify.init()
    log.info("pipeline.start", date=str(run_date))
    try:
        extract(run_date)
        transform(run_date)
        n = load(run_date)
        log.info("pipeline.done", date=str(run_date), rows=n)
    except Exception:
        log.exception("pipeline.failed", date=str(run_date))
        raise

if __name__ == "__main__":
    if len(sys.argv) > 1:
        target = date.fromisoformat(sys.argv[1])
    else:
        target = date.today() - timedelta(days=1)
    run(target)

Trei observații. Default-ul e ieri, nu azi, fiindcă datele complete pentru ziua curentă nu sunt încă gata: o greșeală frecventă în pipeline-urile programate. Un argument CLI cu data îți permite să faci backfill pentru orice zi din trecut cu python -m weather.pipeline 2026-03-15. Eșecurile re-aruncă după logare, deci cron vede ieșirea non-zero și Sentry vede excepția.

Programează-l

Pentru v1, cron:

# crontab -e
0 2 * * * cd /opt/weather_pipeline && /opt/weather_pipeline/.venv/bin/python -m weather.pipeline >> logs/cron.log 2>&1

Zilnic la 2 noaptea, cu logurile adăugate la coadă. Asta e tot. Pentru dimensiunea acestui pipeline, cron e sincer suficient. Motivele să avansezi la Prefect sau Airflow (lecția 41) sunt: mai multe pipeline-uri, dependențe între ele, un UI pentru stakeholderi non-tehnici, logică de retry mai inteligentă sau nevoia de a face backfill pentru intervale de date prin click-uri în loc de bucle shell.

Când avansezi, munca e mecanică. Versiunea Prefect a aceluiași pipeline e, în mare:

from prefect import flow, task
from datetime import date

@task(retries=3)
def t_extract(run_date): return extract(run_date)
@task
def t_transform(run_date): return transform(run_date)
@task
def t_load(run_date): return load(run_date)

@flow
def weather_flow(run_date: date | None = None):
    run_date = run_date or (date.today() - timedelta(days=1))
    t_extract(run_date)
    t_transform(run_date)
    t_load(run_date)

if __name__ == "__main__":
    weather_flow.serve(name="weather-daily", cron="0 2 * * *")

Aceeași logică, învelită în decoratoare flow / task, programată de Prefect Cloud, observabilă într-un UI real. Munca pe care ai făcut-o ca să faci extract / transform / load separabile e cea care a făcut migrarea ieftină.

Backfills

Decizia se plătește în ziua în care cineva spune „am pierdut o săptămână de date, poți să rulezi din nou?” Cu acest design:

for d in 2026-03-01 2026-03-02 2026-03-03 2026-03-04 2026-03-05; do
    python -m weather.pipeline $d
done

Gata. Fiecare zi e independentă, idempotentă, re-rulabilă în siguranță. Niciun „mod special de backfill”, nicio ștergere manuală, nicio surpriză cu duplicate. Proiectarea pentru backfill din ziua unu e singurul obicei cu cel mai mare leverage din data engineering.

Unde ajută asistenții AI și unde nu

E un moment bun pentru o notă onestă. Asistenții de cod (Claude, Copilot, Cursor) își câștigă pâinea pe părțile de lipici dintr-un pipeline. Decoratoare de retry cu backoff, boilerplate de logging structurat, „scrie-mi upsert-ul SQL pentru schema asta de dataframe”, „adaugă un argument CLI pentru dată” — cod plictisitor, structurat, predispus la erori, unde consistența contează și creativitatea nu. Îl scriu repede și bine.

Unde ajută cel mai puțin e arhitectura: unde să pui granița dintre extract și transform, ce să validezi în ce stadiu, cum să modelezi graful de active, care e contractul tău de idempotență. Acele decizii depind de context pe care modelul nu îl are: ciudățeniile surselor tale de date, convențiile echipei tale, toleranțele consumatorilor de mai jos. Ia-le tu; lasă asistentul să umple schela din jurul lor.

Ce lipsește

Aceasta este cea mai mică formă realistă, nu e gata de producție. Producția adaugă:

  • Teste: teste unitare pentru transform, teste de integrare contra unui container de test Postgres, teste de contract contra unui răspuns API înregistrat.
  • Infrastructură: containerizare, deploy pe un VM mic sau pe un runner gestionat, gestionarea secretelor prin platforma ta.
  • Alertare: peste Sentry, o alertă „pipeline-ul nu a rulat” (o verificare de heartbeat lipsă, adesea via un serviciu precum Healthchecks.io).
  • CI: lint, type-check și rulare de teste la fiecare PR.
  • Observabilitate: trimiterea logurilor către ELK / Loki / Datadog ca să poți căuta între rulări și metrici pe numărul de rânduri ca să prinzi sub-extragerea silențioasă.
  • Management de schemă: migrări via Alembic când tabelul evoluează, în loc de un CREATE IF NOT EXISTS.

Dar versiunea pe care am construit-o aici acoperă esența: date care curg fiabil de la sursă la destinație, monitorizate, idempotente, programabile, debug-abile. Acesta e standardul, și e suficient de înalt încât majoritatea pipeline-urilor din sălbăticie nu îl ating.

Modulul 7 în recapitulare și ce urmează

Am început Modulul 7 cu o introducere în data engineering ca disciplină. Am acoperit ETL versus ELT, formate moderne (Parquet, Iceberg, Delta), object stores, dbt, asyncio, orchestratoare și acum pipeline-uri. Ai văzut trusa de unelte pe care un junior data engineer e așteptat să o navigheze în 2026.

Modulul 8, Python numeric, întoarce camera invers. Acolo unde Modulele 6 și 7 erau despre lupta cu datele și mișcarea lor, Modulul 8 e despre calculul pe ele. NumPy în profunzime, vectorizare, broadcasting, ecosistemul SciPy, un tur al locurilor unde Python atinge munca numerică pe care pandas n-o atinge. Mai puțină instalație, mai multă matematică. Ne vedem acolo.


Citații: Open-Meteo archive API, tenacity docs, psycopg 3 docs, structlog docs, Prefect docs. Consultat 2026-05-01.

Caută