PySpark, dalle fondamenta Lezione 51 / 60

Kafka source: l'ingest di produzione più comune

Come Spark legge da Kafka, la semantica degli offset, e la questione at-least-once vs exactly-once.

Se metti su un job Structured Streaming vero in produzione, c’è circa il 90% di probabilità che l’input sia Kafka. File-su-directory funziona per i tutorial e l’occasionale pipeline di rotazione log; i socket sono un giocattolo; la rate source è per benchmark. Kafka è l’event log durabile, replayable, partitionato, multi-consumer su cui tutti costruiscono davvero i sistemi streaming. Quindi è qui che vive questa lezione.

Non ti insegnerò Kafka in sé; per quello ci sono libri interi. La versione da 30 secondi: Kafka memorizza log append-only chiamati topic, ognuno diviso in partition per il parallelismo. I producer scrivono messaggi, ognuno ottiene un offset numerico all’interno della sua partition. I consumer leggono messaggi tracciando quali offset hanno processato, per partition. Il log è durabile (replicato attraverso i broker) e replayable (gli offset non vengono riusati, quindi puoi rileggere la storia finché è dentro la finestra di retention). Quella è la fondazione. Spark ci si attacca come consumer.

La chiamata base

events = (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
            .option("subscribe", "user-events")
            .option("startingOffsets", "latest")
            .load())

Qualche cosa da segnalare subito:

  • Il connector è format("kafka"). Disponibile come org.apache.spark:spark-sql-kafka-0-10_2.13:<spark-version> su Maven. Non lo ottieni con Spark default: devi aggiungerlo via --packages o --jars. Lo 0-10 nel nome dell’artifact è la versione dell’API client di Kafka, non la versione del broker Kafka; funziona contro qualsiasi broker dalla 0.10 in poi, che è qualsiasi cosa staresti realisticamente facendo girare nel 2026.
  • kafka.bootstrap.servers è una lista separata da virgole di broker da cui fare il bootstrap. Spark li usa per scoprire il cluster e negoziare le assegnazioni di partition. Non devi elencare tutti i broker: due o tre vanno bene per fault tolerance.
  • subscribe sceglie il/i topic. Puoi sottoscriverti a più di uno separando con virgole: "orders,refunds,returns". Per la sottoscrizione basata su pattern, usa subscribePattern con una regex: "events_.*". C’è anche assign per la selezione manuale di partition, ma non lo vorrai quasi mai: lascia che Spark e Kafka si coordinino.
  • startingOffsets è critico e ci arriviamo tra un minuto.

Com’è fatto il DataFrame

Chiamare events.printSchema() rivela le colonne che Spark espone da Kafka:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array<struct<key:string,value:binary>> (nullable = true)

Le interessanti:

  • key e value sono binary. Kafka non sa né si interessa di cosa c’è nei tuoi messaggi; memorizza byte. Spark espone quei byte fedelmente. Se stampassi value cosi’ com’e’ vedresti blob in esadecimale. Devi deserializzare.
  • topic, partition, offset identificano esattamente da dove viene questa riga in Kafka. Utili per debug, lineage e sink offset-aware.
  • timestamp è il timestamp del record del broker, o quando il producer lo ha scritto o quando il broker lo ha ingerito, a seconda della config message.timestamp.type del topic. Questo non è il tuo event time. Se il body del messaggio ha il proprio campo event_ts, è quello che dovresti usare per le window (lezione 52); il timestamp Kafka è metadata lato processing.

Deserializzare il value

Il payload più comune è JSON. Converti binary in string in struct:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

payload_schema = StructType([
    StructField("user_id",   StringType(),    False),
    StructField("action",    StringType(),    False),
    StructField("amount",    DoubleType(),    True),
    StructField("event_ts",  TimestampType(), False),
])

decoded = (events
             .select(col("topic"),
                     col("partition"),
                     col("offset"),
                     col("timestamp").alias("kafka_ts"),
                     from_json(col("value").cast("string"), payload_schema).alias("data"))
             .select("topic", "partition", "offset", "kafka_ts", "data.*"))

Le due chiamate chiave: col("value").cast("string") per andare da binary a stringa UTF-8, poi from_json(..., schema) per parsare il JSON in uno struct, poi data.* per appiattire. Dimenticare il cast è la singola insidia Kafka-Spark più comune. Se fai from_json(col("value"), schema), Spark in silenzio produrrà struct tutti null perché non sa parsare binary come JSON. L’errore è muto; ti chiederai perché i tuoi filter non matchano niente.

Per Avro con un Confluent Schema Registry, un setup di produzione molto comune, il pattern è from_avro (in pyspark.sql.avro.functions) combinato con il client schema registry. Il wire format Confluent prefissa ogni messaggio con un magic byte e un ID schema da 4 byte, quindi tipicamente substring(col("value"), 6, ...) per saltare il prefisso, poi from_avro con lo schema preso dal registry. La libreria abris lo wrappa bene se non vuoi farlo a mano; la funzione from_avro nuda funziona se puoi hardcodare o fetchare lo schema separatamente.

