Python, de la zero Lecția 38 / 60

Construirea unui pipeline de ingestion: de la fisiere la baza de date

Un pipeline complet fisier-spre-Postgres in Python, cu pattern-urile care supravietuiesc contactului cu realitatea.

În lecția 37 am schițat forma ETL. Astăzi construim unul. Scenariul e cea mai comună formă de ingestion din industrie: fișierele ajung într-un director, Python le ridică, validează, încarcă în Postgres, mută originalele din cale. Vendori aruncă CSV-uri, o echipă internă scrie JSON, un partener urcă Parquet, pattern-ul e același. Fă-l corect o dată și îl reutilizezi ani de zile.

Vom parcurge întâi blocurile de construcție, apoi asamblăm un script real de ~200 de linii. Cod peste tot. Pattern-uri pe care le-am văzut eșuând în producție și fix-urile care țin.

Pattern-ul drop-zone

Forma:

data/
  incoming/    <- files arrive here
  processed/   <- successfully ingested files move here
  quarantine/  <- files that errored out

Un script Python interoghează incoming/ la fiecare N secunde, procesează ce găsește și mută. Trei directoare, un script. Suficient de simplu cât un junior îl poate debuga la 9 seara.

from pathlib import Path

DATA_ROOT = Path("/var/data")
INCOMING = DATA_ROOT / "incoming"
PROCESSED = DATA_ROOT / "processed"
QUARANTINE = DATA_ROOT / "quarantine"

for d in (INCOMING, PROCESSED, QUARANTINE):
    d.mkdir(parents=True, exist_ok=True)

De ce să mutăm fișiere în loc de doar a urmări care au fost văzute? Două motive. Primul, directorul e propriul afișaj de status: ops poate vedea dintr-o privire ce e blocat. Al doilea, e prietenos cu recovery-ul: aruncă un fișier din quarantine înapoi în incoming și l-ai reprocesat.

„A terminat fișierul ăsta de scris?”

Primul lucru care mușcă pe toată lumea. Fișierele nu apar atomic. Tool-ul de upload al unui vendor le scrie byte cu byte. Scriptul tău se trezește, vede orders.csv, îl deschide și citește primii 17 megabytes dintr-un fișier de 40 de megabytes. Rând pe jumătate la final. Validarea pică. Muți fișierul în quarantine. Două secunde mai târziu, upload-ul se termină. Ai pus în carantină un fișier perfect bun.

Trei pattern-uri ca să tratezi asta, în ordinea preferinței.

Pattern 1: producătorul scrie un marker .done. Vendorul urcă orders.csv, apoi imediat urcă un orders.csv.done gol. Scriptul tău procesează un fișier doar când însoțitorul lui .done există.

def is_ready(path: Path) -> bool:
    return path.with_suffix(path.suffix + ".done").exists()

Dacă deții producătorul, ăsta e răspunsul cel mai curat. Ieftin, fiabil, fără race.

Pattern 2: mtime stabil. Dacă nu poți schimba producătorul, verifică timpul de modificare al fișierului. Dacă nu s-a schimbat în N secunde, presupune că e gata.

import time

def is_stable(path: Path, quiet_seconds: float = 5.0) -> bool:
    age = time.time() - path.stat().st_mtime
    return age >= quiet_seconds

Mai puțin curat: trebuie să alegi o valoare quiet_seconds mai lungă decât orice pauză plauzibilă de upload, dar suficient de scurtă cât să ții latența rezonabilă. Cinci secunde e un default apărabil pentru date la scară umană.

Pattern 3: redenumire atomică. Producătorul scrie în orders.csv.tmp, redenumește în orders.csv când termină. Aceeași idee ca fișierul marker, dar folosește atomicitatea POSIX a rename. Merge doar dacă writer-ul e pe același filesystem.

Alege unul și ține-te de el. Bug-ul în care procesezi un fișier scris pe jumătate e genul care iese la suprafață la 4 dimineața de sărbătoare.

Validare de schemă: eșuează rapid

Al doilea lucru care mușcă pe toată lumea. Vendorul trimite un fișier. Schema arată ok. Datele sunt gunoi: date ca "01/02/2026" când așteptai ISO, string-uri de monedă cu prefix "$", nulls codate ca string-ul "NULL". Dacă le lași să treacă, rândurile proaste aterizează în baza ta de date și poluează tabele downstream săptămâni întregi.

