La prima volta che mi sono seduto con un collega che stava migrando uno script Pandas a PySpark, è successo esattamente questo. Ha scritto una catena (read, filter, select, group, aggregate), ha premuto Shift+Enter, e la cella è tornata in circa 12 millisecondi. Mi ha guardato. “Ha funzionato?” Poi ha digitato df.show() e ha aspettato 90 secondi mentre un job che pensava fosse già girato si decideva finalmente a girare davvero.
Ecco la lazy evaluation in un paragrafo. Benvenuto nel modulo 4.
La cosa che confonde tutti quelli che vengono da Pandas
In Pandas, ogni riga di codice fa lavoro. df = df[df.country == "IT"] legge un pezzo di memoria, lo scansiona e produce un nuovo DataFrame in RAM, subito. La riga successiva opera sul risultato. Pandas è eager: ogni chiamata calcola immediatamente.
PySpark è lazy. Le transformation non calcolano niente. Costruiscono un piano.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
spark = (SparkSession.builder
.appName("LazyDemo")
.master("local[*]")
.getOrCreate())
orders = spark.createDataFrame(
[
(1, "IT", 59.0, "2026-03-05"),
(2, "IT", 29.0, "2026-03-18"),
(3, "NL", 149.0, "2026-02-15"),
(4, "IT", 89.5, "2026-03-22"),
(5, "DE", 14.0, "2026-03-10"),
(6, "IT", 42.42, "2026-03-26"),
],
"OrderId INT, Country STRING, Total DOUBLE, OrderDate STRING",
)
# Build a chain of transformations
result = (orders
.where(col("Country") == "IT")
.withColumn("Total", col("Total") * 1.22) # add VAT
.groupBy("Country")
.agg(F.sum("Total").alias("TotalWithVat"),
F.count("*").alias("OrderCount")))
print(result)
# DataFrame[Country: string, TotalWithVat: double, OrderCount: bigint]
Quel print(result) è tornato in microsecondi. Non è stato filtrato niente. Non è stato moltiplicato niente. Non è stato raggruppato niente. Spark non ha nemmeno aperto i dati, e qui orders è in memoria, quindi non c’è nemmeno niente da aprire. Tutto ciò che result è, a questo punto, è una descrizione del lavoro da fare.
Se non l’hai mai visto, sembra rotto. Non lo è. È tutto il punto.
Perché lazy: l’optimizer ha bisogno dell’intero piano
La ragione è semplice e vale la pena prenderla sul serio: un optimizer che vede un’operazione alla volta può prendere solo decisioni locali. Un optimizer che vede l’intera pipeline prima che ne giri un pezzo può riorganizzare le cose.
Esempi concreti di cosa fa Spark una volta che riesce a vedere il piano completo:
- Filter pushdown. Hai scritto
read.parquet(...).select("a","b","c").where(col("country")=="IT"). Spark si accorge del filtro e lo spinge giù dentro la lettura Parquet, in modo che solo i row group con country IT vengano caricati da disco. Se la tua catena fosse stata eager, la lettura sarebbe già avvenuta prima che il filtro comparisse. - Column pruning. Leggi 80 colonne da Parquet ma ne usi solo 4 a valle. Spark carica solo quelle 4. Stesso trucco: l’optimizer ha bisogno di vedere l’uso a valle prima di decidere cosa leggere.
- Predicate combination. Hai scritto
.where(col("a") > 10).where(col("b") < 100)come due chiamate. Spark le fonde in un unico passaggio di filtro. - Scelta della join strategy. Spark può scegliere tra broadcast join, shuffle hash join e sort-merge join. La scelta giusta dipende dalle dimensioni delle tabelle e dalle chiavi di join, che può conoscere solo una volta che il piano completo è stato assemblato.
- Operation fusion. Più chiamate
withColumneselectvengono compilate in un unico passaggio sui dati invece di N passaggi.
Tutto questo lo ottieni gratis, ma solo perché niente esegue finché un’action non lo forza. Una valutazione eager dovrebbe impegnarsi su una strategia a ogni riga; la lazy evaluation può aspettare il quadro completo e scegliere la migliore.
È la stessa idea di un query planner SQL. Tu scrivi una SELECT con una WHERE, e il database decide se usare un indice, fare uno scan dell’heap o fare qualcosa di furbo con le statistiche. Le transformation di PySpark sono l’equivalente sui dataframe: tu descrivi il cosa, il motore sceglie il come.
Le transformation descrivono; le action eseguono
Il modello mentale da interiorizzare:
- Una transformation restituisce un nuovo DataFrame e aggiunge un nodo al piano logico. Nessuna computazione. Esempi:
select,where,withColumn,join,groupBy.agg,orderBy,distinct,union. - Una action restituisce un risultato (un valore Python, una scrittura su disco, un side effect) e fa partire l’esecuzione di tutto ciò nel piano da cui dipende. Esempi:
show,count,collect,take,first,toPandas,write.parquet,write.csv,foreach.
La lezione 20 contiene il catalogo completo. Per ora il test è: “questo metodo restituisce un DataFrame?” Se sì, quasi certamente è una transformation. Se restituisce qualsiasi altra cosa (un numero, una lista di oggetti Row, None dopo aver scritto su disco), è un’action.
# Transformations: instant. The chain is just being described.
print("Building plan...")
plan = (orders
.where(col("Country") == "IT")
.withColumn("Total", col("Total") * 1.22)
.groupBy("Country")
.agg(F.sum("Total").alias("TotalWithVat")))
print("Plan built. Type:", type(plan).__name__)
# DataFrame
# Action: this is when work happens
print("Calling show()...")
plan.show()
# Now the executors actually compute. The first .show() on a non-trivial
# pipeline is the moment your laptop's fan spins up.
Se metti un time.time() intorno alla catena di transformation, vedrai numeri in microsecondi. Intorno allo show(), vedrai qualunque cosa costi la computazione vera: millisecondi per piccoli dati in memoria, minuti o ore per file grossi.
La sorpresa “ho chiamato .count() e ho aspettato tre minuti”
Questa è l’esperienza canonica del nuovo arrivato. Hai costruito una catena di 12 step che legge una tabella Parquet da 200 GB, la fa join con due tabelle di riferimento, filtra, aggrega, e vuoi sbirciare il risultato. Digiti result.count(). Aspetti. E aspetti. E il bollitore fischia, e la riunione comincia, e tu stai ancora aspettando.
Quello che è successo è esattamente ciò che doveva succedere: count() è la prima action della catena. Fino a quel momento, nessuno dei 12 step era stato eseguito. Erano tutti lì, nel piano logico. count() è ciò che ha fatto andare davvero il motore ad aprire la tabella da 200 GB, a fare i join e a contare le righe.
Non è un bug. Il bug è la tua aspettativa che count() sia economica. In Pandas lo è: i dati sono già in RAM, contare le righe è una lookup di lunghezza. In Spark, count() esegue di nuovo l’intera pipeline che ha prodotto il DataFrame.
L’anti-pattern del notebook
Adesso, la versione peggiore di questo. Stai prototipando. Costruisci la catena step by step, sbirciando a ogni stadio:
# Build the chain piece by piece, with .show() at each step (DON'T)
df1 = orders.where(col("Country") == "IT")
df1.show() # action #1: runs the read + filter
df2 = df1.withColumn("Total", col("Total") * 1.22)
df2.show() # action #2: re-runs read + filter + withColumn
df3 = df2.groupBy("Country").agg(F.sum("Total").alias("TotalWithVat"))
df3.show() # action #3: re-runs everything from scratch
print(df3.count()) # action #4: re-runs everything AGAIN
print(df3.count()) # action #5: re-runs everything ONE MORE TIME
Cinque action. Cinque volte è girata l’intera pipeline. Spark non mette in cache i risultati tra le action di default: ogni action parte dalla sorgente originale e ricammina l’intero piano.
Su 6 righe in memoria questo non importa. Su una pipeline reale che legge da S3 o HDFS, hai appena addebitato al tuo team 5 volte il compute e 5 volte l’IO di rete per guardare gli stessi numeri da cinque angolazioni diverse.
La soluzione è .cache() (o .persist()), che vedremo nella lezione 23. La versione breve: .cache() è un marcatore che dice “dopo la prossima action, tieni il risultato di questo DataFrame in memoria così le action successive possono riusarlo.” Applicalo a intermedi costosi che hai intenzione di sbirciare più volte durante lo sviluppo.
La versione ancora più breve: mentre stai imparando, non spargere chiamate .show() e .count() tra ogni riga. Costruisci l’intera catena. Esegui un’action alla fine. Se vuoi visibilità intermedia, aggiungi un .cache() prima delle chiamate di visibilità.
Vedilo da te
Tre esperimenti concreti che vale la pena fare una volta.
Esperimento 1: una catena senza action stampa istantaneamente.
import time
t0 = time.time()
big_chain = (orders
.where(col("Country") == "IT")
.withColumn("Total", col("Total") * 1.22)
.withColumn("VatRate", F.lit(0.22))
.withColumn("PreVat", col("Total") / (1 + col("VatRate")))
.groupBy("Country")
.agg(F.sum("Total").alias("TotalWithVat"),
F.sum("PreVat").alias("PreVatTotal"),
F.count("*").alias("OrderCount"))
.orderBy(F.desc("TotalWithVat")))
print(f"Building chain took {(time.time() - t0)*1000:.2f} ms")
# Building chain took 4.13 ms
Otto transformation. Sotto i 10ms. Non è stato calcolato niente.
Esperimento 2: l’action fa partire l’intera catena.
t0 = time.time()
big_chain.show()
print(f"show() took {(time.time() - t0)*1000:.2f} ms")
# show() took 850 ms (or whatever — depends on your machine)
Quegli 850 ms sono il read, il filter, le quattro operazioni withColumn, il group, l’aggregate, l’order e lo shuffle di rete per il groupBy. Tutto impacchettato in un solo job perché nessuna action è venuta prima.
Esperimento 3: .explain() ti mostra il piano senza eseguirlo.
big_chain.explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- Sort [TotalWithVat#... DESC NULLS LAST], true, 0
# +- Exchange rangepartitioning(TotalWithVat#... DESC NULLS LAST, 200), ...
# +- HashAggregate(keys=[Country#...], functions=[sum(Total#...), ...])
# +- Exchange hashpartitioning(Country#..., 200), ENSURE_REQUIREMENTS, ...
# +- HashAggregate(keys=[Country#...], functions=[partial_sum(...), ...])
# +- Project [Country#..., (Total#... * 1.22) AS Total#..., ...]
# +- Filter (isnotnull(Country#...) AND (Country#... = IT))
# +- Scan ExistingRDD[OrderId#..., Country#..., Total#..., ...]
Leggilo dal basso verso l’alto. Scansiona la sorgente, filtra alle righe IT, proietta le nuove colonne, aggregato parziale per partition, shuffle (Exchange), aggregato finale, shuffle di nuovo per ordinare, sort. Quello è l’intero physical plan, e non è mai stato eseguito. explain() è il modo più veloce per sviluppare intuizione su cosa Spark sta per fare, e lo rivedrai nella prossima lezione quando copriremo il DAG.
Cosa cambia in come scrivi codice
Tre abitudini che vengono con l’interiorizzare la lazy evaluation:
- Costruisci catene lunghe; resisti all’impulso di sbirciare. Ogni sbirciata è una rilettura completa. Conserva l’ispezione per la fine, oppure cacha una volta e ispeziona a basso costo dopo.
- Fidati che i filtri vengano spinti giù. Non devi micro-ottimizzare l’ordine delle tue clausole
where. L’optimizer le sposta il più vicino possibile al read. .explain()presto quando qualcosa è lento. Se un job sta impiegando più del previsto, il physical plan ti dice il perché molto più velocemente che tirare a indovinare. Cerca nodiExchangeinattesi (shuffle), filter pushdown mancanti, o full table scan dove ti aspettavi partition pruning.
Questo significa anche che hai una nuova piccola trappola di debug: il codice può sembrare funzionante quando non lo è, perché gli errori non emergono finché non arriva l’action. Un typo nel nome di una colonna dentro un withColumn non dara’ errore sulla riga in cui l’hai scritto: dara’ errore quando finalmente chiami .show(), magari centinaia di righe dopo. Leggi attentamente lo stack trace quando succede; l’AnalysisException nominera’ la colonna mancante e la riga vera è quella che l’ha referenziata.
Prossima lezione: il catalogo completo di transformation e action, inclusa la categoria intermedia scomoda (cache, persist) che sembrano action ma tecnicamente sono transformation. Dopo, scaveremo nel DAG che Spark costruisce dal tuo piano e infine, la lezione su cui pivota il resto del corso, la differenza tra narrow transformation (economiche) e wide (costose). Quella distinzione è il singolo concetto più importante di Spark ed è a tre lezioni di distanza.