Python, from the ground up · Lesson 38 / 60

Building an ingestion pipeline: files to database

A complete file-to-Postgres pipeline in Python, with the patterns that survive contact with reality.

In lesson 37 we sketched the shape of ETL. Today we build one. The scenario is the most common ingestion shape in industry: files arrive in a directory, Python picks them up, validates, loads to Postgres, moves the originals out of the way. Vendors drop CSVs, an internal team writes JSON, a partner uploads Parquet — the pattern is the same. Get it right once and you reuse it for years.

We’ll work through the building blocks first, then assemble a complete script of under 200 lines. Code throughout, along with patterns I’ve watched fail in production and the fixes that stick.

The drop-zone pattern

The shape:

data/
  incoming/    <- files arrive here
  processed/   <- successfully ingested files move here
  quarantine/  <- files that errored out

A Python script polls incoming/ every N seconds, processes anything it finds, and moves it. Three directories, one script. Simple enough that a junior can debug it at 9pm.

from pathlib import Path

DATA_ROOT = Path("/var/data")
INCOMING = DATA_ROOT / "incoming"
PROCESSED = DATA_ROOT / "processed"
QUARANTINE = DATA_ROOT / "quarantine"

for d in (INCOMING, PROCESSED, QUARANTINE):
    d.mkdir(parents=True, exist_ok=True)

Why move files instead of just tracking which ones we’ve seen? Two reasons. First, the directory is its own status display: ops can see at a glance what’s stuck. Second, it’s recovery-friendly — drop a file from quarantine back into incoming and you’ve reprocessed it.

“Is this file finished writing?”

The first thing that bites everyone. Files don’t appear atomically. A vendor’s upload tool writes them byte by byte. Your script wakes up, sees orders.csv, opens it, and reads the first 17 megabytes of a 40-megabyte file. Half-row at the end. Validation fails. You move the file to quarantine. Two seconds later the upload finishes. You’ve quarantined a perfectly good file.

Three patterns to handle this, in order of preference.

Pattern 1: producer writes a .done marker. Vendor uploads orders.csv, then immediately uploads an empty orders.csv.done. Your script only processes a file when its .done companion exists.

def is_ready(path: Path) -> bool:
    return path.with_suffix(path.suffix + ".done").exists()

If you control the producer this is the cleanest answer. Cheap, reliable, no race.
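On the consumer side, here is a minimal sketch of how the marker check and the cleanup might fit together. It assumes the orders.csv / orders.csv.done naming above. The helper names marker_for, ready_files, and finish are hypothetical and are not part of the script later in this lesson.

```python
from collections.abc import Iterator
from pathlib import Path


def marker_for(path: Path) -> Path:
    # orders.csv -> orders.csv.done
    return path.with_name(path.name + ".done")


def ready_files(incoming: Path, pattern: str = "*.csv") -> Iterator[Path]:
    """Yield data files whose .done marker has arrived, in name order."""
    for path in sorted(incoming.glob(pattern)):
        if marker_for(path).exists():
            yield path


def finish(path: Path, dest_dir: Path) -> Path:
    """Move a handled data file out of incoming/, then drop its marker."""
    target = dest_dir / path.name
    path.rename(target)  # assumes dest_dir is on the same filesystem
    # Deleting the marker last means a crash in between leaves only an
    # orphan marker, which the *.csv glob never picks up.
    marker_for(path).unlink(missing_ok=True)
    return target
```

The order inside finish is the design choice that matters: move the data file first, then delete the marker. If the process dies between the two steps, the worst case is a stray .done file. The reverse order can leave a data file in incoming/ without a marker, and it will never be picked up.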

Pattern 2: stable mtime. If you can’t change the producer, check the file’s modification time. If it hasn’t changed in N seconds, assume it’s done.

import time

def is_stable(path: Path, quiet_seconds: float = 5.0) -> bool:
    age = time.time() - path.stat().st_mtime
    return age >= quiet_seconds

Less clean — you have to pick a quiet_seconds value that’s longer than any plausible upload pause but short enough to keep latency reasonable. Five seconds is a defensible default for human-scale data.

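Pattern 3: atomic rename. The producer writes to orders.csv.tmp and renames it to orders.csv when done. This is the same idea as the marker file, but it relies on POSIX rename atomicity: readers see either no orders.csv or the complete one. That guarantee only holds when the temporary file and the final name are on the same filesystem. Your script must also ignore the .tmp files, which globbing for *.csv already does.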
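If the producer is a Python process you control, the write-then-rename side might look like the sketch below. The helper name publish_atomically and the .tmp suffix are assumptions. os.replace performs the rename, which is atomic on POSIX as long as the temp file and the destination share a filesystem. Keeping them in the same directory guarantees that.

```python
import os
from pathlib import Path


def publish_atomically(data: bytes, dest: Path) -> Path:
    """Write data to dest so readers never see a partial file (hypothetical helper)."""
    # The temp file lives in the same directory, hence on the same filesystem,
    # which is what makes the final rename atomic.
    tmp = dest.with_name(dest.name + ".tmp")
    with open(tmp, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes reach disk before publishing
    # Swap the finished file into place under its real name in one step.
    os.replace(tmp, dest)
    return dest
```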

Pick one and stick with it. The bug where you process a half-written file is the kind that surfaces at 4am on a holiday.

Schema validation: fail fast

The second thing that bites everyone. The vendor sends a file. The schema looks fine. The data is garbage — dates as "01/02/2026" when you expected ISO, currency strings with "$" prefixes, nulls encoded as the string "NULL". If you let it through, the bad rows land in your database and pollute downstream tables for weeks.

The fix is to validate before you load, and to fail loudly when validation fails. Two libraries earn their keep here.

pydantic for row-level validation when you can describe the row as a model:

from datetime import date
from decimal import Decimal
from pydantic import BaseModel, Field, EmailStr  # EmailStr needs: pip install "pydantic[email]"

class OrderRow(BaseModel):
    order_id: int
    customer_email: EmailStr
    amount: Decimal = Field(gt=0)
    order_date: date
    status: str = Field(pattern=r"^(pending|shipped|delivered|cancelled)$")

import csv
from pydantic import ValidationError

def parse_orders(path: Path) -> tuple[list[OrderRow], list[dict]]:
    valid, errors = [], []
    with path.open(newline="") as f:  # the csv module expects newline=""
        for line_no, raw in enumerate(csv.DictReader(f), start=2):
            try:
                valid.append(OrderRow.model_validate(raw))
            except ValidationError as e:
                errors.append({"line": line_no, "raw": raw, "errors": e.errors()})
    return valid, errors

Two outputs: clean rows you can load, and a list of bad rows you can write to a quarantine log. Don’t drop bad rows silently. Don’t blow up the whole file for one bad row, either — the middle path is a per-file threshold (“if more than 1% of rows are bad, quarantine the whole file”), and per-row error logs you can show the vendor.
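Here is a minimal sketch of that middle path. It takes the errors list produced by parse_orders, applies a bad-row threshold to decide whether to quarantine, and writes every bad row to a JSON Lines file you can hand to the vendor. The helper names, the .errors.jsonl suffix, and the 1% default are illustrative choices.

```python
import json
from pathlib import Path


def should_quarantine(n_valid: int, n_errors: int, max_bad_fraction: float = 0.01) -> bool:
    """True when the share of bad rows exceeds the per-file threshold."""
    total = n_valid + n_errors
    return total > 0 and n_errors / total > max_bad_fraction


def write_error_log(errors: list[dict], out_dir: Path, source_name: str) -> Path:
    """Write one JSON object per bad row, e.g. orders.csv -> orders.csv.errors.jsonl."""
    out = out_dir / f"{source_name}.errors.jsonl"
    with out.open("w", encoding="utf-8") as f:
        for err in errors:
            # default=str covers values json can't encode natively, such as
            # exception objects that can appear in pydantic error details.
            f.write(json.dumps(err, default=str) + "\n")
    return out
```

One JSON object per line keeps the log greppable and easy to load back, which helps when you walk a vendor through what went wrong.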

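pyarrow for type-strict columnar files (Parquet, Arrow IPC). When the file already carries types, checking its schema with pyarrow is much cheaper than pushing every row through pydantic: you compare column names and types once instead of validating each value. A schema check won’t catch value rules such as amount > 0, though, so keep those as a separate pass.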

The checkpoint table

If your script crashes halfway through processing a directory of 50 files, what happens on restart? Naive answer: it sees the same 50 files and re-processes them. If your load isn’t idempotent, you get duplicates.

The fix is a small checkpoint table in the same database you’re loading to. One row per file, recording when it was ingested and the result.

CREATE TABLE IF NOT EXISTS ingestion_checkpoints (
    filename TEXT PRIMARY KEY,
    file_size BIGINT NOT NULL,
    file_sha256 TEXT NOT NULL,
    rows_loaded INTEGER NOT NULL,
    rows_quarantined INTEGER NOT NULL,
    ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    status TEXT NOT NULL CHECK (status IN ('success', 'failed'))
);

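Before processing, check whether the file (by name + sha256) is already recorded with status 'success'. If yes, skip it. If no, process it and record the outcome. Matching on status matters: a file that failed and was later dropped back into incoming/ from quarantine has to be reprocessed, not skipped. The sha256 catches the case where the vendor reused a filename for a different file: same name, different content, a fresh ingestion.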

import hashlib

def sha256_of(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

Loading to Postgres: ON CONFLICT and COPY

For a few thousand rows, parameterized inserts are fine:

import psycopg

INSERT_SQL = """
INSERT INTO orders (order_id, customer_email, amount, order_date, status)
VALUES (%(order_id)s, %(customer_email)s, %(amount)s, %(order_date)s, %(status)s)
ON CONFLICT (order_id) DO UPDATE SET
    customer_email = EXCLUDED.customer_email,
    amount         = EXCLUDED.amount,
    order_date     = EXCLUDED.order_date,
    status         = EXCLUDED.status;
"""

def load_orders(conn: psycopg.Connection, rows: list[OrderRow]) -> int:
    payload = [r.model_dump() for r in rows]
    with conn.cursor() as cur:
        cur.executemany(INSERT_SQL, payload)
    return len(payload)

The ON CONFLICT ... DO UPDATE clause is the idempotency switch. Run the same file twice, get the same final state. This is what we drilled in lesson 37.
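You can see the idempotency claim concretely without a Postgres server. The sketch below uses the standard library's sqlite3 purely as a stand-in, because SQLite (3.24+) accepts the same ON CONFLICT ... DO UPDATE ... excluded form. The trimmed table layout and the rows are invented for the demo, and the real target stays Postgres.

```python
import sqlite3

UPSERT = """
INSERT INTO orders (order_id, amount, status)
VALUES (:order_id, :amount, :status)
ON CONFLICT (order_id) DO UPDATE SET
    amount = excluded.amount,
    status = excluded.status
"""


def load(conn: sqlite3.Connection, rows: list[dict]) -> None:
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT, rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, amount TEXT, status TEXT)")

batch = [
    {"order_id": 1, "amount": "19.99", "status": "pending"},
    {"order_id": 2, "amount": "5.00", "status": "shipped"},
]
load(conn, batch)
load(conn, batch)  # same file again: still two rows, same final state
```

After both loads the table still holds exactly two rows. A later batch that changes order 1's status updates that row in place instead of adding a duplicate.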

For tens of thousands of rows or more, switch to COPY. It is typically many times faster than executemany, because the rows stream to the server as one bulk operation instead of one INSERT per row:

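def load_orders_copy(conn: psycopg.Connection, rows: list[OrderRow]) -> int:
    with conn.cursor() as cur:
        # Start from an empty staging table.
        cur.execute("TRUNCATE orders_staging")
        # COPY streams every row to the server in one bulk operation.
        with cur.copy(
            "COPY orders_staging (order_id, customer_email, amount, order_date, status) FROM STDIN"
        ) as copy:
            for r in rows:
                copy.write_row((r.order_id, r.customer_email, r.amount, r.order_date, r.status))
        # Upsert from staging into the real table, still inside the cursor block.
        # Fails if staging holds the same order_id twice, so dedupe such files first.
        cur.execute("""
            INSERT INTO orders (order_id, customer_email, amount, order_date, status)
            SELECT order_id, customer_email, amount, order_date, status FROM orders_staging
            ON CONFLICT (order_id) DO UPDATE SET
              customer_email = EXCLUDED.customer_email,
              amount         = EXCLUDED.amount,
              order_date     = EXCLUDED.order_date,
              status         = EXCLUDED.status
        """)
    return len(rows)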

The pattern: COPY into a staging table (fast), then INSERT ... SELECT ... ON CONFLICT from staging into the real table (idempotent). Best of both worlds.
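One catch with the staging pattern: if a single file contains the same order_id twice, Postgres rejects the INSERT ... SELECT ... ON CONFLICT DO UPDATE, because one statement may not update the same row twice. The sketch below deduplicates in Python before the COPY and keeps the last occurrence, which matches the final state the row-by-row executemany path would leave. The helper name is hypothetical, and whether last-wins is the right rule depends on your data.

```python
from collections.abc import Callable, Iterable
from typing import TypeVar

T = TypeVar("T")
K = TypeVar("K")


def dedupe_last_wins(rows: Iterable[T], key: Callable[[T], K]) -> list[T]:
    """Keep only the last row for each key, preserving first-seen key order."""
    latest: dict[K, T] = {}
    for row in rows:
        latest[key(row)] = row  # later rows overwrite earlier ones
    return list(latest.values())
```

With OrderRow objects you would call it as dedupe_last_wins(rows, key=lambda r: r.order_id) just before load_orders_copy.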

Logging at every step

Carry lesson 11 forward: structured logging at every step, with the filename in every line. When something goes wrong at 3am you want to be able to grep the logs by filename and see the whole story.

import logging
logger = logging.getLogger("ingest")

logger.info("processing", extra={"file": path.name, "size": path.stat().st_size})
logger.info("validated", extra={"file": path.name, "valid": len(valid), "errors": len(errors)})
logger.info("loaded", extra={"file": path.name, "rows": loaded})

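If you’re using the structlog or logging.config setup from lesson 11, those extra fields land as JSON keys. A bare logging.basicConfig format string (the full script below uses one) attaches them to the record but never prints them, so check that your formatter actually emits them. Future you will be grateful.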
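If you don't have the lesson 11 setup handy, here is a minimal stdlib-only sketch of a formatter that turns extra= fields into JSON keys. The class name and the output keys (ts, level, logger, msg, exc) are choices made for this example, not a standard.

```python
import json
import logging

# Attribute names every LogRecord carries; anything else arrived via extra=.
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including extra= fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        # Copy over only the non-standard attributes, i.e. the extra= fields.
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exc"] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)
```

To use it, call handler.setFormatter(JsonFormatter()) on whichever handler the ingest logger writes to. A call like logger.info("loaded", extra={"file": path.name, "rows": loaded}) then comes out as a single JSON line with file and rows as top-level keys.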

The full script

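Under two hundred lines, runnable, opinionated. It assumes that an orders table with a unique order_id and the ingestion_checkpoints table from above already exist. Drop it in ingest.py, point DATA_ROOT and DSN at your environment, and run it as a long-lived process or a systemd service. main() loops forever, so if you schedule it with cron, call run_once() instead.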

"""ingest.py — drop-zone CSV ingestion to Postgres."""
from __future__ import annotations
import csv
import hashlib
import logging
import shutil
import time
from datetime import date
from decimal import Decimal
from pathlib import Path

import psycopg
from pydantic import BaseModel, EmailStr, Field, ValidationError

DATA_ROOT = Path("/var/data")
INCOMING = DATA_ROOT / "incoming"
PROCESSED = DATA_ROOT / "processed"
QUARANTINE = DATA_ROOT / "quarantine"
DSN = "postgresql://etl:secret@localhost:5432/warehouse"
QUIET_SECONDS = 5.0
MAX_BAD_ROW_FRACTION = 0.01

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("ingest")


class OrderRow(BaseModel):
    order_id: int
    customer_email: EmailStr
    amount: Decimal = Field(gt=0)
    order_date: date
    status: str = Field(pattern=r"^(pending|shipped|delivered|cancelled)$")


def sha256_of(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def is_stable(path: Path) -> bool:
    return time.time() - path.stat().st_mtime >= QUIET_SECONDS


def already_ingested(conn: psycopg.Connection, name: str, digest: str) -> bool:
    with conn.cursor() as cur:
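        # Only successful loads count, so a failed file dropped back into incoming/ is retried.
        cur.execute(
            "SELECT 1 FROM ingestion_checkpoints "
            "WHERE filename = %s AND file_sha256 = %s AND status = 'success'",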
            (name, digest),
        )
        return cur.fetchone() is not None


def parse(path: Path) -> tuple[list[OrderRow], list[dict]]:
    valid: list[OrderRow] = []
    errors: list[dict] = []
    with path.open(newline="") as f:
        for line_no, raw in enumerate(csv.DictReader(f), start=2):
            try:
                valid.append(OrderRow.model_validate(raw))
            except ValidationError as e:
                errors.append({"line": line_no, "raw": raw, "errors": e.errors()})
    return valid, errors


INSERT_SQL = """
INSERT INTO orders (order_id, customer_email, amount, order_date, status)
VALUES (%(order_id)s, %(customer_email)s, %(amount)s, %(order_date)s, %(status)s)
ON CONFLICT (order_id) DO UPDATE SET
    customer_email = EXCLUDED.customer_email,
    amount         = EXCLUDED.amount,
    order_date     = EXCLUDED.order_date,
    status         = EXCLUDED.status;
"""


def load(conn: psycopg.Connection, rows: list[OrderRow]) -> int:
    if not rows:
        return 0
    with conn.cursor() as cur:
        cur.executemany(INSERT_SQL, [r.model_dump() for r in rows])
    return len(rows)


def checkpoint(
    conn: psycopg.Connection,
    path: Path,
    digest: str,
    loaded: int,
    quarantined: int,
    status: str,
) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO ingestion_checkpoints
              (filename, file_size, file_sha256, rows_loaded, rows_quarantined, status)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (filename) DO UPDATE SET
              file_size = EXCLUDED.file_size,
              file_sha256 = EXCLUDED.file_sha256,
              rows_loaded = EXCLUDED.rows_loaded,
              rows_quarantined = EXCLUDED.rows_quarantined,
              ingested_at = NOW(),
              status = EXCLUDED.status;
            """,
            (path.name, path.stat().st_size, digest, loaded, quarantined, status),
        )


def process_one(conn: psycopg.Connection, path: Path) -> None:
    log.info("processing %s", path.name)
    digest = sha256_of(path)
    if already_ingested(conn, path.name, digest):
        log.info("already ingested, skipping: %s", path.name)
        path.rename(PROCESSED / path.name)
        return

    valid, errors = parse(path)
    total = len(valid) + len(errors)
    bad_fraction = len(errors) / total if total else 0.0

    if bad_fraction > MAX_BAD_ROW_FRACTION:
        log.error("too many bad rows in %s (%.2f%%), quarantining", path.name, bad_fraction * 100)
        # Checkpoint first: checkpoint() stats the file, so it must still be in incoming/.
        with conn.transaction():
            checkpoint(conn, path, digest, 0, len(errors), "failed")
        shutil.move(path, QUARANTINE / path.name)
        return

    try:
        # Rows and checkpoint commit together or not at all.
        with conn.transaction():
            loaded = load(conn, valid)
            checkpoint(conn, path, digest, loaded, len(errors), "success")
    except Exception:
        log.exception("load failed for %s", path.name)
        with conn.transaction():
            checkpoint(conn, path, digest, 0, len(errors), "failed")
        shutil.move(path, QUARANTINE / path.name)
        return

    log.info("loaded %d rows from %s (%d errors)", loaded, path.name, len(errors))
    # Move only after the commit; a crash before this line just means the file
    # is found already ingested and skipped on the next run.
    path.rename(PROCESSED / path.name)


def run_once() -> None:
    # autocommit=True makes each conn.transaction() block a real transaction that
    # commits on exit; on a non-autocommit connection it would only be a savepoint.
    with psycopg.connect(DSN, autocommit=True) as conn:
        for path in sorted(INCOMING.glob("*.csv")):
            if not is_stable(path):
                log.debug("not stable yet: %s", path.name)
                continue
            try:
                process_one(conn, path)
            except Exception:
                log.exception("unexpected failure on %s", path.name)


def main() -> None:
    for d in (INCOMING, PROCESSED, QUARANTINE):
        d.mkdir(parents=True, exist_ok=True)
    while True:
        run_once()
        time.sleep(10)


if __name__ == "__main__":
    main()

That’s it. It guards against the half-written-file race with the stable-mtime check, validates rows before loading, quarantines files with too many bad rows, loads idempotently via ON CONFLICT, deduplicates by filename + sha256, logs every step with the filename in the line, and recovers cleanly on restart. Under two hundred lines is enough for a real pipeline.

Should this run as a service?

The script above is an infinite loop. That’s fine for v1 — run it under nohup or in a tmux session and walk away. For v2, wrap it in a systemd unit so it restarts on crash and starts on boot:

# /etc/systemd/system/ingest.service
[Unit]
Description=Drop-zone ingestion
After=postgresql.service network.target

[Service]
ExecStart=/opt/venv/bin/python /opt/ingest/ingest.py
Restart=on-failure
RestartSec=5
User=etl

[Install]
WantedBy=multi-user.target

Run systemctl daemon-reload so systemd picks up the new unit, then systemctl enable --now ingest, and you’re done.

For v3 — when this job has dependencies on other jobs, or needs SLA monitoring, or you want a UI to see the run history — graduate to Airflow / Prefect / Dagster. That’s lesson 41. Don’t reach for an orchestrator before you need one; a systemd loop and a checkpoint table will carry you a long way.

What you’ve built

A real ingestion pipeline. Files land, get validated, get loaded idempotently, get tracked. Failures move to quarantine instead of corrupting the database. Restarts are safe. The same shape works for JSON, Parquet, fixed-width files, vendor APIs that drop to S3 — swap the parse function and you’re off.

In lesson 39 we tackle the other half: pulling data from APIs, with retries, rate limits, and pagination.