Fix-ul e să validezi înainte să încarci și să eșuezi zgomotos când validarea pică. Două biblioteci își câștigă pâinea aici.

pydantic pentru validare la nivel de rând când poți descrie rândul ca un model:

from datetime import date
from decimal import Decimal
from pydantic import BaseModel, Field, EmailStr

class OrderRow(BaseModel):
    order_id: int
    customer_email: EmailStr
    amount: Decimal = Field(gt=0)
    order_date: date
    status: str = Field(pattern=r"^(pending|shipped|delivered|cancelled)$")
import csv
from pydantic import ValidationError

def parse_orders(path: Path) -> tuple[list[OrderRow], list[dict]]:
    valid, errors = [], []
    with path.open() as f:
        for line_no, raw in enumerate(csv.DictReader(f), start=2):
            try:
                valid.append(OrderRow.model_validate(raw))
            except ValidationError as e:
                errors.append({"line": line_no, "raw": raw, "errors": e.errors()})
    return valid, errors

Două output-uri: rânduri curate pe care le poți încărca și o listă de rânduri proaste pe care o poți scrie într-un quarantine log. Nu arunca rândurile proaste în tăcere. Nu arunca în aer întreg fișierul pentru un singur rând prost, nici. Calea de mijloc e un prag pe fișier („dacă mai mult de 1% din rânduri sunt proaste, pune întreg fișierul în quarantine”) și log-uri de eroare per rând pe care le poți arăta vendorului.

pyarrow pentru fișiere coloane cu tipare stricte (Parquet, Arrow IPC). Când fișierul e deja tipat, validarea de schemă din pyarrow e mai rapidă decât parsarea fiecărui rând prin pydantic.

Tabela de checkpoint

Dacă scriptul tău crapă la jumătate prin procesarea unui director cu 50 de fișiere, ce se întâmplă la restart? Răspuns naiv: vede aceleași 50 de fișiere și le reprocesează. Dacă încărcarea ta nu e idempotentă, primești duplicate.

Fix-ul e o mică tabelă de checkpoint în aceeași bază de date în care încarci. Un rând per fișier, înregistrând când a fost ingerat și rezultatul.

CREATE TABLE IF NOT EXISTS ingestion_checkpoints (
    filename TEXT PRIMARY KEY,
    file_size BIGINT NOT NULL,
    file_sha256 TEXT NOT NULL,
    rows_loaded INTEGER NOT NULL,
    rows_quarantined INTEGER NOT NULL,
    ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    status TEXT NOT NULL CHECK (status IN ('success', 'failed'))
);

Înainte de procesare, verifici dacă fișierul (după nume + sha256) e deja în tabelă. Dacă da, sari peste. Dacă nu, procesează și înregistrează. Sha256-ul prinde cazul în care vendorul a reutilizat un nume de fișier pentru un alt fișier: același nume, conținut diferit, ingestion diferit.

import hashlib

