PySpark, dalle fondamenta Lezione 54 / 60

Output mode e sink idempotenti: foreachBatch e il pattern di upsert

Append vs update vs complete, i sink che Spark fornisce, e l'escape hatch foreachBatch per tutto il resto.

Quindi la tua query streaming stateful sta calcolando cose. Dove vanno quelle cose? E come ti assicuri che ci arrivino esattamente una volta, anche quando Spark riprova un batch dopo un crash?

Questa lezione è sul lato output: i tre output mode, i sink che Spark fornisce, e il pattern foreachBatch che è la risposta a “ma io voglio scrivere su X” per qualunque X non sia nella lista integrata.

I tre output mode, riassunti

Ogni query streaming ha un outputMode: cosa viene scritto a ogni batch.

append — solo le nuove righe dall’ultimo batch. Il mode più comune. Funziona per:

  • Query non aggreganti (filtri, proiezioni, stream-stream join). Ogni riga di input mappa a zero o più righe di output che vengono emesse una volta.
  • Aggregazioni limitate da un watermark. La riga della finestra viene emessa quando il watermark la chiude, e mai più.

Cosa non può fare: emettere aggregazioni che non sono ancora state finalizzate. Se il tuo business vuole vedere “aggiornamenti del totale corrente man mano che accadono”, append non te lo darà: il running total appare solo quando la finestra si chiude, magari ore dopo che gli eventi sono arrivati.

update — righe che sono cambiate dall’ultimo batch. Funziona per le aggregazioni e ti dà visibilità in tempo reale: ogni batch emette le finestre che hanno ricevuto nuovi dati, con i loro totali correnti. Lo svantaggio: la stessa chiave può essere emessa molte volte con valori diversi, quindi il sink deve essere upsert-capable. Scrivere su un sink append-only (Parquet grezzo, Kafka senza una strategia di key) perde la semantica: vedresti cinque righe per la stessa finestra con cinque count diversi.

complete — l’intera tabella di risultato dopo ogni batch. Valido solo per aggregazioni. Pratico solo per risultati piccoli. Spark ri-emette letteralmente l’intero set di group key a ogni batch. Se hai 10 chiavi distinte e una dashboard che sovrascrive un singolo file, complete mode va bene. Se hai 10 milioni di chiavi, ti aspettano brutti momenti.

Il mode che scegli è vincolato da cosa fa la tua query e da cosa può fare il tuo sink. Non sempre hai libera scelta.

I sink integrati

Out of the box, Spark fornisce:

File sinkformat("parquet"), format("orc"), format("json"), format("csv"). Scrive file append-only in una directory, partizionati se specifichi partitionBy. Solo append mode. Buono per landing zone, lake, archivi grezzi.

(query.writeStream
    .format("parquet")
    .option("path", "s3://lake/views/")
    .option("checkpointLocation", "s3://ck/views/")
    .partitionBy("dt")
    .outputMode("append")
    .start())

Kafka sinkformat("kafka"). Scrive ogni riga di output come record Kafka. Si aspetta colonne key, value, topic, e opzionali headers. Consegna at-least-once; implementi exactly-once via consumer idempotenti a valle.

(query.selectExpr("CAST(user_id AS STRING) AS key",
                  "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "page-aggs")
    .option("checkpointLocation", "...")
    .start())

Console sinkformat("console"). Stampa sullo stdout del driver. Append, update, o complete. Strettamente per lo sviluppo: il driver andrà in OOM se provi a stampare a console un workload reale.

.format("console").outputMode("update").option("truncate", False)

Memory sinkformat("memory"). Scrive in una tabella in-driver con nome che puoi interrogare con il normale Spark SQL. Utile per test e notebook.

.format("memory").queryName("session_results").outputMode("append")
# in un'altra cella:
spark.sql("SELECT * FROM session_results").show()

foreachBatch — l’escape hatch. Se la merita una sezione tutta sua.

Quella è la lista ufficiale. Nota cosa non c’è: Postgres, MySQL, Snowflake, BigQuery, Mongo, Elasticsearch, Cassandra, Redis. Quasi ogni destinazione reale al di là di un file o di Kafka è un job foreachBatch.

Il pattern foreachBatch

foreachBatch ti consegna il micro-batch come un normale DataFrame e un batch ID, e ti lascia farne quello che vuoi:

def write_batch(batch_df, batch_id):
    # batch_df è un normale DataFrame
    # batch_id è un long monotonicamente crescente
    batch_df.write.format("jdbc").options(...).mode("append").save()

(query.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "...")
    .start())

