Python, dalle fondamenta Lezione 38 / 60

Costruire una pipeline di ingestion: dai file al database

Una pipeline completa file-verso-Postgres in Python, con i pattern che sopravvivono al contatto con la realtà.

Nella lezione 37 abbiamo abbozzato la forma di ETL. Oggi ne costruiamo uno. Lo scenario è la forma di ingestion più comune nell’industria: i file arrivano in una directory, Python li raccoglie, valida, carica su Postgres, sposta gli originali fuori dai piedi. I vendor lasciano CSV, un team interno scrive JSON, un partner carica Parquet: il pattern è lo stesso. Fallo bene una volta e lo riusi per anni.

Lavoreremo prima sui mattoni costitutivi, poi assembleremo uno script reale di ~200 righe. Codice ovunque. Pattern che ho visto fallire in produzione e i fix che reggono.

Il pattern della drop-zone

La forma:

data/
  incoming/    <- files arrive here
  processed/   <- successfully ingested files move here
  quarantine/  <- files that errored out

Uno script Python fa polling su incoming/ ogni N secondi, processa quello che trova, e lo sposta. Tre directory, uno script. Abbastanza semplice perché un junior possa fare debug alle 9 di sera.

from pathlib import Path

DATA_ROOT = Path("/var/data")
INCOMING = DATA_ROOT / "incoming"
PROCESSED = DATA_ROOT / "processed"
QUARANTINE = DATA_ROOT / "quarantine"

for d in (INCOMING, PROCESSED, QUARANTINE):
    d.mkdir(parents=True, exist_ok=True)

Perché spostare i file invece di tracciare semplicemente quali abbiamo visto? Due ragioni. Primo, la directory è il suo proprio display di stato: ops può vedere a colpo d’occhio cosa è bloccato. Secondo, è recovery-friendly: scarichi un file da quarantine di nuovo in incoming e lo hai riprocessato.

”Questo file ha finito di scriversi?”

La prima cosa che morde tutti. I file non appaiono atomicamente. Il tool di upload di un vendor li scrive byte per byte. Il tuo script si sveglia, vede orders.csv, lo apre, e legge i primi 17 megabyte di un file da 40 megabyte. Mezza riga alla fine. La validazione fallisce. Sposti il file in quarantine. Due secondi dopo l’upload finisce. Hai messo in quarantena un file perfettamente buono.

Tre pattern per gestirlo, in ordine di preferenza.

Pattern 1: il producer scrive un marker .done. Il vendor carica orders.csv, poi immediatamente carica un orders.csv.done vuoto. Il tuo script processa un file solo quando il suo compagno .done esiste.

def is_ready(path: Path) -> bool:
    return path.with_suffix(path.suffix + ".done").exists()

Se controlli il producer questa è la risposta più pulita. Economica, affidabile, nessuna race.

Pattern 2: mtime stabile. Se non puoi cambiare il producer, controlla il tempo di modifica del file. Se non è cambiato in N secondi, assumi che sia finito.

import time

def is_stable(path: Path, quiet_seconds: float = 5.0) -> bool:
    age = time.time() - path.stat().st_mtime
    return age >= quiet_seconds

Meno pulito: devi scegliere un valore di quiet_seconds più lungo di qualsiasi pausa di upload plausibile ma abbastanza corto da tenere la latenza ragionevole. Cinque secondi è un default difendibile per dati di scala umana.

Pattern 3: rename atomica. Il producer scrive su orders.csv.tmp, lo rinomina in orders.csv quando ha finito. Stessa idea del marker file, ma usa l’atomicità della rename POSIX. Funziona solo se lo writer è sullo stesso filesystem.

Sceglione uno e attieniti a quello. Il bug in cui processi un file scritto a metà è il tipo che emerge alle 4 di mattina di un giorno festivo.

Validazione dello schema: fallisci presto

La seconda cosa che morde tutti. Il vendor manda un file. Lo schema sembra a posto. I dati sono spazzatura: date come "01/02/2026" quando ti aspettavi ISO, stringhe di valuta con prefissi "$", null codificati come la stringa "NULL". Se lo lasci passare, le righe sbagliate atterrano nel tuo database e contaminano le tabelle a valle per settimane.

