Dacă există o parte din Structured Streaming care îi încurcă pe aproape toți la prima întâlnire, e asta: există două ceasuri și nu sunt de acord. Până nu interiorizezi acest lucru, agregările tale vor produce numere care par corecte la prima vedere și sunt, în liniște și subtil, greșite.
Lecția asta e despre cele două ceasuri și mecanismul pe care Spark ți-l dă ca să le împaci: watermark-ul.
Cele două timpuri
Fiecare eveniment care trece prin pipeline-ul tău are asociat cel puțin două timestamp-uri.
Event time este momentul în care lucrul s-a întâmplat efectiv. Utilizatorul a apăsat „buy” la 10:03:47. Senzorul IoT a înregistrat o temperatură la 10:03:47. Acel moment este un fapt imuabil despre lume.
Processing time este momentul în care Spark vede rândul. Click-ul a călătorit printr-un CDN, a stat câteva secunde într-o partiție Kafka, a fost preluat de un trigger de micro-batch; Spark îl poate procesa la 10:03:51, sau la 10:04:30, sau, dacă Kafka era în întârziere, la 10:18:00.
Într-o lume sincronă perfectă, cele două ar fi egale. În sisteme reale, nu sunt niciodată egale. Clienții mobili buffer-uiesc evenimentele când sunt offline și le trimit o oră mai târziu. Hopurile de rețea cauzează retry-uri. Un proces producător repornește și îți redă commit log-ul. Un job batch ingerează azi fișierul CSV de ieri. Toate acestea produc evenimente cu event time vechi care sosesc la processing time-uri proaspete.
Acum imaginează-ți că vrei să calculezi „page views per fereastră de 10 minute”. Varianta naivă e:
from pyspark.sql.functions import window, count
views_by_processing_time = (events
.groupBy(window("processing_ts", "10 minutes"), "page")
.agg(count("*").alias("views")))
Asta e simplu, pare corect și e greșit. Nu măsori „views în fereastra 10:00-10:10 a lumii”. Măsori „rândurile pe care Spark s-a întâmplat să le ingereze între 10:00 și 10:10”. Sunt mulțimi diferite și diferența dintre ele e exact de unde vin bug-urile tale de raportare.
Ce vrei aproape întotdeauna este event-time windowing:
views_by_event_time = (events
.groupBy(window("event_time", "10 minutes"), "page")
.agg(count("*").alias("views")))
window("event_time", "10 minutes") îi spune lui Spark: pune fiecare rând într-o fereastră de 10 minute pe baza coloanei event_time. Un rând cu event_time = 10:03:47 merge în fereastra [10:00, 10:10) indiferent de când îl procesează Spark. Un rând cu event_time = 09:58:12 care se întâmplă să sosească la processing time 10:05 merge în fereastra [09:50, 10:00), nu în cea curentă.
Acesta este modelul. Acum problema.
Problema unbounded-state
Dacă Spark acceptă late events la nesfârșit, state-ul pentru ferestrele trecute nu poate fi eliberat niciodată. Fiecare fereastră de 10 minute are running count-ul ei stând în memorie (sau RocksDB, sau oriunde îți trăiește state store-ul), așteptând posibilitatea ca încă un întârziat de la 09:50 să apară. După o săptămână de rulare, ai 1.008 ferestre de 10 minute, fiecare ținând state pentru fiecare cheie page care a apărut vreodată. După o lună, ai 4.320. Memoria crește nelimitat, state store-ul devine mai lent și, în cele din urmă, jobul de streaming moare.
Ai nevoie de o metodă să-i spui lui Spark: „după acest punct, nu mai aștepta. Închide fereastra, emite rezultatul final, aruncă state-ul.”
Asta este un watermark.
Ce este de fapt un watermark
Un watermark este o garanție pe care i-o dai tu lui Spark că nu vor mai sosi evenimente cu event_time mai vechi decât un anumit prag X. Odată ce Spark observă acea garanție pentru o anumită fereastră, adică odată ce watermark-ul a depășit sfârșitul ferestrei, consideră fereastra închisă, finalizează agregarea și aruncă state-ul.
Spark calculează watermark-ul continuu. Politica de bază este:
watermark = max(event_time observed so far) − lateness_budget
Tu configurezi lateness budget-ul. Acela e butonul. Un budget de 5 minute spune: „Spark, dacă ai văzut un eveniment la 10:14, presupune că nimic mai vechi de 10:09 nu va mai apărea.”
API-ul:
from pyspark.sql.functions import window, count
aggregated = (events
.withWatermark("event_time", "5 minutes")
.groupBy(window("event_time", "10 minutes"), "page")
.agg(count("*").alias("views")))
withWatermark("event_time", "5 minutes") face două lucruri deodată:
- Îi spune lui Spark ce coloană poartă event time (ca să știe ce să urmărească).
- Setează lateness budget-ul; evenimentele care sosesc la mai mult de 5 minute după watermark-ul curent sunt considerate prea târzii și aruncate în tăcere.
A doua parte e critică și ușor de ratat. Un watermark schimbă corectitudinea pe boundedness. Tu spui explicit „prefer să arunc 0,1% din late stragglers decât să las state-ul să crească la nesfârșit.” Mărimea acelui 0,1% este alegerea ta; ajustează lateness budget-ul în funcție de cât de târziu sosesc evenimentele tale reale.
Exemplu lucrat: parcurgere prin evenimente concrete
Hai să rulăm evenimente reale prin asta și să vedem ce se întâmplă. Setup:
- Fereastră: 10 minute, tumbling. Deci ferestrele sunt
[10:00, 10:10),[10:10, 10:20)etc. - Lateness watermark: 5 minute.
- Evenimentele sosesc în această ordine processing-time (
event_timelor e arătat în paranteze):
| # | processing_ts | event_time | window |
|---|---|---|---|
| 1 | 10:00:30 | 10:00 | [10:00,10:10) |
| 2 | 10:09:10 | 10:09 | [10:00,10:10) |
| 3 | 10:11:05 | 10:11 | [10:10,10:20) |
| 4 | 10:11:40 | 10:08 LATE | [10:00,10:10) |
| 5 | 10:14:20 | 10:14 | [10:10,10:20) |
| 6 | 10:18:05 | 10:06 TOO LATE | dropped |
Parcurgere:
Eveniment 1 (event_time 10:00). Primul eveniment văzut. Max event time până acum = 10:00. Watermark = 10:00 − 5min = 09:55. Fereastra [10:00,10:10) are running count = 1.
Eveniment 2 (event_time 10:09). Aceeași fereastră. Max event time = 10:09. Watermark = 10:09 − 5min = 10:04. Window count este acum 2. Watermark-ul e la 10:04, încă în interiorul ferestrei [10:00,10:10), deci fereastra rămâne deschisă.
Eveniment 3 (event_time 10:11). Aterizează în următoarea fereastră. Max event time = 10:11, watermark = 10:06. Încă în interiorul [10:00,10:10), deci acea fereastră rămâne deschisă. Noua fereastră [10:10,10:20) count = 1.
Eveniment 4 (event_time 10:08). Late! Event_time-ul lui e 10:08, dar processing time este 10:11:40, mult după sfârșitul nominal al ferestrei. Este în budget-ul watermark-ului? Watermark-ul curent este 10:06, iar 10:08 > 10:06, deci da, este acceptat. Window [10:00,10:10) count merge la 3. Asta e watermark-ul care își câștigă pâinea: late events sunt incluse atâta timp cât nu sunt prea târzii.
Eveniment 5 (event_time 10:14). Max event time = 10:14, watermark = 10:09. Acum, 10:09 a atins sfârșitul lui [10:00,10:10). Spark închide acea fereastră, emite count-ul final de 3 și aruncă state-ul pentru ea. Noul watermark înseamnă și că viitoarele evenimente dinainte de 10:09 vor fi respinse.
Eveniment 6 (event_time 10:06). Watermark este 10:09. 10:06 < 10:09, deci acest eveniment este aruncat în tăcere. E prea târziu. Fereastra a dispărut. Spark nu o va redeschide.
Câteva lucruri de reținut din acea parcurgere:
- Watermark-ul se mișcă doar înainte. Chiar dacă evenimentul 6 ar fi fost event time 10:00:00, nu ar fi tras watermark-ul înapoi.
- Late events în budget actualizează fereastra încă deschisă; nu primești un eveniment separat de „late update”; running aggregation pur și simplu le absoarbe.
- Fereastra se închide când watermark-ul îi depășește sfârșitul. Output-ul (în append mode) pentru
[10:00,10:10)se emite doar după ce evenimentul 5 e procesat, deși wall clock-ul e bine peste 10:10. - Evenimentele în afara budget-ului sunt aruncate. Nu există nicio eroare, niciun warning în datele tale, doar un counter în metrici.
Acel ultim punct e motivul pentru care monitorizarea contează.
Watermark monitoring
În tab-ul Streaming din Spark UI, fiecare micro-batch arată două valori pe care ar trebui să înveți să le citești:
- Event time max: cel mai mare event time observat în acest batch.
- Watermark: valoarea curentă a watermark-ului.
Plotează-le în timp. Ar trebui să avanseze împreună, cu un gap aproximativ constant (lateness budget-ul tău). Dacă watermark-ul nu mai avansează, stream-ul tău e blocat (niciun max event time nou, de obicei o sursă oprită). Dacă gap-ul explodează, input-ul tău e brusc foarte târziu.
Urmărește și metrica numRowsDroppedByWatermark. O valoare nenulă înseamnă că arunci late events; o valoare care crește constant înseamnă că lateness budget-ul tău e prea mic pentru pattern-ul real de sosire.
Alegerea lateness budget-ului
Nu există o formulă. Îți măsori datele. Calculează, pe un eșantion reprezentativ:
from pyspark.sql.functions import col, unix_timestamp
lateness = (events.select(
(unix_timestamp("processing_ts") - unix_timestamp("event_time")).alias("lag_seconds")
))
lateness.summary("min", "50%", "95%", "99%", "max").show()
Percentila 99 e, de obicei, un budget rezonabil: accepți 1% pierdere în schimbul unui state mărginit. Dacă, literalmente, nu poți tolera aruncarea de evenimente, nu vrei deloc agregare în streaming; vrei un append-only fact table scris într-un lake, cu un job batch separat care recalculează zilele recente în fiecare noapte.
Un query funcțional cap-coadă
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, col, from_json
from pyspark.sql.types import StructType, StringType, TimestampType
spark = SparkSession.builder.appName("EventTimeDemo").getOrCreate()
schema = StructType().add("page", StringType()).add("event_time", TimestampType())
raw = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "pageviews")
.load())
events = (raw
.select(from_json(col("value").cast("string"), schema).alias("e"))
.select("e.*"))
windowed = (events
.withWatermark("event_time", "5 minutes")
.groupBy(window("event_time", "10 minutes"), "page")
.agg(count("*").alias("views")))
query = (windowed.writeStream
.outputMode("append")
.format("console")
.option("checkpointLocation", "/tmp/ck/views")
.start())
outputMode("append") aici este perechea naturală cu watermarks: fiecare fereastră este emisă exact o dată, după ce watermark-ul îi depășește sfârșitul. Vom petrece următoarele două lecții despachetând output modes mai cu atenție.
Despre ce nu am vorbit încă este ce se întâmplă în spatele watermark-ului: unde stochează Spark running counts, cum supraviețuiește unui restart al jobului și ce alte pattern-uri stateful există dincolo de simpla agregare. Asta e lecția 53: state store-ul, sessionization și mapGroupsWithState.
Referințe: Apache Spark Structured Streaming Programming Guide, secțiunile despre event time și watermarking (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking). Consultat 2026-05-01.