Dentro la funzione, hai l’API completa del batch. Scritture multi-target, trasformazioni complesse, chiamate a sistemi esterni, logica condizionale: tutto. Il motore streaming si occupa di triggering, ordinamento e replay dei batch.

Due cose da interiorizzare:

  1. batch_df è un normale DataFrame. Non è uno stream. Puoi chiamarci sopra .write, o .collect() se è piccolo, o .cache() se ti serve due volte. Ha una dimensione definita; questo batch è finito quando la tua funzione ritorna.
  2. I batch possono essere rigiocati. Se la tua funzione fallisce o il driver crasha a metà batch, Spark chiamerà di nuovo write_batch con lo stesso batch_id e lo stesso contenuto. Il tuo codice deve gestire la cosa.

Quel secondo punto è dove entra in gioco l’idempotenza.

La regola del sink idempotente

Il contratto è: lo stesso batch_id più lo stesso batch_df dovrebbe sempre produrre lo stesso stato finale nel sink, non importa quante volte gira la tua funzione.

Se il tuo sink supporta upsert (insert o update per chiave primaria), l’idempotenza è quasi gratis: rieseguire con gli stessi dati si sovrascrive da solo. Se il tuo sink supporta solo append, devi deduplicare per batch_id, di solito con una chiave primaria lato target tipo (batch_id, source_offset).

Seguono due esempi concreti.

Pattern 1: foreachBatch in un Delta merge (upsert)

Delta Lake supporta MERGE: upsert atomico per predicato. Questo è il gold standard per output di aggregazione exactly-once:

from delta.tables import DeltaTable