Per Protobuf, storia simile: Spark 4 spedisce from_protobuf. Stesso pattern.

Per “ho solo JSON, la vita è facile”, from_json è quello che peschi. L’80% delle pipeline Kafka-su-Spark è esattamente questo.

Offset e startingOffsets

startingOffsets è l’opzione più importante a cui non hai mai pensato. Controlla da dove Spark inizia a leggere da Kafka al primissimo run della query. Dopo, prende il sopravvento il checkpoint.

Tre forme:

.option("startingOffsets", "latest")    # default — parte dalla fine di ogni partition
.option("startingOffsets", "earliest")  # parte dall'inizio della retention
.option("startingOffsets", """
    {"user-events": {"0": 1234, "1": 5678, "2": 9012}}
""")  # offset espliciti per partition

latest (il default) significa “salta tutto quello che è già in Kafka e processa solo i messaggi che arrivano dopo che parto”. Ragionevole per casi d’uso solo real-time dove gli eventi storici non contano. Pericoloso se volevi davvero la storia e non ti rendevi conto che il default era latest.

earliest significa “leggi tutto quello che Kafka ha ancora”. Per un topic con sette giorni di retention e tanto traffico, questo è un backfill. Spark leggerà tutto, in molti micro-batch, prima di raggiungere “adesso”. Usa questo quando vuoi legittimamente fare il bootstrap dalla storia.

Il JSON esplicito ti lascia partire da offset precisi per partition, utile quando migri da un altro sistema che si è fermato a offset noti, o quando vuoi saltare un messaggio cattivo specifico.

Esiste un endingOffsets simmetrico ma si applica solo a read batch (spark.read.format("kafka")), non streaming. Le query streaming sono unbounded per definizione.

Dopo il primo run, il checkpoint possiede gli offset. startingOffsets viene ignorato a ogni restart che ha un checkpoint valido. Questo è il comportamento corretto: tutto il senso del checkpoint è ricordare dove sei. Ma significa anche: se cambi startingOffsets dopo il deployment, non succede nulla. Per fare reset davvero, devi cancellare o spostare il checkpoint, che poi riapplica startingOffsets allo start successivo.

Questa è la seconda insidia Kafka-Spark più comune. L’ingegnere cambia da latest a earliest per fare il backfill, redeploya, non vede cambiamenti, si confonde. Il fix è sempre: cambia gli starting offset e pulisci il checkpoint e riavvia.

At-least-once è integrato. Exactly-once richiede aiuto.

Il connector Kafka di Spark ti dà semantica at-least-once out of the box, con un checkpoint:

  1. Spark legge un micro-batch da offset [a, b) per partition.
  2. Spark lo processa e scrive sul sink.
  3. Spark commita gli offset [a, b) al checkpoint.

Se il job muore tra lo step 2 e lo step 3, al restart Spark rileggerà [a, b) e ri-processerà quei messaggi. Il lato output potrebbe averne già ricevuti alcuni. Risultato: alcune righe possono apparire due volte.

Per sink idempotenti, va bene. Esempi di sink idempotenti:

  • Delta Lake / Iceberg con MERGE chiavato per event ID: rilanciarlo è un no-op per le righe già fuse.
  • JDBC con un upsert (INSERT ... ON CONFLICT DO UPDATE in Postgres, MERGE in SQL Server) chiavato su un ID stabile.
  • Kafka con il producer transazionale, configurato con scritture idempotenti e un sink exactly-once.

Per sink non idempotenti, at-least-once significa duplicati. Opzioni:

  • Rendilo idempotente al livello applicativo. Aggiungi un event ID ai tuoi record (l’offset Kafka funziona come uno), e il consumer downstream deduplica. Economico, quasi sempre vale la pena farlo.
  • Usa foreachBatch con una scrittura transazionale. Ottieni il DataFrame del micro-batch e committi offset e dati nella stessa transazione. Se la transazione committa, hai finito; se fallisce, nessuno dei due lati avanza.
  • Usa Delta o Iceberg come sink con txnVersion (Delta): registrano commit ID per query in modo che i micro-batch ritentati non scrivano due volte.

Non c’è un singolo bottone etichettato “exactly-once” in Structured Streaming. C’è at-least-once + sink idempotente = exactly-once nei fatti. Pianifica di conseguenza.

maxOffsetsPerTrigger: la valvola di sicurezza per il backfill

Ecco uno scenario che morde le persone. Imposti startingOffsets=earliest su un topic con 7 giorni di retention e 100 GB di dati. Fai partire la query. Spark prova a leggere tutto nel primo micro-batch, finisce la memoria dell’executor, il job muore, il restart riparte da zero, muore di nuovo, loop infinito.

Il fix è maxOffsetsPerTrigger. Limita il numero di nuovi offset letti su tutte le partition per micro-batch:

.option("maxOffsetsPerTrigger", 1_000_000)

Con questo, il primo micro-batch legge fino a 1M di offset, li processa, committa, e il prossimo micro-batch legge il prossimo 1M. Il backfill richiede molti micro-batch (proporzionali a dati totali / 1M), ma ognuno è limitato e gli OOM spariscono. Una volta che la query raggiunge “adesso”, ogni micro-batch legge solo i nuovi arrivi, che di solito sono ben sotto il cap.

