Questa è la lezione di chiusura del Modulo 7 e, come il progetto di analisi alla fine del Modulo 6, è pratica. Costruiamo una pipeline di dati completa: estraiamo ogni giorno da una API pubblica, trasformiamo con pandas, carichiamo in Postgres, schedulazione con cron, log per ogni step, alert in caso di errore, e la progettiamo in modo da poter rieseguire in sicurezza qualunque giorno passato. Alla fine avrai una directory che potresti realisticamente committare in un repo.
La pipeline scarica le osservazioni meteorologiche giornaliere per una lista di città dalla API Open-Meteo (gratuita, senza chiave, ben educata), trasforma il JSON in righe ordinate e carica in una tabella Postgres. Lo stesso pattern funziona per qualunque API pubblica; Open-Meteo ci risparmia solo la danza delle credenziali.
La forma
Prima di qualunque codice, il layout. Una pipeline vera non è un singolo script, è una directory con responsabilità separate:
weather_pipeline/
├── pyproject.toml
├── src/
│ └── weather/
│ ├── __init__.py
│ ├── extract.py # API → file JSON grezzi
│ ├── transform.py # JSON grezzo → Parquet
│ ├── load.py # Parquet → Postgres
│ ├── pipeline.py # orchestra i tre sopra
│ ├── config.py # città, percorsi, DSN
│ └── notify.py # alert sugli errori
├── data/
│ ├── raw/ # JSON, un file per (data, città)
│ └── processed/ # Parquet, un file per data
├── logs/
└── README.md
Tre layer: extract, transform, load. Ognuno scrive il proprio output su disco prima che il successivo lo legga. Questa separazione è ciò che rende la pipeline debuggabile, riavviabile e idempotente. Se transform si rompe, i dati grezzi sono già su disco. Se load si rompe, i dati processati sono già su disco. Non riesegui la chiamata alla API per sistemare un bug di parsing.
Setup con uv
uv init weather_pipeline
cd weather_pipeline
uv add httpx tenacity pandas pyarrow psycopg[binary] python-dotenv structlog sentry-sdk
uv add --dev pytest ruff mypy
Questo ci dà: httpx per la API, tenacity per i retry, pandas e pyarrow per la trasformazione, psycopg 3 per Postgres, python-dotenv per i segreti, structlog per il logging strutturato, sentry-sdk per gli alert. Default moderni e sensati per il 2026.
Config
# src/weather/config.py
from pathlib import Path
import os
from dotenv import load_dotenv
load_dotenv()
ROOT = Path(__file__).parent.parent.parent
RAW_DIR = ROOT / "data" / "raw"
PROCESSED_DIR = ROOT / "data" / "processed"
LOG_DIR = ROOT / "logs"
for d in (RAW_DIR, PROCESSED_DIR, LOG_DIR):
d.mkdir(parents=True, exist_ok=True)
CITIES = [
{"name": "Milan", "lat": 45.46, "lon": 9.19},
{"name": "Rome", "lat": 41.90, "lon": 12.50},
{"name": "Naples", "lat": 40.85, "lon": 14.27},
{"name": "Turin", "lat": 45.07, "lon": 7.69},
{"name": "Florence", "lat": 43.77, "lon": 11.25},
]
PG_DSN = os.environ["WEATHER_PG_DSN"]
SENTRY_DSN = os.environ.get("SENTRY_DSN")
Nota che non ci sono segreti nel codice. Il DSN arriva da .env (gitignorato) in dev e da vere variabili d’ambiente in produzione.
Extract
# src/weather/extract.py
import json
from datetime import date
from pathlib import Path
import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from weather.config import CITIES, RAW_DIR
log = structlog.get_logger()
API = "https://archive-api.open-meteo.com/v1/archive"
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, max=60),
retry=retry_if_exception_type((httpx.HTTPError, httpx.TimeoutException)),
reraise=True,
)
def fetch_city(client: httpx.Client, city: dict, run_date: date) -> dict:
log.info("fetch_city.start", city=city["name"], date=str(run_date))
response = client.get(
API,
params={
"latitude": city["lat"],
"longitude": city["lon"],
"start_date": run_date.isoformat(),
"end_date": run_date.isoformat(),
"hourly": "temperature_2m,precipitation,wind_speed_10m",
},
timeout=30,
)
response.raise_for_status()
return response.json()
def extract(run_date: date) -> Path:
out_dir = RAW_DIR / run_date.isoformat()
out_dir.mkdir(parents=True, exist_ok=True)
with httpx.Client(http2=True) as client:
for city in CITIES:
out_file = out_dir / f"{city['name'].lower()}.json"
if out_file.exists():
log.info("extract.skip_existing", city=city["name"])
continue
data = fetch_city(client, city, run_date)
data["_meta"] = {"city": city["name"], "run_date": run_date.isoformat()}
out_file.write_text(json.dumps(data))
log.info("fetch_city.done", city=city["name"], bytes=len(out_file.read_bytes()))
return out_dir
Tre cose da notare. Tenacity avvolge la chiamata HTTP: 5 tentativi, backoff esponenziale con tetto a 60 secondi, retry solo su errori di rete (non su 4xx, che non miglioreranno). Idempotenza a livello di file: se il JSON per una città esiste già, lo salta. Rieseguire l’extract per una data è sicuro e gratis. Logging strutturato via structlog: ogni evento è una riga JSON con campi nominati, facile da fare grep, da parsare, da spedire a un aggregatore di log.
Transform
# src/weather/transform.py
import json
from datetime import date
from pathlib import Path
import pandas as pd
import structlog
from weather.config import RAW_DIR, PROCESSED_DIR
log = structlog.get_logger()
def transform(run_date: date) -> Path:
in_dir = RAW_DIR / run_date.isoformat()
rows = []
for path in sorted(in_dir.glob("*.json")):
payload = json.loads(path.read_text())
city = payload["_meta"]["city"]
hourly = payload["hourly"]
df = pd.DataFrame(hourly)
df["timestamp"] = pd.to_datetime(df["time"])
df["city"] = city
df["run_date"] = pd.Timestamp(run_date)
df = df.drop(columns=["time"])
rows.append(df)
if not rows:
raise RuntimeError(f"no raw files for {run_date}")
combined = pd.concat(rows, ignore_index=True)
# Validation — fail loudly on impossible values.
if combined["temperature_2m"].dropna().between(-60, 60).all() is False:
bad = combined[~combined["temperature_2m"].between(-60, 60, inclusive="both")]
raise ValueError(f"impossible temperatures: {bad}")
if combined.duplicated(subset=["city", "timestamp"]).any():
raise ValueError("duplicate (city, timestamp) rows after combine")
out_file = PROCESSED_DIR / f"{run_date.isoformat()}.parquet"
combined.to_parquet(out_file, index=False)
log.info("transform.done", rows=len(combined), out=str(out_file))
return out_file
La trasformazione legge ogni JSON grezzo per la data, denormalizza il dict hourly annidato in righe, aggiunge la colonna città (la API non la rimanda indietro) e valida. La validazione vive qui, non in load. Beccare valori impossibili prima del database è più pulito: Postgres accetterà qualunque cosa gli dai; tu vuoi l’errore leggibile da un essere umano prima di arrivare lì.
L’output Parquet è il contratto con lo step di load. Parquet invece di CSV perché preserva i dtype, comprime bene e si legge veloce.
Load
# src/weather/load.py
from datetime import date
from pathlib import Path
import pandas as pd
import psycopg
import structlog
from weather.config import PG_DSN, PROCESSED_DIR
log = structlog.get_logger()
DDL = """
CREATE TABLE IF NOT EXISTS weather_hourly (
city TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
temperature_2m DOUBLE PRECISION,
precipitation DOUBLE PRECISION,
wind_speed_10m DOUBLE PRECISION,
run_date DATE NOT NULL,
PRIMARY KEY (city, timestamp)
);
"""
UPSERT = """
INSERT INTO weather_hourly (city, timestamp, temperature_2m, precipitation, wind_speed_10m, run_date)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (city, timestamp) DO UPDATE SET
temperature_2m = EXCLUDED.temperature_2m,
precipitation = EXCLUDED.precipitation,
wind_speed_10m = EXCLUDED.wind_speed_10m,
run_date = EXCLUDED.run_date;
"""
def load(run_date: date) -> int:
in_file = PROCESSED_DIR / f"{run_date.isoformat()}.parquet"
df = pd.read_parquet(in_file)
rows = list(df[["city", "timestamp", "temperature_2m", "precipitation", "wind_speed_10m", "run_date"]].itertuples(index=False, name=None))
with psycopg.connect(PG_DSN) as conn:
with conn.cursor() as cur:
cur.execute(DDL)
cur.executemany(UPSERT, rows)
conn.commit()
log.info("load.done", rows=len(rows), date=str(run_date))
return len(rows)
Lo step di load usa INSERT ... ON CONFLICT ... DO UPDATE, l’upsert di Postgres. La chiave primaria è (city, timestamp). Rieseguire il load per la stessa data sovrascrive le righe esistenti con quelle nuove; se un valore è cambiato (perché la sorgente l’ha corretto), il database si aggiorna. Questo è ciò che rende la pipeline idempotente da un capo all’altro: rieseguire per una data passata non duplica, non genera conflitti, non richiede pulizia manuale.
Notification
# src/weather/notify.py
import os
import sentry_sdk
def init():
dsn = os.environ.get("SENTRY_DSN")
if dsn:
sentry_sdk.init(dsn=dsn, traces_sample_rate=0.0)
In produzione inizializzi Sentry allo startup e qualunque eccezione non catturata viene segnalata, con traceback, locals e contesto. Se non vuoi la dipendenza da Sentry, un fallback SMTP da 20 righe va bene; assicurati solo che i fallimenti raggiungano un essere umano.
L’orchestratore
# src/weather/pipeline.py
import sys
from datetime import date, timedelta
import structlog
from weather import notify
from weather.extract import extract
from weather.transform import transform
from weather.load import load
structlog.configure(
processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
)
log = structlog.get_logger()
def run(run_date: date) -> None:
notify.init()
log.info("pipeline.start", date=str(run_date))
try:
extract(run_date)
transform(run_date)
n = load(run_date)
log.info("pipeline.done", date=str(run_date), rows=n)
except Exception:
log.exception("pipeline.failed", date=str(run_date))
raise
if __name__ == "__main__":
if len(sys.argv) > 1:
target = date.fromisoformat(sys.argv[1])
else:
target = date.today() - timedelta(days=1)
run(target)
Tre osservazioni. Il default è ieri, non oggi, perché i dati di tutta la giornata di oggi non sono ancora pronti: un errore comune nelle pipeline schedulate. Un argomento CLI per la data ti permette di fare il backfill di qualunque giorno passato con python -m weather.pipeline 2026-03-15. I fallimenti ri-sollevano l’eccezione dopo aver loggato, così cron vede l’exit code diverso da zero e Sentry vede l’eccezione.
Schedulalo
Per la v1, cron:
# crontab -e
0 2 * * * cd /opt/weather_pipeline && /opt/weather_pipeline/.venv/bin/python -m weather.pipeline >> logs/cron.log 2>&1
Le 2 di notte ogni giorno, con i log appesi in coda. Tutto qui. Per le dimensioni di questa pipeline, cron è sinceramente sufficiente. I motivi per fare il salto a Prefect o Airflow (lezione 41) sono: più pipeline, dipendenze tra di loro, una UI per stakeholder non tecnici, retry logic più intelligente, o la necessità di fare backfill di intervalli di date a click invece che con loop di shell.
Quando fai il salto, il lavoro è meccanico. La versione Prefect della stessa pipeline è grosso modo:
from prefect import flow, task
from datetime import date
@task(retries=3)
def t_extract(run_date): return extract(run_date)
@task
def t_transform(run_date): return transform(run_date)
@task
def t_load(run_date): return load(run_date)
@flow
def weather_flow(run_date: date | None = None):
run_date = run_date or (date.today() - timedelta(days=1))
t_extract(run_date)
t_transform(run_date)
t_load(run_date)
if __name__ == "__main__":
weather_flow.serve(name="weather-daily", cron="0 2 * * *")
Stessa logica, avvolta in decoratori flow / task, schedulata da Prefect Cloud, osservabile in una vera UI. Il lavoro che hai fatto per rendere extract / transform / load separabili è ciò che ha reso la migrazione economica.
Backfill
La decisione paga il giorno in cui qualcuno dice “abbiamo perso una settimana di dati, puoi rieseguirli?”. Con questo design:
for d in 2026-03-01 2026-03-02 2026-03-03 2026-03-04 2026-03-05; do
python -m weather.pipeline $d
done
Fatto. Ogni giorno è indipendente, idempotente, rieseguibile in sicurezza. Nessuna “modalità backfill” speciale, nessuna cancellazione manuale, nessun duplicato a sorpresa. Progettare per il backfill dal primo giorno è la singola abitudine a maggior leva nel data engineering.
Dove gli assistenti AI aiutano, e dove no
Questo è un buon momento per una nota onesta. Gli assistenti di codice (Claude, Copilot, Cursor) si guadagnano lo stipendio sulle parti di colla di una pipeline. Decoratori di retry con backoff, boilerplate di logging strutturato, “scrivimi l’upsert SQL per questo schema di dataframe”, “aggiungi un argomento CLI per la data”: codice noioso, strutturato, soggetto a errori, dove la consistenza conta e la creatività no. Quel codice lo scrivono velocemente e bene.
Dove aiutano di meno è l’architettura: dove mettere il confine tra extract e transform, cosa validare in quale stage, come modellare il grafo degli asset, qual è il tuo contratto di idempotenza. Quelle decisioni dipendono da contesto che il modello non ha: le stranezze delle tue sorgenti dati, le convenzioni del tuo team, le tolleranze dei tuoi consumatori downstream. Prendile tu; lascia che l’assistente riempia l’impalcatura intorno.
Cosa manca
Questa è la forma minima realistica, non pronta per la produzione. La produzione aggiunge:
- Test: unit test per transform, integration test contro un container Postgres di test, contract test contro una risposta API registrata.
- Infrastruttura: containerizzare, fare il deploy su una piccola VM o un runner gestito, gestire i segreti tramite la tua piattaforma.
- Alerting: oltre a Sentry, un alert “la pipeline non è partita” (un check di heartbeat mancante, spesso tramite un servizio come Healthchecks.io).
- CI: lint, type-check ed esecuzione dei test su ogni PR.
- Osservabilità: spedizione dei log su ELK / Loki / Datadog così da poter cercare attraverso le run, e metriche sui conteggi delle righe per beccare under-fetching silenziosi.
- Schema management: migrazioni via Alembic quando la tabella evolve, invece di un
CREATE IF NOT EXISTS.
Ma la versione che abbiamo costruito qui copre il nucleo: dati che fluiscono in modo affidabile dalla sorgente alla destinazione, monitorati, idempotenti, schedulabili, debuggabili. Quella è l’asticella, e è abbastanza alta che la maggior parte delle pipeline in giro non la supera.
Modulo 7 in rassegna, e cosa viene dopo
Abbiamo iniziato il Modulo 7 con un’introduzione al data engineering come disciplina. Abbiamo coperto ETL contro ELT, formati moderni (Parquet, Iceberg, Delta), object store, dbt, asyncio, orchestratori, e ora le pipeline. Hai visto il toolkit che ci si aspetta che un data engineer junior sappia navigare nel 2026.
Il Modulo 8, Python numerico, gira la telecamera dall’altra parte. Mentre i Moduli 6 e 7 erano sul wrangling e sullo spostamento dei dati, il Modulo 8 è sul calcolo che ci si fa sopra. NumPy in profondità, vettorizzazione, broadcasting, l’ecosistema SciPy, un giro dove Python tocca il lavoro numerico che pandas non raggiunge. Meno idraulica, più matematica. Ci vediamo lì.
Citazioni: Open-Meteo archive API, tenacity docs, psycopg 3 docs, structlog docs, Prefect docs. Recuperati il 2026-05-01.