Dacă pui în picioare un job Structured Streaming real în producție, e cam 90% șansă ca input-ul să fie Kafka. Files-on-a-directory funcționează pentru tutoriale și pentru ocazionalul pipeline de log-rotation; socket-urile sunt o jucărie; sursa rate e pentru benchmarking. Kafka e log-ul de evenimente durabil, replayable, partiționat, multi-consumer pe care toată lumea construiește efectiv sisteme streaming. Deci aici stă lecția asta.
Nu am să te învăț Kafka în sine, există cărți întregi pentru asta. Versiunea de 30 de secunde: Kafka stochează log-uri append-only numite topicuri, fiecare împărțit în partiții pentru paralelism. Producătorii scriu mesaje, fiecare primind un offset numeric în partiția lui. Consumatorii citesc mesaje urmărind ce offset-uri au procesat, per partiție. Log-ul este durabil (replicat pe brokeri) și replayable (offset-urile nu sunt reutilizate, deci poți reciti istoricul atâta timp cât e în fereastra de retenție). Asta e fundația. Spark se conectează ca un consumator.
Apelul de bază
events = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
.option("subscribe", "user-events")
.option("startingOffsets", "latest")
.load())
Câteva lucruri de marcat imediat:
- Conectorul este
format("kafka"). Disponibil caorg.apache.spark:spark-sql-kafka-0-10_2.13:<spark-version>pe Maven. Nu vine cu Spark implicit, trebuie să-l adaugi prin--packagessau--jars.0-10din numele artefactului este versiunea API-ului clientului Kafka, nu versiunea broker-ului Kafka; funcționează cu orice broker de la 0.10 încolo, ceea ce înseamnă orice ai rula realist în 2026. kafka.bootstrap.serverseste o listă separată prin virgulă de brokeri din care să bootstrap-uiești. Spark folosește aceștia pentru a descoperi clusterul și pentru a negocia atribuirile de partiții. Nu trebuie să listezi toți brokerii, doi sau trei sunt OK pentru toleranță la erori.subscribealege topicul/topicurile. Te poți abona la mai multe separându-le prin virgulă:"orders,refunds,returns". Pentru abonare bazată pe pattern, foloseștesubscribePatterncu un regex:"events_.*". Există șiassignpentru selecția manuală de partiții, dar aproape niciodată nu o vrei, lasă Spark și Kafka să coordoneze.startingOffsetseste critic și ajungem la el într-un minut.
Cum arată DataFrame-ul
Apelarea events.printSchema() dezvăluie coloanele pe care Spark le expune din Kafka:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
|-- headers: array<struct<key:string,value:binary>> (nullable = true)
Cele interesante:
keyșivaluesuntbinary. Kafka nu știe și nu-i pasă ce e în mesajele tale; stochează octeți. Spark face acei octeți disponibili fidel. Dacă ai afișavalueca atare, ai vedea blocuri hex. Trebuie să deserializezi.topic,partition,offsetidentifică exact de unde a venit acest rând în Kafka. Util pentru debugging, lineage și sink-uri conștiente de offset.timestampeste timestamp-ul de înregistrare al broker-ului, fie când producătorul l-a scris, fie când broker-ul l-a ingerat, în funcție de configurareamessage.timestamp.typea topicului. Acesta nu este event time-ul tău. Dacă mesajul tău are propriul câmpevent_ts, acela ar trebui să-l folosești pentru windowing (lecția 52); timestamp-ul Kafka este metadată din partea de procesare.
Deserializarea valorii
Cel mai comun payload este JSON. Convertește binar → string → struct:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
payload_schema = StructType([
StructField("user_id", StringType(), False),
StructField("action", StringType(), False),
StructField("amount", DoubleType(), True),
StructField("event_ts", TimestampType(), False),
])
decoded = (events
.select(col("topic"),
col("partition"),
col("offset"),
col("timestamp").alias("kafka_ts"),
from_json(col("value").cast("string"), payload_schema).alias("data"))
.select("topic", "partition", "offset", "kafka_ts", "data.*"))
Cele două apeluri cheie: col("value").cast("string") pentru a trece de la binar la string UTF-8, apoi from_json(..., schema) pentru a parsa JSON-ul într-un struct, apoi data.* pentru a aplatiza. Uitarea cast-ului e cea mai comună capcană Kafka-Spark. Dacă faci from_json(col("value"), schema), Spark va produce în liniște struct-uri toate-null fiindcă nu poate parsa binarul ca JSON. Eroarea e tăcută; te vei întreba de ce filtrele tale nu se potrivesc cu nimic.
Pentru Avro cu un Confluent Schema Registry, o configurare foarte comună în producție, pattern-ul este from_avro (în pyspark.sql.avro.functions) combinat cu clientul schema registry. Formatul wire Confluent pune un magic byte și un ID de schemă pe 4 octeți la începutul fiecărui mesaj, deci tipic faci substring(col("value"), 6, ...) pentru a sări peste prefix, apoi from_avro cu schema adusă din registry. Biblioteca abris ambalează asta frumos dacă nu vrei s-o faci singur; funcția simplă from_avro funcționează dacă poți codifica sau aduce schema separat.
Pentru Protobuf, poveste similară, Spark 4 vine cu from_protobuf. Același pattern.
Pentru „am doar JSON, viața e ușoară”, from_json e ce alegi. 80% din pipeline-urile Kafka-pe-Spark sunt exact asta.
Offset-uri și startingOffsets
startingOffsets este cea mai importantă opțiune la care nu te-ai gândit. Controlează de unde Spark începe să citească din Kafka la prima rulare a interogării. După aceea, checkpoint-ul preia.
Trei forme:
.option("startingOffsets", "latest") # implicit, porneste la sfarsitul fiecarei partitii
.option("startingOffsets", "earliest") # porneste la inceputul retentiei
.option("startingOffsets", """
{"user-events": {"0": 1234, "1": 5678, "2": 9012}}
""") # offset-uri explicite per partitie
latest (implicit) înseamnă „sari peste tot ce e deja în Kafka și procesează doar mesaje care sosesc după ce pornesc”. Rezonabil pentru cazuri de utilizare doar real-time unde evenimentele istorice nu contează. Periculos dacă efectiv voiai istoric și nu ți-ai dat seama că implicitul era latest.
earliest înseamnă „citește tot ce mai are Kafka”. Pentru un topic cu șapte zile de retenție și mult trafic, e un backfill. Spark va citi tot, în multe micro-batch-uri, înainte să ajungă din urmă „acum”-ul. Folosește asta când vrei legitim să faci bootstrap din istoric.
JSON explicit îți permite să pornești de la offset-uri precise per partiție, util când migrezi de pe alt sistem care s-a oprit la offset-uri cunoscute, sau când vrei să sari peste un mesaj specific defect.
Există și un endingOffsets simetric, dar se aplică doar citirilor batch (spark.read.format("kafka")), nu streaming-ului. Interogările streaming sunt nemărginite prin definiție.
După prima rulare, checkpoint-ul deține offset-urile. startingOffsets este ignorat la fiecare restart care are un checkpoint valid. Acesta e comportamentul corect, întregul scop al checkpoint-ului e să-și amintească unde ești. Dar înseamnă și: dacă schimbi startingOffsets după deployment, nu se întâmplă nimic. Pentru a reseta efectiv, trebuie să ștergi sau să relocalizezi checkpoint-ul, care apoi reaplică startingOffsets la următoarea pornire.
Asta e a doua cea mai comună capcană Kafka-Spark. Inginerul schimbă latest → earliest pentru a face backfill, redepune, nu vede nicio schimbare, se confuzează. Soluția e mereu: schimbă starting offsets și curăță checkpoint-ul și repornește.
At-least-once vine din cutie. Exactly-once are nevoie de ajutor.
Conectorul Kafka al lui Spark îți dă semantică at-least-once out of the box, cu un checkpoint:
- Spark citește un micro-batch din offset-urile
[a, b)per partiție. - Spark îl procesează și scrie în sink.
- Spark face commit la offset-urile
[a, b)în checkpoint.
Dacă jobul moare între pasul 2 și pasul 3, la restart Spark va reciti [a, b) și va reprocesa acele mesaje. Partea de output poate să fi primit deja unele dintre ele. Rezultat: unele rânduri pot apărea de două ori.
Pentru sink-uri idempotente, e OK. Exemple de sink-uri idempotente:
- Delta Lake / Iceberg cu
MERGEindexat după ID-ul evenimentului, rerularea e un no-op pentru rândurile deja merge-uite. - JDBC cu un upsert (
INSERT ... ON CONFLICT DO UPDATEîn Postgres,MERGEîn SQL Server) indexat după un ID stabil. - Kafka cu producătorul tranzacțional, configurat cu scrieri idempotente și un sink exactly-once.
Pentru sink-uri non-idempotente, at-least-once înseamnă duplicate. Opțiuni:
- Fă-l idempotent la nivelul aplicației. Adaugă un ID de eveniment la înregistrările tale (offset-ul Kafka funcționează ca unul) și consumatorul downstream face deduplicare. Ieftin, aproape mereu merită făcut.
- Folosește
foreachBatchcu o scriere tranzacțională. Primești DataFrame-ul micro-batch și faci commit la offset-uri și date în aceeași tranzacție. Dacă tranzacția face commit, ai terminat; dacă eșuează, niciuna din părți nu a avansat. - Folosește Delta sau Iceberg ca sink cu
txnVersion(Delta), înregistrează ID-uri de commit per interogare astfel încât micro-batch-urile reîncercate să nu se scrie de două ori.
Nu există un singur buton etichetat „exactly-once” în Structured Streaming. Există at-least-once + sink idempotent = efectiv exactly-once. Planifică în consecință.
maxOffsetsPerTrigger: supapa de siguranță pentru backfill
Iată un scenariu care prinde oamenii. Setezi startingOffsets=earliest pe un topic cu 7 zile de retenție și 100 GB de date. Pornești interogarea. Spark încearcă să citească totul în primul micro-batch, rămâne fără memorie de executor, jobul moare, restart-ul pornește de la început, moare iar, buclă infinită.
Soluția este maxOffsetsPerTrigger. Limitează numărul de offset-uri noi citite pe toate partițiile per micro-batch:
.option("maxOffsetsPerTrigger", 1_000_000)
Cu asta, primul micro-batch citește până la 1M de offset-uri, le procesează, face commit, iar următorul micro-batch citește următorul 1M. Backfill-ul ia multe micro-batch-uri (proporțional cu data totală / 1M), dar fiecare este mărginit și OOM-urile dispar. Odată ce interogarea ajunge din urmă „acum”-ul, fiecare micro-batch citește doar sosirile noi, ceea ce e de obicei bine sub plafon.
În producție setez asta mereu pentru orice topic cu potențial de backlog non-trivial. Implicitul e nelimitat, ceea ce e o capcană la primul deployment.
Există și minOffsetsPerTrigger (Spark 3.4+), care spune „așteaptă până când există cel puțin N mesaje noi înainte de a declanșa”, util pentru topicuri cu volum mic unde nu vrei să declanșezi un micro-batch pentru fiecare mesaj. Asociază-l cu maxTriggerDelay pentru a mărgini cât de mult va aștepta Spark.
Un job complet streaming Kafka-la-Parquet
Hai să punem totul la un loc. Citește evenimente JSON din Kafka, parsează, filtrează, scrie în Parquet, cu checkpointing corect și limite de offset:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
spark = (SparkSession.builder
.appName("KafkaToParquet")
.config("spark.jars.packages",
"org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0")
.getOrCreate())
payload_schema = StructType([
StructField("user_id", StringType(), False),
StructField("action", StringType(), False),
StructField("amount", DoubleType(), True),
StructField("event_ts", TimestampType(), False),
])
raw = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("subscribe", "user-events")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 500_000)
.option("failOnDataLoss", "false")
.load())
decoded = (raw
.select(col("partition"),
col("offset"),
col("timestamp").alias("kafka_ts"),
from_json(col("value").cast("string"), payload_schema).alias("d"))
.select("partition", "offset", "kafka_ts", "d.*"))
purchases = decoded.filter((col("action") == "purchase") & (col("amount") > 0))
query = (purchases.writeStream
.format("parquet")
.outputMode("append")
.option("path", "s3://my-bucket/events/purchases/")
.option("checkpointLocation", "s3://my-bucket/checkpoints/purchases/")
.trigger(processingTime="60 seconds")
.start())
query.awaitTermination()
Ce merită semnalat:
spark.jars.packagesaduce conectorul Kafka. Fără asta primești unClassNotFoundExceptionla prima citire.failOnDataLoss=falseîi spune lui Spark să nu se prăbușească dacă descoperă că offset-urile pe care le aștepta să le citească au fost șterse de retenția Kafka. Implicitul estetrue(prăbușire zgomotoasă), care e corect pentru job-uri unde datele lipsă sunt inacceptabile. Pentru job-uri de backfill best-effort,falseîi permite lui Spark să sară înainte cu un avertisment în loc să moară.- Checkpoint-ul e pe S3. Asta e OK pentru deployment-uri de producție fiindcă S3 e durabil. Nu-l pune pe disc local într-un job cluster; eșecurile de noduri îl vor pierde.
processingTime("60 seconds")se potrivește cu o cadență de aterizare de 1 minut pentru consumatori downstream. Reglează după gust.- Output-ul este Parquet partiționat după nimic acum, pentru un job de producție real, adaugă
.partitionBy("event_date")sau similar pentru a păstra directoarele Parquet navigabile. Scos pentru claritate.
Rulează asta. Aruncă mesaje pe user-events. Urmărește fișiere Parquet care apar în s3://my-bucket/events/purchases/. Omoară jobul. Aruncă mai multe mesaje. Repornește. Spark se reia din checkpoint, citește doar mesajele noi (fiindcă vechile offset-uri au fost commit-ate) și continuă.
Câteva ultime lucruri pe care le-aș spune unui coleg
group.ideste setat de Spark, nu de tine. Dacă setezikafka.group.idmanual, Spark va avertiza sau refuza. Spark gestionează offset-urile prin checkpoint-ul lui, nu prin offset-urile grupului de consumatori Kafka. Asta prinde oamenii care au venit de la Kafka Streams sau kafka-python; modelul mental e diferit.- O partiție Spark per partiție Kafka. Dacă topicul tău are 12 partiții, micro-batch-ul tău are 12 task-uri de input. Vrei mai mult paralelism downstream?
repartitiondupă citire. Vrei mai puțin? Combină cucoalescesau rămâi la implicit. Numărul de partiții Kafka îți limitează paralelismul de citire; vorbește cu echipa ta de platformă despre numărul de partiții dacă e prea mic. - Header-ele sunt expuse din Spark 3.0 și le poți și scrie pe sink-ul Kafka. Bune pentru ID-uri de tracing, versiuni de schemă, etichete de tenant.
- Atenție la time skew între brokeri și procesoare. Câmpul
timestampal Kafka este setat de cineva, broker, producător, în funcție de configurare. Dacă îl folosești pentru windowing, verifică de două ori că folosești timpul pe care îl înțelegi. Mai bine: folosește un câmp în corpul mesajului pe care tu îl controlezi.
Asta acoperă sursa. Lecția 52 e event time și watermarks, piesa care îți permite să scrii groupBy(window(...)) pe un stream Kafka și să primești rezultate corecte chiar și când evenimentele sosesc în afara ordinii. După aceea, modurile de output (53), operațiile cu stare (54), join-urile stream-stream (55) și vom avea un set complet de unelte pentru streaming.
Referințe: Apache Spark Structured Streaming + Kafka Integration Guide (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) și documentația clientului Kafka (https://kafka.apache.org/documentation/). Consultat 2026-05-01.