Il fix è validare prima di caricare, e fallire rumorosamente quando la validazione fallisce. Due librerie si guadagnano la pagnotta qui.

pydantic per la validazione a livello di riga quando puoi descrivere la riga come un modello:

from datetime import date
from decimal import Decimal
from pydantic import BaseModel, Field, EmailStr

class OrderRow(BaseModel):
    order_id: int
    customer_email: EmailStr
    amount: Decimal = Field(gt=0)
    order_date: date
    status: str = Field(pattern=r"^(pending|shipped|delivered|cancelled)$")
import csv
from pydantic import ValidationError

def parse_orders(path: Path) -> tuple[list[OrderRow], list[dict]]:
    valid, errors = [], []
    with path.open() as f:
        for line_no, raw in enumerate(csv.DictReader(f), start=2):
            try:
                valid.append(OrderRow.model_validate(raw))
            except ValidationError as e:
                errors.append({"line": line_no, "raw": raw, "errors": e.errors()})
    return valid, errors

Due output: righe pulite che puoi caricare, e una lista di righe sbagliate che puoi scrivere su un log di quarantena. Non scartare le righe sbagliate silenziosamente. Non far esplodere l’intero file per una riga sbagliata, neanche: la via di mezzo è una soglia per file (“se più dell’1% delle righe è sbagliato, metti in quarantena l’intero file”) e log di errore per riga che puoi mostrare al vendor.

pyarrow per file colonnari con tipi rigidi (Parquet, Arrow IPC). Quando il file è già tipizzato, la validazione dello schema di pyarrow è più veloce del parsing di ogni riga attraverso pydantic.

La tabella di checkpoint

Se il tuo script crasha a metà processamento di una directory di 50 file, cosa succede al riavvio? Risposta ingenua: vede gli stessi 50 file e li riprocessa. Se il tuo load non è idempotente, ottieni duplicati.

Il fix è una piccola tabella di checkpoint nello stesso database su cui stai caricando. Una riga per file, che registra quando è stato ingerito e il risultato.

CREATE TABLE IF NOT EXISTS ingestion_checkpoints (
    filename TEXT PRIMARY KEY,
    file_size BIGINT NOT NULL,
    file_sha256 TEXT NOT NULL,
    rows_loaded INTEGER NOT NULL,
    rows_quarantined INTEGER NOT NULL,
    ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    status TEXT NOT NULL CHECK (status IN ('success', 'failed'))
);

Prima di processare, controlla se il file (per nome + sha256) è già nella tabella. Se sì, salta. Se no, processa e registra. Lo sha256 cattura il caso in cui il vendor ha riusato un nome file per un file diverso: stesso nome, contenuto diverso, ingestion diversa.

import hashlib

