PySpark, dalle fondamenta Lezione 59 / 60

Adaptive Query Execution: la killer feature di Spark 3.x

Dynamic partition coalescing, gestione dello skew a runtime e switch della strategia di join: le config da conoscere e i casi in cui AQE non può aiutarti.

Per gran parte della storia di Spark, l’optimizer ha funzionato come funziona ogni optimizer relazionale: leggi la query, guarda le statistiche delle tabelle, pianifica tutto in anticipo, esegui. Se le statistiche erano sbagliate (per esempio, stai facendo join su una tabella che esce da uno stage Spark precedente senza statistiche reali), il piano era sbagliato e ti beccavi tu il costo.

Spark 3.0 ha introdotto l’Adaptive Query Execution (AQE) e ha cambiato le carte in tavola. AQE riscrive il piano fisico a runtime, dopo che gli stage iniziali sono completati e Spark ha le dimensioni reali invece di stime. Da Spark 3.2 è attivo di default. In 3.5 è diventato fondamentale per metà delle ottimizzazioni che la documentazione raccomanda. Se sei su una versione recente di Spark e non stai usando AQE, ti stai lasciando sul tavolo il 30-50% dell’optimizer.

Questa lezione è su cosa fa davvero AQE, le config che lo controllano e dove non può ancora aiutarti.

Le tre cose che fa AQE

AQE è una collezione di ottimizzazioni a runtime, ma in pratica te ne interessano tre.

1. Dynamic partition coalescing

Questa è quella di cui beneficiano subito quasi tutti i job.

Senza AQE: imposti spark.sql.shuffle.partitions = 200. Spark fa sempre lo shuffle in 200 partition. Se i dati post-shuffle sono in totale 4 GB, sono 20 MB per partition, va bene. Se sono 40 MB in totale, sono 200 KB per partition e hai 200 task di overhead a processare partition quasi vuote. Se sono 400 GB, hai 200 partition da 2 GB ciascuna, pesanti per un singolo task.

Il numero 200 è una stima fatta prima che Spark sapesse alcunché dei dati.

Con AQE: dopo lo shuffle, Spark guarda quanto è grande davvero ogni partition di output e fa il coalesce di quelle piccole adiacenti in una più grande. La stima delle 200 partition diventa 8 partition da 50 MB se è quello che i dati richiedono. Meno overhead, meno task minuscoli, job più veloce.

Non hai quasi mai bisogno di chiamare coalesce() a mano dopo un groupBy, ormai. Ci pensa AQE.

# Configs that drive coalescing
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.coalescePartitions.minPartitionNum": "1",
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "64m",

Il valore advisoryPartitionSizeInBytes è quello a cui AQE punta dopo il coalescing. 64 MB è il default; molti team lo alzano a 128 MB o 256 MB per i job batch per ridurre ulteriormente il numero di task. Non andare oltre quello che ci sta comodamente nel working set di un singolo task.

2. Gestione dello skew nei join

La seconda cosa che fa AQE è rilevare a runtime le partition skewed nei join e dividerle.

Senza AQE: un SortMergeJoin su una chiave dove l’80% delle righe ha country = 'US' produce una partition gigante processata da un solo task: la classica coda di skew. Te ne stai lì a guardare un task che gira per 40 minuti mentre gli altri 199 hanno finito in 10 secondi. Il salting (lezione 29) era la fix manuale.

Con AQE: Spark si accorge che una partition shuffled è più di skewedPartitionFactor x la mediana (default 5x) e più grande di skewedPartitionThresholdInBytes (default 256 MB). Quando entrambe le condizioni valgono, Spark divide quella partition grassa in pezzi più piccoli, replica le righe corrispondenti dall’altro lato e le esegue in parallelo. Salting automatico per i casi facili, senza modifiche al codice.

"spark.sql.adaptive.skewJoin.enabled": "true",
"spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",

Se continui a sbattere contro lo skew con queste attive, abbassa il fattore (prova 3) o la soglia (prova 64 MB): AQE dividerà in modo più aggressivo. Il tradeoff è che uno splitting aggressivo su un dataset non skewed aggiunge overhead, quindi non spingere senza una vittoria misurata.

3. Switch della strategia di join

