La Spark UI ti racconta cos’è successo a cose fatte. .explain() ti dice cosa sta per succedere prima ancora che parta un singolo task. Servono entrambi: la UI conferma la realtà, .explain() ti permette di prevederla. Un Spark engineer senior legge i plan con lo stesso atteggiamento con cui un DBA legge un EXPLAIN ANALYZE di Postgres: con un filo di paranoia e l’occhio puntato sugli operatori che non dovrebbero esserci.
Abbiamo accennato ai plan nella lezione 41, quando abbiamo coperto Catalyst. Questa lezione è la versione production-grade: leggere ogni riga, sapere quali operatori contano, e capire la differenza tra ciò che .explain() stampa e ciò che effettivamente gira sotto Adaptive Query Execution.
Cosa stampa .explain
Di default, .explain() mostra solo il physical plan, quello che Spark eseguirà davvero.
df.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[country#42], functions=[sum(amount#15)])
+- Exchange hashpartitioning(country#42, 200), ENSURE_REQUIREMENTS, [plan_id=...]
+- *(1) HashAggregate(keys=[country#42], functions=[partial_sum(amount#15)])
+- *(1) Project [amount#15, country#42]
+- *(1) BroadcastHashJoin [user_id#14], [user_id#41], Inner, BuildRight
:- *(1) Filter (isnotnull(amount#15) AND (amount#15 > 40.0))
: +- *(1) Scan parquet orders[order_id#13,user_id#14,amount#15]
: PushedFilters: [IsNotNull(amount), GreaterThan(amount,40.0)]
+- BroadcastExchange HashedRelationBroadcastMode(...)
+- *(1) Filter isnotnull(user_id#41)
+- *(1) Scan parquet users[user_id#41,name#42,country#43]
.explain(True) (oppure .explain(extended=True)) mostra tutte e quattro le fasi: parsed, analyzed, optimized e physical. È quello che vuoi quando stai facendo debug di un plan inatteso, perché vedere il percorso di ottimizzazione spesso ti dice quale rewrite Catalyst ha saltato o applicato in un modo che non ti aspettavi.
df.explain(True)
C’è anche .explain("formatted"), molto più leggibile sui plan lunghi: numera gli operatori e stampa un blocco “details” separato. E .explain("cost"), che aggiunge stime di row count e size dal cost model dell’optimizer. Ci torniamo.
Direzione di lettura
I plan sono stampati dall’alto verso il basso ma vengono eseguiti dal basso verso l’alto. Le foglie sono gli input (file scan, exchange che ricevono dati di shuffle), la radice è l’action che ha innescato il plan.
Quando leggo un plan, parto dalle foglie. “Cosa sto leggendo, quali colonne, quali filtri sono stati pushati?” Poi traccio verso l’alto: “cosa fa Spark a questi dati lungo la strada verso l’action?”
I caratteri :- e +- disegnano l’albero. I figli di un operatore sono indentati sotto di lui.
Operatori da imparare a memoria
Non serve conoscere ogni operatore. Servono questi.
Scan parquet (o Scan orc, Scan json, ecc.) è la lettura del file. La riga sotto elenca PushedFilters e PartitionFilters. Le colonne che Spark legge davvero sono tra parentesi quadre: quello è il column pruning all’opera.
Project è la selezione di colonne o la valutazione di espressioni. Dovrebbe costare poco.
Filter è un predicato a livello di riga. Se vedi un Filter subito sopra uno Scan parquet e lo stesso predicato è anche in PushedFilters, stanno lavorando entrambi: il source ha fatto del suo meglio, Spark ripulisce ciò che non si è potuto pushare. Se il predicato non è in PushedFilters, non si è potuto pushare (spesso a causa di una UDF o di un’espressione complessa) e Spark sta filtrando dopo aver letto ogni riga.
HashAggregate è un’aggregazione hash-based (group by). Spesso compare due volte nel plan attorno a un Exchange: un’aggregazione parziale prima dello shuffle, una finale dopo. Quel pattern di aggregazione in due passi è una delle ottimizzazioni più importanti di Spark: è il motivo per cui groupBy().count() non shuffla ogni riga di input.
Sort è un sort. Costoso se non è pushato nel lato richiesto da un SortMergeJoin.
Exchange hashpartitioning(...) è lo shuffle. Ogni shuffle è un Exchange. È l’operatore più costoso del tuo plan, punto. Contali. Ognuno è un round-trip di rete piu’ una scrittura su disco piu’ una lettura da disco. Gli Exchange inattesi sono il bug di performance numero uno.
BroadcastExchange trasmette in broadcast un dataset piccolo a ogni executor. Costa poco se il dataset è davvero piccolo: il driver lo raccoglie e lo spedisce. Di solito appare sotto un BroadcastHashJoin.
BroadcastHashJoin è il join economico. Un lato viene broadcastato, l’altro fa hash-probe in locale. Spark sceglie questo quando un lato è sotto spark.sql.autoBroadcastJoinThreshold (10 MB di default).
SortMergeJoin è il join di default sicuro. Entrambi i lati shufflati allo stesso partitioning, sortati, poi mergiati. Due Exchange sotto.
ShuffleHashJoin entrambi i lati shufflati, il lato più piccolo costruito in una hash table. Meno comune; AQE a volte lo preferisce quando le dimensioni sono intermedie.
BroadcastNestedLoopJoin è l’antipattern. O(n per m). Spark ricade qui quando non riesce a scegliere una strategia hash, di solito perché la condizione di join è un predicato non-equi (<, BETWEEN, una chiamata di funzione) e deve confrontare ogni riga sinistra con ogni riga destra. Se lo vedi in un plan su dati di dimensioni reali, il job non finirà. Riscrivi la condizione come equi-join con un range filter sopra.
WholeStageCodegen non è davvero un operatore, è più un wrapper. I marcatori *(1), *(2) davanti agli operatori ti dicono che quegli operatori sono fusi in una singola funzione generata a runtime. Bytecode generation di Tungsten. Se vedi *(1) su Filter, Project, HashAggregate, Spark ha scritto un unico ciclo grosso per tutti e tre che gira senza dispatch virtuale. Il confine tra stage di code-gen è di solito uno shuffle o un operatore che non supporta codegen (alcune UDF, certe operazioni Python). Due operatori con lo stesso *(N) sono fusi; con N diverso, no.
Le righe PushedFilters / PartitionFilters
Sotto uno Scan parquet vedrai qualcosa tipo:
PushedFilters: [IsNotNull(amount), GreaterThan(amount,40.0)]
PartitionFilters: [isnotnull(dt#17), (dt#17 = 2026-05-01)]
PushedFilters sono pushati giu’ al reader del formato di file. Le statistiche del footer di Parquet gli permettono di saltare row group il cui min/max esclude il predicato. Questo è data skipping a livello di colonna: il tuo filtro su amount > 40 permette a Parquet di saltare interi pezzi di file senza decodificarli.
PartitionFilters prunano intere directory prima ancora che venga aperto un file. Se i tuoi dati sono partizionati per dt, e filtri su dt = '2026-05-01', quel partition filter significa che Spark elenca esattamente una directory e ignora il resto. I PartitionFilter sono i filtri più economici in Spark, avvengono al planning time.
Cosa vuoi vedere in un plan sano: ogni filtro selettivo che hai scritto compare in PushedFilters o PartitionFilters. Se un filtro compare solo come operatore Filter sopra lo scan, senza nulla in PushedFilters, stai leggendo l’intero file e scartando righe. Cause comuni:
- Type mismatch (
dt = '2026-05-01'contro una colonnaDATE, serve un cast). - Una UDF dentro il filtro: le UDF sono opache all’optimizer.
- Un’espressione che il source non sa rappresentare (regex su testo, calendar math complessa).
Sistema il filtro, guardalo atterrare in PushedFilters, guarda i byte di input scendere nella UI. Causa ed effetto.
Un esempio passo passo
Un piccolo join tra due tabelle. Lo facciamo explain e ripercorriamo le fasi.
from pyspark.sql import functions as F
orders = spark.read.parquet("/data/orders/") # ~5 GB, partitioned by dt
users = spark.read.parquet("/data/users/") # ~30 MB
q = (orders
.filter(F.col("dt") == "2026-05-01")
.filter(F.col("amount") > 40)
.join(users, "user_id")
.groupBy("country")
.agg(F.sum("amount").alias("total")))
q.explain(True)
.explain(True) stampa quattro blocchi. Scorrili in quest’ordine.
Parsed Logical Plan è l’AST letterale del tuo codice. I riferimenti a colonna sono non risolti ('amount, con l’apostrofo che indica il non risolto). Utile solo quando vuoi confermare che Spark ha parsato la tua query come l’hai scritta. Da saltare nel 99% dei casi.
Analyzed Logical Plan: i riferimenti a colonna sono risolti a colonne concrete con ID (amount#15), tipi attaccati. Se hai un typo in un nome di colonna, è qui che esplode. Se vedi ID di colonna che non riconosci, sono alias o colonne auto-generate dal join.
Optimized Logical Plan: Catalyst ha eseguito tutti i rewrite rule-based. Qui vedi cosa l’optimizer ha effettivamente fatto alla tua query. Trasformazioni comuni da cercare:
- I due filtri uniti in uno:
Filter ((dt = '2026-05-01') AND (amount > 40)). - Colonne prunate: restano solo
user_id,amount,country. Il join usauser_id, la projection serveamountecountry, il resto è andato. - Predicati pushati sotto i join: i filtri su
ordersavvengono prima del join, non dopo.
Se hai scritto un filtro e non compare nell’optimized plan, l’optimizer l’ha scartato perché ridondante o sempre vero. Se hai scritto una colonna che non hai poi usato, il projection pruning l’ha rimossa. Leggere l’optimized plan ti dice cosa Catalyst ha tenuto.
Physical Plan è il plan eseguibile. Percorrilo dalle foglie alla radice:
- Scan parquet orders con
PartitionFilters: [(dt = '2026-05-01')]ePushedFilters: [IsNotNull(amount), GreaterThan(amount, 40.0)]. Entrambi i filtri sono atterrati nel source. Bene. - Filter sopra: Spark ricontrolla il predicato pushato. Costa poco.
- Scan parquet users: piccolo, dovrebbe essere broadcastato.
- BroadcastExchange: il driver raccoglie users, broadcasta.
- BroadcastHashJoin: orders fa probe sulla hash table broadcastata. Ottimo, nessuno shuffle sul lato grosso.
- Project: tieni solo
amount,country. - HashAggregate (partial): somma parziale dentro ogni task.
- Exchange hashpartitioning(country, 200): shuffle per country.
- HashAggregate (final): somma finale per country.
Un Exchange in totale. La tabella grossa non ha mai shufflato, il broadcast ci ha salvati. Quello è il plan che vuoi.
Se invece avessi visto due Exchange e un SortMergeJoin, il broadcast non è scattato, di solito perché le statistiche del lato piccolo l’hanno fatto sembrare più grande di spark.sql.autoBroadcastJoinThreshold. Puoi forzarlo con F.broadcast(users) o alzare la soglia.
.explain non è sempre la verità: AQE
Ecco la realtà di produzione: con Adaptive Query Execution attivo (default in Spark 3.2+), il physical plan può essere riscritto a runtime basandosi sulle statistiche di shuffle reali. AQE può:
- Coalescere partition piccole post-shuffle (meno task per dati banali).
- Cambiare un SortMergeJoin in un BroadcastHashJoin se la dimensione post-shuffle è abbastanza piccola da broadcastare.
- Spezzare partition skewed in subpartition più piccole.
Quello che .explain() stampa è il physical plan pre-AQE, il plan come prodotto dall’optimizer statico. Il plan che gira davvero sta nel tab SQL della UI, dove il grafo degli operatori riflette il plan riscritto da AQE con i box AQEShuffleRead e gli eventuali cambi di strategia di join.
L’implicazione: .explain() serve a capire l’intento e prevedere il caso peggiore. Il tab SQL della UI serve a confermare cosa è successo. Se differiscono, AQE ha riscritto qualcosa, di solito in meglio.
Le altre modalita’
.explain("formatted") è quella che uso quando il plan supera le 30 righe circa. L’output è cosi’:
== Physical Plan ==
* HashAggregate (8)
+- Exchange (7)
+- * HashAggregate (6)
+- * Project (5)
+- * BroadcastHashJoin (4)
...
(1) Scan parquet orders
Output: [order_id#13, user_id#14, amount#15]
PushedFilters: [...]
...
(2) Filter
Input: [...]
Condition: (amount#15 > 40.0)
...
L’albero è in cima con nodi numerati; i dettagli per ogni nodo numerato sono sotto. Molto più leggibile della forma indentata sui plan grossi.
.explain("cost") aggiunge le stime di row count e size dell’optimizer accanto a ogni operatore. Leggile con scetticismo: vengono dalle statistiche di tabella, e le statistiche driftano; sono utili come segnale di prim’ordine, non come vangelo.
Cosa fartene
Quando una query è lenta, prima di mettere mano al tuning:
- Lancia
.explain(True). Percorri l’optimized plan e il physical plan. - Conta gli Exchange. Ognuno è uno shuffle. Sono più di quanti te ne aspettassi?
- Controlla gli algoritmi di join. C’è qualche
BroadcastNestedLoopJoin? QualcheSortMergeJoindove volevi un broadcast? - Controlla
PushedFiltersePartitionFilterssu ogni scan. I tuoi filtri sono stati pushati? - Lancia la query. Apri il tab SQL nella UI. Confronta il plan riscritto da AQE con quello che
.explainha stampato. Annota cosa AQE ha cambiato.
Quel rituale di cinque minuti intercetta più problemi di performance di qualsiasi quantità di “fammi tunare spark.sql.shuffle.partitions”. Il plan è la verità. Tuna il plan, non i config.
Prossima lezione: quando niente di tutto questo ti salva e l’executor muore. Memory tuning e postmortem dell’OOM.