def upsert_to_delta(batch_df, batch_id):
    target = DeltaTable.forPath(spark, "s3://lake/page_aggs/")
    (target.alias("t")
        .merge(
            batch_df.alias("s"),
            "t.window_start = s.window_start AND t.page = s.page",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(windowed.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", "s3://ck/page_aggs/")
    .start())

Se il batch 17 fallisce a metà strada, Spark lo rigioca. La MERGE gira di nuovo con lo stesso batch_df. Le righe che erano già state mergeate vengono matchate e aggiornate agli stessi valori; le righe nuove vengono inserite. Lo stato finale è identico in entrambi i casi.

Accoppia questo con outputMode("update") così ogni batch porta solo le finestre che sono cambiate. Insieme, ottieni una tabella Delta che riflette sempre l’aggregato più recente per finestra, aggiornato in continuazione, con semantica exactly-once a partire da uno stream strettamente at-least-once.

Pattern 2: foreachBatch in un upsert su Postgres

Postgres non ha integrazione streaming-native, ma ha INSERT ... ON CONFLICT, che basta:

def upsert_to_postgres(batch_df, batch_id):
    rows = batch_df.collect()  # sicuro solo se il batch è piccolo
    if not rows:
        return

    sql = """
        INSERT INTO page_aggs (window_start, page, views, batch_id)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (window_start, page) DO UPDATE
        SET views = EXCLUDED.views,
            batch_id = EXCLUDED.batch_id
        WHERE page_aggs.batch_id < EXCLUDED.batch_id
    """

    import psycopg2
    conn = psycopg2.connect(...)
    try:
        with conn, conn.cursor() as cur:
            cur.executemany(sql, [
                (r.window_start, r.page, r.views, batch_id)
                for r in rows
            ])
    finally:
        conn.close()

Un paio di dettagli che contano:

  • ON CONFLICT (window_start, page) DO UPDATE è la clausola di upsert. Richiede un indice unico o una PK sulle colonne di conflict.
  • La guard WHERE page_aggs.batch_id < EXCLUDED.batch_id rende l’upsert monotono. Se un batch viene rigiocato fuori ordine o hai ritardatari da un batch più vecchio, non sovrascrivi uno stato più nuovo con dati più vecchi.
  • .collect() materializza il batch sul driver. Sicuro solo se i batch sono piccoli (qualche migliaio di righe). Per batch più grandi, usa batch_df.foreachPartition e connettiti una volta per partizione, non una volta per batch.
  • La connessione dovrebbe essere aperta dentro la funzione, non chiusa sopra dal driver. Le connessioni non sono picklable, e anche se lo fossero, ne riuseresti una sola tra batch in modi imprevedibili.

Per batch davvero grandi, un pattern più robusto stagea in una tabella temporanea per batch e fa il merge:

def upsert_to_postgres_staged(batch_df, batch_id):
    staging = f"page_aggs_stage_{batch_id}"
    (batch_df.write
        .format("jdbc")
        .option("dbtable", staging)
        .mode("overwrite")
        .save())
    # Poi esegui un singolo statement MERGE/UPSERT da staging in target,
    # transazionalmente, poi droppa la tabella di staging.

Spark scrive la staging table in parallelo; una transazione fa il merge atomicamente; cleanup alla fine. Idempotente perché scrivere la staging table con mode("overwrite") è esso stesso idempotente sul batch_id, e anche il merge lo è.

Semantica di consegna, riassunto

Mettendo insieme il modello streaming end-to-end:

  • At-least-once è automatico. Il checkpointing di Spark garantisce che ogni evento di input venga processato, e ogni batch di output venga consegnato, almeno una volta. Crash, restart e replay preservano questa proprietà.
  • Exactly-once richiede cooperazione del sink. Il sink deve essere idempotente sotto il replay dello stesso batch_id. I sink append-only hanno bisogno di una chiave di dedup; i sink upsert-capable hanno bisogno di semantica PK.
  • Il checkpoint è la fonte di verità. Registra quali offset Kafka sono stati consumati, che stato è stato tenuto, e quali batch sono stati committati. Perdilo e perdi la tua garanzia.

Un errore comune è mettere in produzione una pipeline streaming che scrive Parquet semplice e chiamarla “exactly-once” perché niente è crashato durante il testing. Poi succede un outage reale, il job riparte, l’ultimo batch si rigioca, e ti ritrovi righe duplicate nel lake. L’idempotenza non è gratis; devi costruirla.

Scegliere mode + sink per un caso d’uso

Qualche combinazione comune e cosa implica:

  • Filter/transform streaming → file sink. outputMode("append"). I file sono append-only; nessun problema di idempotenza perché i file di ogni batch vengono committati atomicamente con l’offset.
  • Aggregazione windowed → console per dev, Delta merge per prod. outputMode("update") con foreachBatch. Totali correnti visibili immediatamente; corretti sotto replay.
  • Sessionizzazione (flatMapGroupsWithState) → Kafka. outputMode("append"). Le sessioni emettono quando si chiudono; i consumer a valle gestiscono l’idempotenza con la chiave session_id.
  • Dashboard real-time appoggiata a Postgres. outputMode("update") + upsert con foreachBatch. La dashboard legge lo snapshot corrente; l’upsert lo tiene consistente.

Se ti ritrovi a desiderare il complete mode in produzione, fai un passo indietro. È quasi sempre sbagliato a scala. La mossa giusta è di solito update mode con un sink upsert, più un job di snapshot periodico se a valle serve davvero un quadro completo.

Dove ci lascia tutto questo

Hai ora, end-to-end, il modello streaming: source (lezione 50), esecuzione a micro-batch (lezione 51), event time e watermark (lezione 52), stato e operatori stateful (lezione 53), e ora output mode più sink idempotenti. È abbastanza per progettare e mettere in produzione una pipeline streaming.

Quello che ancora non hai è il vocabolario da war stories per quando le cose vanno male: query in back-pressure, stato che esplode, watermark in stallo, micro-batch lenti, contesa di risorse a livello di cluster. Quello è il modulo 10: debug di produzione. Si parte leggendo un tab di Streaming UI come un medico legge un ECG.


Riferimenti: Apache Spark Structured Streaming Programming Guide, sezioni su output mode, output sink, e foreachBatch (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes); documentazione MERGE di Delta Lake. Recuperato il 2026-05-01.

Cerca