La terza è più sottile, ma rende sui job grossi. Spark aveva pianificato un SortMergeJoin perché al momento della pianificazione entrambi i lati sembravano troppo grandi per fare broadcast. Ma magari il build side esce da un filtro precedente che butta via il 99% delle righe. Al planning time Spark stimava 5 GB; a runtime sono in realtà 30 MB. Con AQE, Spark se ne accorge e cambia il join in un BroadcastHashJoin, molto più economico.

Questo è attivo di default con AQE. La config è spark.sql.adaptive.localShuffleReader.enabled (true), che permette al lato broadcast di leggere i file di shuffle in locale senza rishufflare l’altro lato. Lo spark.sql.autoBroadcastJoinThreshold continua a valere: dimensioni a runtime sotto la soglia ribaltano il join.

Come funziona davvero AQE sotto il cofano (in breve)

Vale la pena capirlo una volta, perché spiega i limiti.

Un piano Spark è un albero di stage separati da shuffle. Senza AQE, Spark pianifica l’intero albero in anticipo e lo esegue. Con AQE, Spark pianifica fino al primo confine di shuffle, lo esegue, misura le dimensioni reali di output e poi ripianifica il resto dell’albero usando quei numeri reali invece delle stime dell’optimizer. Lo fa in modo ricorsivo a ogni confine di shuffle.

Quel ripianificare è dove succede la magia. Con le dimensioni reali in mano, l’optimizer può:

  • Vedere che un lato “grande” è in realtà piccolo e passare a broadcast.
  • Vedere che una partition è skewed e dividerla.
  • Vedere che i dati post-shuffle sono piccoli e fare il coalesce delle partition.

Le due conseguenze di questo design:

  • AQE aiuta solo ai confini di shuffle. Niente shuffle, niente misurazione, niente re-plan. Per questo lo skew lato sorgente senza shuffle non può beneficiarne (più avanti).
  • AQE aggiunge un piccolo overhead di pianificazione per stage. Per query molto veloci su input minuscoli, l’overhead può superare il beneficio. Per tutto il resto, è trascurabile rispetto al risparmio a runtime.

Puoi vedere il confine nel piano SQL: ogni shuffle è avvolto in uno ShuffleQueryStage, e AQE si inserisce tra gli stage.

Un before/after reale

Un esempio piccolo ma realistico. Leggi una fact table, filtri a un singolo cliente, fai join con una dim table. Senza AQE, il filtro non cambia la strategia di join e resti in territorio SortMergeJoin:

spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "200")

orders = spark.read.parquet("s3://lake/orders/")          # 80 GB
customers = spark.read.parquet("s3://lake/customers/")    # 12 GB

orders.filter("customer_id = 'cust_42'") \
      .join(customers, "customer_id") \
      .groupBy("region") \
      .sum("amount") \
      .write.parquet("s3://out/cust42-by-region/")

Quel filtro butta giù il lato orders a circa 80 MB. Ma Spark ha pianificato il join quando entrambi i lati erano grossi, quindi ottieni 200 task reducer per circa 80 MB di dati uniti a 12 GB. Wall clock: 14 minuti.

Accendi AQE, stesso job:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

Cosa cambia:

  • Dopo lo shuffle del filtro, AQE vede che il lato orders è 80 MB e cambia il piano per fare un broadcast-hash-join del lato orders dentro customers (il lato piccolo si è invertito: ora è l’orders filtrato).
  • Lo shuffle del groupBy post-join ha 200 partition ma da circa 150 KB ciascuna. AQE fa il coalesce a 4-8 partition.

Wall clock: 90 secondi. Stesso codice, stessi dati, una sola modifica di config.

Non vedrai sempre 10x. Ma spesso vedrai 2-3x su workload reali, e AQE quasi mai rende un job più lento: nel peggiore dei casi è neutro su query già ottimali.

Leggere i piani riscritti da AQE

Il tab SQL nella UI mostra il piano a runtime. Vedrai operatori come AdaptiveSparkPlan, CustomShuffleReader (coalesced o local) e ShuffleQueryStage. C’è una vista “Initial Plan” e una “Final Plan”; il diff tra le due è esattamente quello che AQE ha riscritto.