In produzione lo imposto sempre per qualsiasi topic con potenziale di backlog non banale. Il default è illimitato, che è un footgun al primo deployment.

C’è anche minOffsetsPerTrigger (Spark 3.4+), che dice “aspetta finché non esistono almeno N nuovi messaggi prima di triggerare”, utile per topic a basso volume dove non vuoi far partire un micro-batch per ogni singolo messaggio. Abbinalo con maxTriggerDelay per limitare quanto Spark aspetta.

Un job streaming completo Kafka-to-Parquet

Mettiamo tutto insieme. Leggere eventi JSON da Kafka, parsarli, filtrarli, scrivere su Parquet, con checkpointing appropriato e bound sugli offset:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = (SparkSession.builder
         .appName("KafkaToParquet")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0")
         .getOrCreate())

payload_schema = StructType([
    StructField("user_id",   StringType(),    False),
    StructField("action",    StringType(),    False),
    StructField("amount",    DoubleType(),    True),
    StructField("event_ts",  TimestampType(), False),
])

raw = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
         .option("subscribe", "user-events")
         .option("startingOffsets", "earliest")
         .option("maxOffsetsPerTrigger", 500_000)
         .option("failOnDataLoss", "false")
         .load())

decoded = (raw
             .select(col("partition"),
                     col("offset"),
                     col("timestamp").alias("kafka_ts"),
                     from_json(col("value").cast("string"), payload_schema).alias("d"))
             .select("partition", "offset", "kafka_ts", "d.*"))

purchases = decoded.filter((col("action") == "purchase") & (col("amount") > 0))

query = (purchases.writeStream
            .format("parquet")
            .outputMode("append")
            .option("path", "s3://my-bucket/events/purchases/")
            .option("checkpointLocation", "s3://my-bucket/checkpoints/purchases/")
            .trigger(processingTime="60 seconds")
            .start())

query.awaitTermination()

Cosa vale la pena segnalare:

  • spark.jars.packages porta dentro il connector Kafka. Senza, ottieni una ClassNotFoundException alla prima read.
  • failOnDataLoss=false dice a Spark di non crashare se scopre che gli offset che si aspettava di leggere sono stati cancellati dalla retention di Kafka. Il default è true (crasha rumorosamente), che è corretto per job dove i dati mancanti sono inaccettabili. Per job di backfill best-effort, false lascia Spark saltare avanti con un warning invece che morire.
  • Il checkpoint è su S3. Va bene per deployment di produzione perché S3 è durabile. Non metterlo su disco locale in un job a cluster; i guasti dei nodi lo perderanno.
  • processingTime("60 seconds") corrisponde a una landing cadence di 1 minuto per i consumer downstream. Aggiusta a piacere.
  • L’output è Parquet partitionato per nulla in questo momento: per un vero job di produzione, aggiungi .partitionBy("event_date") o simile per tenere navigabili le directory Parquet. Tirato fuori per chiarezza.

Lancialo. Butta messaggi su user-events. Guarda i file Parquet apparire in s3://my-bucket/events/purchases/. Ammazza il job. Butta altri messaggi. Riavvia. Spark riprende dal checkpoint, legge solo i nuovi messaggi (perché i vecchi offset sono committati), e continua.

Qualche ultima cosa che direi a un collega

  • group.id lo imposta Spark, non tu. Se imposti kafka.group.id manualmente, Spark farà un warning o rifiuterà. Spark gestisce gli offset attraverso il suo checkpoint, non attraverso gli offset del consumer group di Kafka. Questo prende le persone che vengono da Kafka Streams o kafka-python; il modello mentale è diverso.
  • Una partition Spark per partition Kafka. Se il tuo topic ha 12 partition, il tuo micro-batch ha 12 task di input. Vuoi più parallelismo downstream? repartition dopo la read. Ne vuoi meno? Combina con coalesce o resta col default. Il numero di partition Kafka mette il cap sul tuo parallelismo di lettura; parla al tuo team di piattaforma sul numero di partition se è troppo basso.
  • Gli header sono esposti da Spark 3.0, e puoi anche scriverli sul sink Kafka. Buoni per tracing ID, versioni di schema, tag di tenant.
  • Attento allo skew temporale tra broker e processor. Il campo timestamp di Kafka è impostato da chissà chi (broker, producer) a seconda della configurazione. Se lo usi per le window, controlla due volte che stai usando il tempo che intendi. Meglio: usa un campo nel body del messaggio che controlli tu.

Questo copre la source. La lezione 52 è event-time e watermark, il pezzo che ti permette di scrivere groupBy(window(...)) su uno stream Kafka e ottenere risultati corretti anche quando gli eventi arrivano fuori ordine. Dopo, output mode (53), operazioni stateful (54), stream-stream join (55), e avremo un toolkit streaming completo.


Riferimenti: Apache Spark Structured Streaming + Kafka Integration Guide (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) e la documentazione del client Kafka (https://kafka.apache.org/documentation/). Consultati il 2026-05-01.

Cerca