Python, from the ground up Lesson 42 / 60

Data engineering project: build a real pipeline

From source to destination, with monitoring, idempotency, and a schedule. The lessons of Module 7 made tactile.

This is the closing lesson of Module 7, and like the analysis project at the end of Module 6, it’s hands-on. We build a complete data pipeline: pull from a public API daily, transform with pandas, load to Postgres, schedule it with cron, log each step, alert on failure, and design it so you can re-run any past day safely. By the end you’ll have a directory you could realistically commit to a repo.

The pipeline pulls daily weather observations for a list of cities from the Open-Meteo API (free, no key, well-behaved), transforms the JSON into tidy rows, and loads into a Postgres table. Same pattern works for any public API; Open-Meteo just spares us the credentials dance.

The shape

Before any code, the layout. A real pipeline isn’t one script — it’s a directory with separate concerns:

weather_pipeline/
├── pyproject.toml
├── src/
│   └── weather/
│       ├── __init__.py
│       ├── extract.py     # API → raw JSON files
│       ├── transform.py   # raw JSON → Parquet
│       ├── load.py        # Parquet → Postgres
│       ├── pipeline.py    # orchestrates the three above
│       ├── config.py      # cities, paths, DSNs
│       └── notify.py      # error alerting
├── data/
│   ├── raw/               # JSON, one file per (date, city)
│   └── processed/         # Parquet, one file per date
├── logs/
└── README.md

Three layers: extract, transform, load. Each writes its output to disk before the next reads it. That separation is what makes the pipeline debuggable, restartable, and idempotent. If transform breaks, raw data is already on disk. If load breaks, processed data is already on disk. You don’t re-pull the API to fix a parsing bug.

Setup with uv

uv init weather_pipeline
cd weather_pipeline
uv add httpx tenacity pandas pyarrow psycopg[binary] python-dotenv structlog sentry-sdk
uv add --dev pytest ruff mypy

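That gives us: httpx for the API, tenacity for retries, pandas and pyarrow for the transform, psycopg 3 for Postgres, python-dotenv for secrets, structlog for structured logging, sentry-sdk for alerting. Modern, sensible defaults for 2026. One caveat: python -m weather.pipeline only resolves if the weather package under src/ is importable from the virtualenv, so either install the project into it (for example as an editable install) or put src on PYTHONPATH.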

Config

# src/weather/config.py
from pathlib import Path
import os
from dotenv import load_dotenv

load_dotenv()

ROOT = Path(__file__).parent.parent.parent
RAW_DIR = ROOT / "data" / "raw"
PROCESSED_DIR = ROOT / "data" / "processed"
LOG_DIR = ROOT / "logs"

for d in (RAW_DIR, PROCESSED_DIR, LOG_DIR):
    d.mkdir(parents=True, exist_ok=True)

CITIES = [
    {"name": "Milan", "lat": 45.46, "lon": 9.19},
    {"name": "Rome", "lat": 41.90, "lon": 12.50},
    {"name": "Naples", "lat": 40.85, "lon": 14.27},
    {"name": "Turin", "lat": 45.07, "lon": 7.69},
    {"name": "Florence", "lat": 43.77, "lon": 11.25},
]

PG_DSN = os.environ["WEATHER_PG_DSN"]
SENTRY_DSN = os.environ.get("SENTRY_DSN")

Notice no secrets in code. The DSN comes from .env (gitignored) in dev and from real environment variables in production.
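One rough edge: os.environ["WEATHER_PG_DSN"] fails with a bare KeyError when the variable is missing. A minimal sketch of a friendlier lookup follows; require_env is a hypothetical helper, not part of the lesson's config.py, and the error wording is only a suggestion.

```python
import os
from typing import Mapping


def require_env(name: str, environ: Mapping[str, str] = os.environ) -> str:
    """Return a required setting, or fail with a message that names the variable.

    Hypothetical helper: `environ` is injectable so the lookup can be tested
    without touching the real process environment.
    """
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(
            f"{name} is not set; add it to .env for development "
            "or export it in the production environment"
        )
    return value
```

In config.py this would replace the direct lookup, e.g. PG_DSN = require_env("WEATHER_PG_DSN"), while optional settings such as SENTRY_DSN keep using os.environ.get.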

Extract

# src/weather/extract.py
import json
from datetime import date
from pathlib import Path

import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from weather.config import CITIES, RAW_DIR

log = structlog.get_logger()
API = "https://archive-api.open-meteo.com/v1/archive"

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, max=60),
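    # Transport-level failures only (timeouts, connection errors). httpx.HTTPError
    # would also match the HTTPStatusError that raise_for_status() raises on 4xx.
    retry=retry_if_exception_type(httpx.TransportError),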
    reraise=True,
)
def fetch_city(client: httpx.Client, city: dict, run_date: date) -> dict:
    log.info("fetch_city.start", city=city["name"], date=str(run_date))
    response = client.get(
        API,
        params={
            "latitude": city["lat"],
            "longitude": city["lon"],
            "start_date": run_date.isoformat(),
            "end_date": run_date.isoformat(),
            "hourly": "temperature_2m,precipitation,wind_speed_10m",
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()

def extract(run_date: date) -> Path:
    out_dir = RAW_DIR / run_date.isoformat()
    out_dir.mkdir(parents=True, exist_ok=True)
    with httpx.Client() as client:  # http2=True would need the optional httpx[http2] extra
        for city in CITIES:
            out_file = out_dir / f"{city['name'].lower()}.json"
            if out_file.exists():
                log.info("extract.skip_existing", city=city["name"])
                continue
            data = fetch_city(client, city, run_date)
            data["_meta"] = {"city": city["name"], "run_date": run_date.isoformat()}
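            body = json.dumps(data)
            out_file.write_text(body, encoding="utf-8")
            # Log the size from the string we just wrote instead of re-reading the file.
            log.info("fetch_city.done", city=city["name"], bytes=len(body.encode("utf-8")))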
    return out_dir

Three things to notice. Tenacity wraps the HTTP call: 5 attempts, exponential backoff capped at 60 seconds, only retry on network errors (not on 4xx, which won’t get better). Idempotency at the file level: if the JSON for a city already exists, skip it. Re-running the extract for a date is safe and free. Structured logging via structlog — every event is a JSON line with named fields, easy to grep, parse, ship to a log aggregator.
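One gap in the file-level idempotency check is worth knowing about: if the process dies partway through writing, a truncated JSON file can be left behind, and the next run will skip it as already done. A minimal sketch of one way to close that gap, assuming a hypothetical helper write_json_atomic (not part of the lesson's extract.py) that writes to a temporary file and renames it into place:

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomic(out_file: Path, payload: dict) -> int:
    """Write payload as JSON so that out_file is either absent or complete.

    Returns the number of bytes written. Hypothetical helper for illustration.
    """
    text = json.dumps(payload)
    # Create the temp file next to the target so the rename stays on one filesystem.
    # The ".tmp" suffix keeps it out of the transform step's "*.json" glob.
    fd, tmp_name = tempfile.mkstemp(dir=out_file.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(text)
        os.replace(tmp_name, out_file)  # the rename is atomic on POSIX systems
    except BaseException:
        # Remove the partial temp file, then let the original error propagate.
        Path(tmp_name).unlink(missing_ok=True)
        raise
    return len(text.encode("utf-8"))
```

With this in place, the existence check in extract means "a complete file is on disk" rather than "a write was started".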

Transform

# src/weather/transform.py
import json
from datetime import date
from pathlib import Path

import pandas as pd
import structlog

from weather.config import RAW_DIR, PROCESSED_DIR

log = structlog.get_logger()

def transform(run_date: date) -> Path:
    in_dir = RAW_DIR / run_date.isoformat()
    rows = []
    for path in sorted(in_dir.glob("*.json")):
        payload = json.loads(path.read_text())
        city = payload["_meta"]["city"]
        hourly = payload["hourly"]
        df = pd.DataFrame(hourly)
        df["timestamp"] = pd.to_datetime(df["time"])
        df["city"] = city
        df["run_date"] = pd.Timestamp(run_date)
        df = df.drop(columns=["time"])
        rows.append(df)

    if not rows:
        raise RuntimeError(f"no raw files for {run_date}")

    combined = pd.concat(rows, ignore_index=True)

    # Validation: fail loudly on impossible values. Missing readings (NaN) are allowed.
    # Note: `.all() is False` never fires, because .all() returns a numpy bool.
    temps = combined["temperature_2m"]
    bad = combined[temps.notna() & ~temps.between(-60, 60)]
    if not bad.empty:
        raise ValueError(f"impossible temperatures:\n{bad}")
    if combined.duplicated(subset=["city", "timestamp"]).any():
        raise ValueError("duplicate (city, timestamp) rows after combine")

    out_file = PROCESSED_DIR / f"{run_date.isoformat()}.parquet"
    combined.to_parquet(out_file, index=False)
    log.info("transform.done", rows=len(combined), out=str(out_file))
    return out_file

The transform reads every raw JSON for the date, flattens the hourly dict of parallel arrays into one row per hour, adds the city column (the API doesn’t echo it back), and validates. Validation belongs here, not in load. Postgres will accept any value that fits the column type, so an impossible temperature would load without complaint; you want the human-readable error before the data reaches the database.
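To make the reshaping concrete, here is a pandas-free sketch of the same idea. The function names and the sample payload are made up for illustration; the payload shape (an "hourly" dict of equal-length lists plus the "_meta" block added by extract) is taken from the code above.

```python
def hourly_to_rows(payload: dict) -> list[dict]:
    """Turn {"time": [...], "temperature_2m": [...]} into one dict per hour."""
    hourly = payload["hourly"]
    columns = list(hourly)
    lengths = {len(hourly[c]) for c in columns}
    if len(lengths) != 1:
        raise ValueError(f"hourly arrays differ in length: {sorted(lengths)}")
    city = payload["_meta"]["city"]
    # zip(*) walks the parallel arrays in lockstep, one tuple per hour.
    return [
        {"city": city, **dict(zip(columns, values))}
        for values in zip(*(hourly[c] for c in columns))
    ]


def out_of_range(rows: list[dict], field: str, low: float, high: float) -> list[dict]:
    """Rows whose value is present but outside [low, high]; None counts as missing."""
    return [r for r in rows if r[field] is not None and not (low <= r[field] <= high)]


# Made-up sample in the shape the transform step expects.
sample = {
    "_meta": {"city": "Milan", "run_date": "2026-03-15"},
    "hourly": {
        "time": ["2026-03-15T00:00", "2026-03-15T01:00"],
        "temperature_2m": [7.5, None],
    },
}
rows = hourly_to_rows(sample)
```

The key design point is that a missing reading is not a validation failure; only a present, impossible value is.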

The Parquet output is the contract with the load step. Parquet over CSV because it preserves dtypes, compresses well, and reads fast.

Load

# src/weather/load.py
from datetime import date
from pathlib import Path

import pandas as pd
import psycopg
import structlog

from weather.config import PG_DSN, PROCESSED_DIR

log = structlog.get_logger()

DDL = """
CREATE TABLE IF NOT EXISTS weather_hourly (
    city TEXT NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    temperature_2m DOUBLE PRECISION,
    precipitation DOUBLE PRECISION,
    wind_speed_10m DOUBLE PRECISION,
    run_date DATE NOT NULL,
    PRIMARY KEY (city, timestamp)
);
"""

UPSERT = """
INSERT INTO weather_hourly (city, timestamp, temperature_2m, precipitation, wind_speed_10m, run_date)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (city, timestamp) DO UPDATE SET
    temperature_2m = EXCLUDED.temperature_2m,
    precipitation = EXCLUDED.precipitation,
    wind_speed_10m = EXCLUDED.wind_speed_10m,
    run_date = EXCLUDED.run_date;
"""

def load(run_date: date) -> int:
    in_file = PROCESSED_DIR / f"{run_date.isoformat()}.parquet"
    df = pd.read_parquet(in_file)
    rows = list(df[["city", "timestamp", "temperature_2m", "precipitation", "wind_speed_10m", "run_date"]].itertuples(index=False, name=None))

    with psycopg.connect(PG_DSN) as conn:
        with conn.cursor() as cur:
            cur.execute(DDL)
            cur.executemany(UPSERT, rows)
        conn.commit()

    log.info("load.done", rows=len(rows), date=str(run_date))
    return len(rows)

The load step uses INSERT ... ON CONFLICT ... DO UPDATE — Postgres’s upsert. The primary key is (city, timestamp). Re-running the load for the same date overwrites the existing rows with the new ones; if a value changed (because the source corrected it), the database catches up. This is what makes the pipeline idempotent end to end: re-running for a past date doesn’t duplicate, doesn’t conflict, doesn’t require manual cleanup.
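The idempotency claim is easy to check without a running Postgres. The sketch below uses SQLite from the standard library as a stand-in, which is an assumption of this example: SQLite (3.24 or newer) accepts the same ON CONFLICT ... DO UPDATE shape, but the table is trimmed to three columns and the placeholders are ? rather than %s.

```python
import sqlite3

DDL = """
CREATE TABLE IF NOT EXISTS weather_hourly (
    city TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    temperature_2m REAL,
    PRIMARY KEY (city, timestamp)
)
"""

UPSERT = """
INSERT INTO weather_hourly (city, timestamp, temperature_2m)
VALUES (?, ?, ?)
ON CONFLICT (city, timestamp) DO UPDATE SET
    temperature_2m = excluded.temperature_2m
"""


def load_rows(conn: sqlite3.Connection, rows: list[tuple]) -> int:
    """Upsert rows and return the total row count in the table afterwards."""
    conn.execute(DDL)
    conn.executemany(UPSERT, rows)
    conn.commit()
    return conn.execute("SELECT COUNT(*) FROM weather_hourly").fetchone()[0]


def demo() -> tuple[list[int], float]:
    """Load the same day twice, then a corrected version, and report the result."""
    conn = sqlite3.connect(":memory:")
    first = [("Milan", "2026-03-15T00:00", 7.5), ("Milan", "2026-03-15T01:00", 7.1)]
    corrected = [("Milan", "2026-03-15T00:00", 7.9), ("Milan", "2026-03-15T01:00", 7.1)]
    counts = [load_rows(conn, first), load_rows(conn, first), load_rows(conn, corrected)]
    value = conn.execute(
        "SELECT temperature_2m FROM weather_hourly WHERE timestamp = ?",
        ("2026-03-15T00:00",),
    ).fetchone()[0]
    conn.close()
    return counts, value
```

Calling demo() loads two rows three times over: the row count stays at two after every load, and the midnight reading ends up at the corrected value.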

Notification

# src/weather/notify.py
import sentry_sdk

# Importing config also runs load_dotenv(), so .env is read before the DSN lookup.
from weather.config import SENTRY_DSN

def init():
    if SENTRY_DSN:
        sentry_sdk.init(dsn=SENTRY_DSN, traces_sample_rate=0.0)

In production you initialize Sentry at startup and any uncaught exception gets reported, with traceback, locals, and context. If you don’t want the Sentry dependency, a 20-line SMTP fallback is fine — just make sure failures reach a human.

The orchestrator

# src/weather/pipeline.py
import sys
from datetime import date, timedelta

import structlog

from weather import notify
from weather.extract import extract
from weather.transform import transform
from weather.load import load

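structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        # Render the exc_info set by log.exception() as a traceback string.
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
)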
log = structlog.get_logger()

def run(run_date: date) -> None:
    notify.init()
    log.info("pipeline.start", date=str(run_date))
    try:
        extract(run_date)
        transform(run_date)
        n = load(run_date)
        log.info("pipeline.done", date=str(run_date), rows=n)
    except Exception:
        log.exception("pipeline.failed", date=str(run_date))
        raise

if __name__ == "__main__":
    if len(sys.argv) > 1:
        target = date.fromisoformat(sys.argv[1])
    else:
        target = date.today() - timedelta(days=1)
    run(target)

Three observations. The default is yesterday, not today, because today’s full-day data isn’t ready yet; defaulting to today is a common mistake in scheduled pipelines. A CLI date argument lets you backfill any past day with python -m weather.pipeline 2026-03-15. Failures re-raise after logging, so the process exits non-zero and Sentry sees the exception. Cron itself doesn’t act on the exit code, which is why the alert matters.
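The date handling can be pulled into a small function so it is testable without running the pipeline. This is a sketch, not the lesson's code: resolve_run_date is a hypothetical name, and rejecting today or a future date is an extra guard that follows from the "data isn't ready yet" reasoning rather than something the original script does.

```python
import argparse
from datetime import date, timedelta


def resolve_run_date(argv: list[str], today: date) -> date:
    """Return the date to process: the CLI argument if given, else yesterday."""
    parser = argparse.ArgumentParser(prog="weather.pipeline")
    parser.add_argument(
        "run_date",
        nargs="?",
        type=date.fromisoformat,
        default=None,
        help="day to process as YYYY-MM-DD (default: yesterday)",
    )
    args = parser.parse_args(argv)
    target = args.run_date or today - timedelta(days=1)
    if target >= today:
        # parser.error prints a usage message and exits with status 2.
        parser.error(f"{target} is not a completed day yet")
    return target
```

In pipeline.py the entry point would then become run(resolve_run_date(sys.argv[1:], date.today())). Passing today in as a parameter is what makes the default testable.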

Schedule it

For v1, cron:

# crontab -e
0 2 * * * cd /opt/weather_pipeline && /opt/weather_pipeline/.venv/bin/python -m weather.pipeline >> logs/cron.log 2>&1

2am daily in the server’s local time zone, with logs appended. That’s it. For the size of this pipeline, cron is honestly enough. The reasons to graduate to Prefect or Airflow (lesson 41) are: more pipelines, dependencies between them, a UI for non-technical stakeholders, smarter retry logic, or a need to backfill date ranges through clicks rather than shell loops.

When you do graduate, the work is mechanical. The Prefect version of the same pipeline is roughly:

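from datetime import date, timedelta

from prefect import flow, task

from weather.extract import extract
from weather.transform import transform
from weather.load import load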

@task(retries=3)
def t_extract(run_date): return extract(run_date)
@task
def t_transform(run_date): return transform(run_date)
@task
def t_load(run_date): return load(run_date)

@flow
def weather_flow(run_date: date | None = None):
    run_date = run_date or (date.today() - timedelta(days=1))
    t_extract(run_date)
    t_transform(run_date)
    t_load(run_date)

if __name__ == "__main__":
    weather_flow.serve(name="weather-daily", cron="0 2 * * *")

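Same logic, wrapped in flow / task decorators. serve() starts a long-running process that triggers the flow on the cron schedule and reports each run to the Prefect API (Prefect Cloud or a self-hosted server), where it is observable in a real UI. The work you did to make extract / transform / load separable is what made the migration cheap.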

Backfills

This design pays off the day someone says “we lost a week of data, can you re-run?” Because every run is keyed by date, the answer is a shell loop:

for d in 2026-03-01 2026-03-02 2026-03-03 2026-03-04 2026-03-05; do
    python -m weather.pipeline $d
done

Done. Each day is independent, idempotent, safely re-runnable. No special “backfill mode,” no manual deletes, no surprise duplicates. Designing for backfill from day one is the single highest-leverage habit in data engineering.
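For longer ranges, typing out every date gets old. A minimal Python sketch of the same loop follows; date_range and backfill are hypothetical helpers, and run_one stands in for the run function from pipeline.py.

```python
from datetime import date, timedelta
from typing import Callable, Iterator


def date_range(start: date, end: date) -> Iterator[date]:
    """Yield every date from start to end, inclusive."""
    if end < start:
        raise ValueError(f"end {end} is before start {start}")
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


def backfill(start: date, end: date, run_one: Callable[[date], None]) -> list[date]:
    """Run run_one for each day in the range and return the days that failed.

    One bad day does not stop the rest, which is safe only because
    each day's run is independent and idempotent.
    """
    failed = []
    for day in date_range(start, end):
        try:
            run_one(day)
        except Exception:
            failed.append(day)
    return failed
```

The returned list is the retry queue: fix the cause, then pass those dates through the pipeline again.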

Where AI assistants help, and where they don’t

This is a good moment for an honest note. Code assistants — Claude, Copilot, Cursor — earn their keep on the glue parts of a pipeline. Retry decorators with backoff, structured-logging boilerplate, “write me the SQL upsert for this dataframe schema,” “add a CLI argument for the date” — boring, structured, error-prone code where consistency matters and creativity doesn’t. They write that quickly and well.

Where they help least is the architecture: where to put the boundary between extract and transform, what to validate at which stage, how to model your asset graph, what your idempotency contract is. Those decisions depend on context the model doesn’t have — your data sources’ quirks, your team’s conventions, your downstream consumers’ tolerances. Make them yourself; let the assistant fill in the scaffolding around them.

What’s missing

This is the smallest realistic shape, not production-ready. Production adds:

  • Tests: unit tests for transform, integration tests against a Postgres test container, contract tests against a recorded API response.
  • Infrastructure: containerize, deploy to a small VM or a managed runner, manage secrets through your platform.
  • Alerting: on top of Sentry, a “pipeline didn’t run” alert (a missing-heartbeat check, often via a service like Healthchecks.io).
  • CI: lint, type-check, and run tests on every PR.
  • Observability: log shipping to ELK / Loki / Datadog so you can search across runs, and metrics on row counts so you can catch silent under-fetching.
  • Schema management: migrations via Alembic when the table evolves, instead of relying on CREATE TABLE IF NOT EXISTS.
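As a taste of the row-count idea from the observability item, here is a minimal sketch. check_row_count is a hypothetical helper, and the expectation of 24 hourly rows per city per day is an assumption about the API's response for a single-day request, worth confirming against real responses.

```python
def check_row_count(actual_rows: int, n_cities: int, hours_per_day: int = 24) -> int:
    """Raise if a day's load does not contain one row per city per hour.

    Returns the expected count so callers can log it alongside the actual one.
    """
    expected = n_cities * hours_per_day
    if actual_rows != expected:
        raise ValueError(
            f"expected {expected} rows "
            f"({n_cities} cities x {hours_per_day} hours), got {actual_rows}"
        )
    return expected
```

Called after load with len(CITIES) and the returned row count, this turns a silently short day into a failure that reaches the alerting path.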

But the version we built here covers the core: data flowing reliably from source to destination, monitored, idempotent, schedulable, debuggable. That’s the bar, and it’s a high enough bar that most pipelines in the wild don’t clear it.

Module 7 in review, and what’s next

We started Module 7 with an introduction to data engineering as a discipline. We covered ETL versus ELT, modern formats (Parquet, Iceberg, Delta), object stores, dbt, asyncio, orchestrators, and now pipelines. You’ve seen the toolkit a junior data engineer is expected to navigate in 2026.

Module 8 — numerical Python — turns the camera the other way. Where Modules 6 and 7 were about wrangling and moving data, Module 8 is about computing on it. NumPy at depth, vectorization, broadcasting, the SciPy ecosystem, a tour of where Python touches numerical work that pandas doesn’t reach. Less plumbing, more math. See you there.


Citations: Open-Meteo archive API, tenacity docs, psycopg 3 docs, structlog docs, Prefect docs. Retrieved 2026-05-01.