def sha256_of(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

Caricare su Postgres: ON CONFLICT e COPY

Per qualche migliaio di righe, gli insert parametrizzati vanno bene:

import psycopg
from psycopg.rows import dict_row

INSERT_SQL = """
INSERT INTO orders (order_id, customer_email, amount, order_date, status)
VALUES (%(order_id)s, %(customer_email)s, %(amount)s, %(order_date)s, %(status)s)
ON CONFLICT (order_id) DO UPDATE SET
    customer_email = EXCLUDED.customer_email,
    amount         = EXCLUDED.amount,
    order_date     = EXCLUDED.order_date,
    status         = EXCLUDED.status;
"""

def load_orders(conn: psycopg.Connection, rows: list[OrderRow]) -> int:
    payload = [r.model_dump() for r in rows]
    with conn.cursor() as cur:
        cur.executemany(INSERT_SQL, payload)
    return len(payload)

La clausola ON CONFLICT ... DO UPDATE è l’interruttore dell’idempotenza. Esegui lo stesso file due volte, ottieni lo stesso stato finale. È quello che abbiamo trapanato nella lezione 37.

Per decine di migliaia di righe o più, passa a COPY. È un ordine di grandezza più veloce di executemany:

def load_orders_copy(conn: psycopg.Connection, rows: list[OrderRow]) -> int:
    with conn.cursor() as cur, cur.copy(
        "COPY orders_staging (order_id, customer_email, amount, order_date, status) FROM STDIN"
    ) as copy:
        for r in rows:
            copy.write_row((r.order_id, r.customer_email, r.amount, r.order_date, r.status))
    cur.execute("""
        INSERT INTO orders SELECT * FROM orders_staging
        ON CONFLICT (order_id) DO UPDATE SET
          customer_email = EXCLUDED.customer_email,
          amount         = EXCLUDED.amount,
          order_date     = EXCLUDED.order_date,
          status         = EXCLUDED.status;
        TRUNCATE orders_staging;
    """)
    return len(rows)

Il pattern: COPY in una tabella di staging (veloce), poi INSERT ... SELECT ... ON CONFLICT da staging alla tabella reale (idempotente). Il meglio dei due mondi.

Logging a ogni passo

Tira avanti la lezione 11: structured logging a ogni passo, con il filename in ogni riga. Quando qualcosa va storto alle 3 di mattina vuoi poter fare grep dei log per filename e vedere l’intera storia.

import logging
logger = logging.getLogger("ingest")

logger.info("processing", extra={"file": path.name, "size": path.stat().st_size})
logger.info("validated", extra={"file": path.name, "valid": len(valid), "errors": len(errors)})
logger.info("loaded", extra={"file": path.name, "rows": loaded})

Se stai usando il setup structlog o logging.config della lezione 11, quei campi extra atterrano come chiavi JSON. Te del futuro sarà grato.

Lo script intero

Duecento righe, eseguibile, con opinioni. Mettilo in ingest.py, puntalo alla tua root data/, eseguilo sotto cron o come servizio systemd.

"""ingest.py — drop-zone CSV ingestion to Postgres."""
from __future__ import annotations
import csv
import hashlib
import logging
import shutil
import time
from datetime import date
from decimal import Decimal
from pathlib import Path

import psycopg
from pydantic import BaseModel, EmailStr, Field, ValidationError

DATA_ROOT = Path("/var/data")
INCOMING = DATA_ROOT / "incoming"
PROCESSED = DATA_ROOT / "processed"
QUARANTINE = DATA_ROOT / "quarantine"
DSN = "postgresql://etl:secret@localhost:5432/warehouse"
QUIET_SECONDS = 5.0
MAX_BAD_ROW_FRACTION = 0.01

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("ingest")


class OrderRow(BaseModel):
    order_id: int
    customer_email: EmailStr
    amount: Decimal = Field(gt=0)
    order_date: date
    status: str = Field(pattern=r"^(pending|shipped|delivered|cancelled)$")


def sha256_of(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def is_stable(path: Path) -> bool:
    return time.time() - path.stat().st_mtime >= QUIET_SECONDS


def already_ingested(conn: psycopg.Connection, name: str, digest: str) -> bool:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT 1 FROM ingestion_checkpoints WHERE filename=%s AND file_sha256=%s",
            (name, digest),
        )
        return cur.fetchone() is not None


def parse(path: Path) -> tuple[list[OrderRow], list[dict]]:
    valid: list[OrderRow] = []
    errors: list[dict] = []
    with path.open(newline="") as f:
        for line_no, raw in enumerate(csv.DictReader(f), start=2):
            try:
                valid.append(OrderRow.model_validate(raw))
            except ValidationError as e:
                errors.append({"line": line_no, "raw": raw, "errors": e.errors()})
    return valid, errors


INSERT_SQL = """
INSERT INTO orders (order_id, customer_email, amount, order_date, status)
VALUES (%(order_id)s, %(customer_email)s, %(amount)s, %(order_date)s, %(status)s)
ON CONFLICT (order_id) DO UPDATE SET
    customer_email = EXCLUDED.customer_email,
    amount         = EXCLUDED.amount,
    order_date     = EXCLUDED.order_date,
    status         = EXCLUDED.status;
"""


def load(conn: psycopg.Connection, rows: list[OrderRow]) -> int:
    if not rows:
        return 0
    with conn.cursor() as cur:
        cur.executemany(INSERT_SQL, [r.model_dump() for r in rows])
    return len(rows)


def checkpoint(
    conn: psycopg.Connection,
    path: Path,
    digest: str,
    loaded: int,
    quarantined: int,
    status: str,
) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO ingestion_checkpoints
              (filename, file_size, file_sha256, rows_loaded, rows_quarantined, status)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (filename) DO UPDATE SET
              file_size = EXCLUDED.file_size,
              file_sha256 = EXCLUDED.file_sha256,
              rows_loaded = EXCLUDED.rows_loaded,
              rows_quarantined = EXCLUDED.rows_quarantined,
              ingested_at = NOW(),
              status = EXCLUDED.status;
            """,
            (path.name, path.stat().st_size, digest, loaded, quarantined, status),
        )


def process_one(conn: psycopg.Connection, path: Path) -> None:
    log.info("processing %s", path.name)
    digest = sha256_of(path)
    if already_ingested(conn, path.name, digest):
        log.info("already ingested, skipping: %s", path.name)
        path.rename(PROCESSED / path.name)
        return

    valid, errors = parse(path)
    total = len(valid) + len(errors)
    bad_fraction = len(errors) / total if total else 0.0

    if bad_fraction > MAX_BAD_ROW_FRACTION:
        log.error("too many bad rows in %s (%.2f%%), quarantining", path.name, bad_fraction * 100)
        shutil.move(path, QUARANTINE / path.name)
        checkpoint(conn, path, digest, 0, len(errors), "failed")
        conn.commit()
        return

    try:
        with conn.transaction():
            loaded = load(conn, valid)
            checkpoint(conn, path, digest, loaded, len(errors), "success")
        log.info("loaded %d rows from %s (%d errors)", loaded, path.name, len(errors))
        path.rename(PROCESSED / path.name)
    except Exception:
        log.exception("load failed for %s", path.name)
        shutil.move(path, QUARANTINE / path.name)
        with conn.transaction():
            checkpoint(conn, path, digest, 0, len(errors), "failed")


def run_once() -> None:
    with psycopg.connect(DSN, autocommit=False) as conn:
        for path in sorted(INCOMING.glob("*.csv")):
            if not is_stable(path):
                log.debug("not stable yet: %s", path.name)
                continue
            try:
                process_one(conn, path)
            except Exception:
                log.exception("unexpected failure on %s", path.name)


def main() -> None:
    for d in (INCOMING, PROCESSED, QUARANTINE):
        d.mkdir(parents=True, exist_ok=True)
    while True:
        run_once()
        time.sleep(10)


if __name__ == "__main__":
    main()

Ecco fatto. Gestisce la race del file scritto a metà, valida lo schema prima di caricare, fallisce presto sui file con troppe righe sbagliate, fa load idempotenti via ON CONFLICT, deduplica per filename + sha256, logga informazioni strutturate a ogni passo, e si riprende pulitamente al riavvio. Duecento righe sono abbastanza per una pipeline reale.

Dovrebbe girare come servizio?

Lo script qui sopra è un loop infinito. Per la v1 va bene: eseguilo sotto nohup o in una sessione tmux e vai via. Per la v2, avvolgilo in una systemd unit così riparte al crash e si avvia al boot:

# /etc/systemd/system/ingest.service
[Unit]
Description=Drop-zone ingestion
After=postgresql.service network.target

[Service]
ExecStart=/opt/venv/bin/python /opt/ingest/ingest.py
Restart=on-failure
RestartSec=5
User=etl

[Install]
WantedBy=multi-user.target

systemctl enable --now ingest e hai finito.

Per la v3, quando questo job ha dipendenze su altri job, o serve monitoring SLA, o vuoi una UI per vedere la storia delle run, passa a Airflow / Prefect / Dagster. Quella è la lezione 41. Non ricorrere a un orchestrator prima di averne bisogno; un loop systemd e una tabella di checkpoint ti porteranno lontano.

Cosa hai costruito

Una pipeline di ingestion vera. I file atterrano, vengono validati, vengono caricati idempotentemente, vengono tracciati. I fallimenti si spostano in quarantena invece di corrompere il database. I riavvii sono sicuri. La stessa forma funziona per JSON, Parquet, file a larghezza fissa, API di vendor che scaricano su S3: cambi la funzione parse e sei a posto.

Nella lezione 39 affrontiamo l’altra metà: tirare dati da API, con retry, rate limit e paginazione.

Citazioni

Cerca