Alcuni pattern da riconoscere:

  • CustomShuffleReader coalesced -> è avvenuto il coalescing, meno partition di quanto pianificato originariamente.
  • BroadcastHashJoin accanto a uno ShuffleQueryStage -> AQE ha ribaltato un SortMergeJoin in broadcast a runtime.
  • Letture multiple di CustomShuffleReader dello stesso shuffle, con annotazioni Skewed=true -> AQE ha gestito lo skew.

Se non vedi queste annotazioni su una query che ti aspettavi AQE ottimizzasse, controlla che AQE sia davvero attivo. spark.conf.get("spark.sql.adaptive.enabled") dovrebbe restituire "true". Alcune piattaforme escono di fabbrica con AQE disattivato nei default del cluster: Databricks lo ha attivo, lo Spark open-source vanilla < 3.2 lo ha spento.

Dove AQE non può ancora aiutarti

AQE è fantastico. Non è magia. Non può aiutarti con:

Skew lato sorgente, niente shuffle. Se la tua chiave skewed è nel file di input (un file Parquet ha il 90% delle righe per country='US' per come è stato scritto) e la tua query non fa shuffle, AQE non ha nulla a cui reagire. La fix è fare repartition della sorgente in lettura, o sistemare la scrittura a monte, o filtrare la chiave calda e processarla a parte. La lezione 28 vale ancora.

Broadcast join. Una volta che un join è un broadcast, non c’è shuffle da ispezionare. Se il lato broadcast ha skew che esplode sul join (un one-to-many in cui una chiave matcha l’80% del build side), AQE non può intervenire. Ti serve una strategia di join diversa o il salting della lezione 29.

Skew su colonne derivate. Se la tua chiave di join è concat(country, city) e lo skew è su country, AQE guarda le dimensioni di partition della chiave concatenata. Funziona comunque nella maggior parte dei casi, ma se la concat distribuisce lo skew su più partition in modo abbastanza uniforme da sfuggire allo skewedPartitionFactor, AQE non dividerà. Ti tocca ispezionare a mano.

Coalescing di partition minuscole dentro uno stage memory-bound. AQE fa il coalesce delle partition piccole per ridurre l’overhead. Se le esigenze di memoria per task dello stage successivo scalano con la dimensione di partition (una window function con uno spec di partizione enorme, un’aggregazione ad alta cardinalità), il coalescing in partition più grandi può spingerti in territorio di spill. Tieni d’occhio lo spill nello stage successivo; se lo vedi dopo aver attivato AQE, riabbassa advisoryPartitionSizeInBytes.

Streaming. AQE non si applica alle query Structured Streaming. Il motore di streaming pianifica i micro-batch in modo diverso e molte riscritture di AQE romperebbero la semantica incrementale. Per lo streaming si fa il tuning delle partition esplicitamente.

Cosa accendere, oggi

Per i job batch su Spark 3.2+, la config di default sicura è:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")

La maggior parte delle piattaforme ha già tutto attivo di default. Verifica con spark.conf.get. Se sei su un cluster self-managed con Spark 3.0 o 3.1, AQE era spento di default: accendilo, fai girare i job, guardali diventare più veloci.

I rari casi in cui disattiveresti AQE: una pipeline molto stabile, ottimizzata a mano, in cui hai misurato una regressione, o una query in cui l’overhead a runtime del planner AQE supera il beneficio (query molto brevi su input piccoli). Sono casi limite. La risposta di default è “lascialo acceso”.

Riannodando con la checklist di debug

La lezione 58 era il loop diagnostico: trova lo stage lento, controlla skew, GC, volume di shuffle, piano. AQE è la risposta a “ho lo skew” e “le mie shuffle partition sono sbagliate” per molti job. Se stai seguendo la checklist e continui a trovare skew, la prima domanda è se AQE sia attivo. La seconda è se sta catturando lo skew (guarda nel piano SQL le annotazioni skewedPartition). La terza, solo se entrambe sono vere, è se ti serve salting manuale sopra.

Nella maggior parte dei giorni, AQE basta. Nella maggior parte dei giorni non ci devi pensare. Quello è il punto.

La prossima lezione è il capstone del corso: un health check da 30 minuti su un cluster Spark che non hai mai visto. La sweep di chiusura che mette insieme tutto quello che abbiamo coperto. Porta caffè.

Cerca