def sha256_of(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

Încărcare în Postgres: ON CONFLICT și COPY

Pentru câteva mii de rânduri, insert-urile parametrizate sunt ok:

import psycopg
from psycopg.rows import dict_row

INSERT_SQL = """
INSERT INTO orders (order_id, customer_email, amount, order_date, status)
VALUES (%(order_id)s, %(customer_email)s, %(amount)s, %(order_date)s, %(status)s)
ON CONFLICT (order_id) DO UPDATE SET
    customer_email = EXCLUDED.customer_email,
    amount         = EXCLUDED.amount,
    order_date     = EXCLUDED.order_date,
    status         = EXCLUDED.status;
"""

def load_orders(conn: psycopg.Connection, rows: list[OrderRow]) -> int:
    payload = [r.model_dump() for r in rows]
    with conn.cursor() as cur:
        cur.executemany(INSERT_SQL, payload)
    return len(payload)

Clauza ON CONFLICT ... DO UPDATE e comutatorul de idempotență. Rulezi același fișier de două ori, obții aceeași stare finală. Asta e ce am exersat în lecția 37.

Pentru zeci de mii de rânduri sau mai mult, treci la COPY. E un ordin de magnitudine mai rapid decât executemany:

def load_orders_copy(conn: psycopg.Connection, rows: list[OrderRow]) -> int:
    with conn.cursor() as cur, cur.copy(
        "COPY orders_staging (order_id, customer_email, amount, order_date, status) FROM STDIN"
    ) as copy:
        for r in rows:
            copy.write_row((r.order_id, r.customer_email, r.amount, r.order_date, r.status))
    cur.execute("""
        INSERT INTO orders SELECT * FROM orders_staging
        ON CONFLICT (order_id) DO UPDATE SET
          customer_email = EXCLUDED.customer_email,
          amount         = EXCLUDED.amount,
          order_date     = EXCLUDED.order_date,
          status         = EXCLUDED.status;
        TRUNCATE orders_staging;
    """)
    return len(rows)

Pattern-ul: COPY într-o tabelă de staging (rapid), apoi INSERT ... SELECT ... ON CONFLICT din staging în tabela reală (idempotent). Ce-i mai bun din ambele.

Logging la fiecare pas

Adu lecția 11 mai aproape: structured logging la fiecare pas, cu numele fișierului în fiecare linie. Când ceva merge prost la 3 dimineața vrei să poți face grep pe log-uri după nume de fișier și să vezi toată povestea.

import logging
logger = logging.getLogger("ingest")

logger.info("processing", extra={"file": path.name, "size": path.stat().st_size})
logger.info("validated", extra={"file": path.name, "valid": len(valid), "errors": len(errors)})
logger.info("loaded", extra={"file": path.name, "rows": loaded})

Dacă folosești setup-ul cu structlog sau logging.config din lecția 11, acele câmpuri extra aterizează ca chei JSON. Tu cel din viitor vei fi recunoscător.

Scriptul complet

Două sute de linii, rulabil, cu opinii. Pune-l în ingest.py, indică-l către rădăcina ta data/, rulează-l sub cron sau ca serviciu systemd.

"""ingest.py — drop-zone CSV ingestion to Postgres."""
from __future__ import annotations
import csv
import hashlib
import logging
import shutil
import time
from datetime import date
from decimal import Decimal
from pathlib import Path

import psycopg
from pydantic import BaseModel, EmailStr, Field, ValidationError

DATA_ROOT = Path("/var/data")
INCOMING = DATA_ROOT / "incoming"
PROCESSED = DATA_ROOT / "processed"
QUARANTINE = DATA_ROOT / "quarantine"
DSN = "postgresql://etl:secret@localhost:5432/warehouse"
QUIET_SECONDS = 5.0
MAX_BAD_ROW_FRACTION = 0.01

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("ingest")


class OrderRow(BaseModel):
    order_id: int
    customer_email: EmailStr
    amount: Decimal = Field(gt=0)
    order_date: date
    status: str = Field(pattern=r"^(pending|shipped|delivered|cancelled)$")


def sha256_of(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def is_stable(path: Path) -> bool:
    return time.time() - path.stat().st_mtime >= QUIET_SECONDS


def already_ingested(conn: psycopg.Connection, name: str, digest: str) -> bool:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT 1 FROM ingestion_checkpoints WHERE filename=%s AND file_sha256=%s",
            (name, digest),
        )
        return cur.fetchone() is not None


def parse(path: Path) -> tuple[list[OrderRow], list[dict]]:
    valid: list[OrderRow] = []
    errors: list[dict] = []
    with path.open(newline="") as f:
        for line_no, raw in enumerate(csv.DictReader(f), start=2):
            try:
                valid.append(OrderRow.model_validate(raw))
            except ValidationError as e:
                errors.append({"line": line_no, "raw": raw, "errors": e.errors()})
    return valid, errors


INSERT_SQL = """
INSERT INTO orders (order_id, customer_email, amount, order_date, status)
VALUES (%(order_id)s, %(customer_email)s, %(amount)s, %(order_date)s, %(status)s)
ON CONFLICT (order_id) DO UPDATE SET
    customer_email = EXCLUDED.customer_email,
    amount         = EXCLUDED.amount,
    order_date     = EXCLUDED.order_date,
    status         = EXCLUDED.status;
"""


def load(conn: psycopg.Connection, rows: list[OrderRow]) -> int:
    if not rows:
        return 0
    with conn.cursor() as cur:
        cur.executemany(INSERT_SQL, [r.model_dump() for r in rows])
    return len(rows)


def checkpoint(
    conn: psycopg.Connection,
    path: Path,
    digest: str,
    loaded: int,
    quarantined: int,
    status: str,
) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO ingestion_checkpoints
              (filename, file_size, file_sha256, rows_loaded, rows_quarantined, status)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (filename) DO UPDATE SET
              file_size = EXCLUDED.file_size,
              file_sha256 = EXCLUDED.file_sha256,
              rows_loaded = EXCLUDED.rows_loaded,
              rows_quarantined = EXCLUDED.rows_quarantined,
              ingested_at = NOW(),
              status = EXCLUDED.status;
            """,
            (path.name, path.stat().st_size, digest, loaded, quarantined, status),
        )


def process_one(conn: psycopg.Connection, path: Path) -> None:
    log.info("processing %s", path.name)
    digest = sha256_of(path)
    if already_ingested(conn, path.name, digest):
        log.info("already ingested, skipping: %s", path.name)
        path.rename(PROCESSED / path.name)
        return

    valid, errors = parse(path)
    total = len(valid) + len(errors)
    bad_fraction = len(errors) / total if total else 0.0

    if bad_fraction > MAX_BAD_ROW_FRACTION:
        log.error("too many bad rows in %s (%.2f%%), quarantining", path.name, bad_fraction * 100)
        shutil.move(path, QUARANTINE / path.name)
        checkpoint(conn, path, digest, 0, len(errors), "failed")
        conn.commit()
        return

    try:
        with conn.transaction():
            loaded = load(conn, valid)
            checkpoint(conn, path, digest, loaded, len(errors), "success")
        log.info("loaded %d rows from %s (%d errors)", loaded, path.name, len(errors))
        path.rename(PROCESSED / path.name)
    except Exception:
        log.exception("load failed for %s", path.name)
        shutil.move(path, QUARANTINE / path.name)
        with conn.transaction():
            checkpoint(conn, path, digest, 0, len(errors), "failed")


def run_once() -> None:
    with psycopg.connect(DSN, autocommit=False) as conn:
        for path in sorted(INCOMING.glob("*.csv")):
            if not is_stable(path):
                log.debug("not stable yet: %s", path.name)
                continue
            try:
                process_one(conn, path)
            except Exception:
                log.exception("unexpected failure on %s", path.name)


def main() -> None:
    for d in (INCOMING, PROCESSED, QUARANTINE):
        d.mkdir(parents=True, exist_ok=True)
    while True:
        run_once()
        time.sleep(10)


if __name__ == "__main__":
    main()

Asta e tot. Tratează race-ul cu fișiere scrise pe jumătate, validează schema înainte de încărcare, eșuează rapid pe fișiere cu prea multe rânduri proaste, încarcă idempotent prin ON CONFLICT, deduplică după nume + sha256, loghează informații structurate la fiecare pas și se recuperează curat la restart. Două sute de linii sunt suficiente pentru un pipeline real.

Să ruleze ăsta ca serviciu?

Scriptul de mai sus e o buclă infinită. E ok pentru v1: rulează-l sub nohup sau într-o sesiune tmux și pleacă. Pentru v2, îl învelești într-o unitate systemd ca să repornească la crash și să pornească la boot:

# /etc/systemd/system/ingest.service
[Unit]
Description=Drop-zone ingestion
After=postgresql.service network.target

[Service]
ExecStart=/opt/venv/bin/python /opt/ingest/ingest.py
Restart=on-failure
RestartSec=5
User=etl

[Install]
WantedBy=multi-user.target

systemctl enable --now ingest și ai terminat.

Pentru v3, când acest job are dependențe de alte job-uri, sau are nevoie de monitorizare SLA, sau vrei un UI ca să vezi istoria rulărilor, absolvi la Airflow / Prefect / Dagster. Asta e lecția 41. Nu te repezi la un orchestrator înainte să ai nevoie de el; o buclă systemd și o tabelă de checkpoint te duc departe.

Ce ai construit

Un pipeline real de ingestion. Fișierele aterizează, sunt validate, sunt încărcate idempotent, sunt urmărite. Eșecurile merg în quarantine în loc să corupă baza de date. Repornirile sunt sigure. Aceeași formă merge pentru JSON, Parquet, fișiere fixed-width, API-uri de vendor care aruncă în S3: schimbi funcția parse și ești pornit.

În lecția 39 atacăm cealaltă jumătate: tras date din API-uri, cu retries, rate limits și paginare.

Citări

Caută