L’ultima lezione ha stabilito la regola: le transformation descrivono il lavoro, le action lo fanno partire. Questa lezione è il catalogo. Vale la pena memorizzarlo, perché ogni volta che debugghi un job Spark lento ti farai la stessa domanda: quale riga in questo script di 80 righe è stata l’action che ha innescato tutto questo?
A volte è ovvio: il .write.parquet(...) in fondo. A volte non lo è: qualcuno ha messo un .count() sei righe sopra per “logging” e ora la tua pipeline gira due volte.
La definizione, più precisamente
Una transformation è un metodo che restituisce un nuovo DataFrame e non aggiunge niente al mondo a parte un nodo nel piano logico. È una pura chiamata di funzione contro una descrizione di DataFrame. Non si muovono dati. Non si accendono executor. Il driver ottiene un nuovo riferimento a un DataFrame e tu prosegui.
Un’action è un metodo che restituisce qualcosa di diverso da un DataFrame, oppure causa un side effect (tipicamente una scrittura su disco o una callback eseguita contro le righe). L’action è il momento in cui Spark dice “ok, è ora di fare davvero tutto quel lavoro che hai descritto.” Il driver compila il piano logico in un piano fisico, lo spezza in stage, manda task agli executor, raccoglie i risultati e li restituisce al tuo codice.
La regola empirica “questa è un’action?” che è giusta nel 95% dei casi:
Questo metodo restituisce qualcosa di diverso da un DataFrame? Se sì, è un’action.
L’altro 5% sono casi limite scomodi (cache, persist, checkpoint, a volte createOrReplaceTempView) che gestiremo separatamente.
Catalogo: transformation comuni
Ogni metodo qui restituisce un DataFrame e non innesca niente. Elencati grosso modo per frequenza d’uso:
select(*cols)- proietta colonne. Probabilmente la singola transformation più usata.selectExpr(*expr_strings)- lo stesso, con espressioni in stringa SQL invece che oggetticol().where(cond)/filter(cond)- mantieni le righe in cui la condizione è vera. Stesso operatore, due nomi.withColumn(name, expr)- aggiungi o sostituisci una colonna.withColumnRenamed(old, new)- rinomina una colonna. Visto la lezione scorsa.drop(*cols)- rimuovi colonne.join(other, on, how)- combina due DataFrame su colonne chiave.groupBy(*cols).agg(...)- raggruppa righe e aggrega. Nota:groupByda solo restituisce unGroupedData, non unDataFrame; è l’.agg(...)che ti riporta in territorio DataFrame. Le due chiamate insieme sono una singola transformation.orderBy(*cols)/sort(*cols)- ordina le righe. Stesso operatore, due nomi.distinct()- rimuove righe duplicate su tutte le colonne.dropDuplicates(*cols)- rimuove duplicati considerando solo le colonne elencate (o tutte se non viene data nessuna).union(other)/unionAll(other)- aggiunge righe da un altro DataFrame, per posizione.unionByName(other, allowMissingColumns=False)- aggiunge righe per nome di colonna. Più sicuro diunionquando gli schema potrebbero differire.intersect(other)- tieni solo le righe presenti in entrambi i DataFrame.exceptAll(other)/subtract(other)- rimuovi righe presenti in un altro DataFrame.repartition(n, *cols)- ridistribuisce i dati innpartition, opzionalmente con hash sulle colonne.coalesce(n)- riduce il numero di partition senza un full shuffle.sample(fraction, withReplacement=False, seed=None)- subset casuale di righe.limit(n)- tieni al massimo n righe. Lazy! Non restituisce righe; restituisce un DataFrame il cui piano finisce con unLIMIT n. Per tirare fuori le righe ti serve comunque un’action.na.fill(...),na.drop(...),na.replace(...)- gestione dei null.withWatermark(col, threshold)- per lo streaming.
Ognuna di queste restituisce un DataFrame. Nessuna esegue niente da sola.
Catalogo: action comuni
Ognuna di queste restituisce qualcosa di diverso da un DataFrame, o scrive su disco:
show(n=20, truncate=True)- stampa righe su stdout. RestituisceNone.count()- numero di righe. Restituisce unint.collect()- tira tutte le righe sul driver come una lista di oggettiRow. Restituiscelist[Row]. Pericoloso: porta l’intero DataFrame nella memoria del driver.take(n)- restituisce le prime n righe come lista. Restituiscelist[Row].first()- restituisce la prima riga. Restituisce unRow.head(n=1)- uguale atakequando n > 1, uguale afirstquando n == 1.tail(n)- ultime n righe. Restituiscelist[Row]. Disponibile da Spark 3+.toPandas()- converte in un DataFrame Pandas. Comecollect(), materializza tutto sul driver.toLocalIterator()- itera sulle righe nel driver, una partition alla volta. Più lento dicollect()ma con memoria limitata.foreach(func)/foreachPartition(func)- applica una funzione con side effect a ogni riga o partition. Si usa per scrivere su sink esterni non coperti dawrite.*.write.format(...).save(...)- e le scorciatoie:write.parquet(...),write.csv(...),write.json(...),write.orc(...),write.saveAsTable(...),write.insertInto(...). La famiglia DataFrameWriter. Ognuna di queste è un’action: nel momento in cui chiami.save(),.parquet(), ecc., Spark esegue la pipeline e scrive i risultati.describe()esummary()- calcolano statistiche di riepilogo. Entrambe restituiscono DataFrame, ma ogni chiamata internamente innesca un passaggio completo per calcolare le statistiche, quindi si comportano come action in termini di costo. (Tecnicamente transformation che contengono action nascoste nella loro costruzione. Un caso limite pedante.)
Lo schema: se ti da un numero, una lista, una stampa su stdout o un file su disco, è un’action.
Leggere una pipeline vera
Ecco una pipeline. Trova l’action.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
spark = (SparkSession.builder
.appName("OrdersETL")
.master("local[*]")
.getOrCreate())
orders = spark.read.option("header", True).csv("./data/orders.csv")
customers = spark.read.option("header", True).csv("./data/customers.csv")
joined = orders.join(customers, on="CustomerId", how="left")
filtered = joined.where(col("Country") == "IT")
withVat = filtered.withColumn(
"TotalWithVat",
col("Total").cast("double") * 1.22
)
agg = (withVat
.groupBy("CustomerId", "Country")
.agg(F.sum("TotalWithVat").alias("LifetimeValue"),
F.count("*").alias("Orders")))
ranked = agg.orderBy(F.desc("LifetimeValue"))
(ranked
.write
.mode("overwrite")
.parquet("./out/customer_ltv"))
Ogni riga dalle chiamate read fino a ranked = ... è una transformation. L’intera catena è solo costruzione di un piano. Le chiamate read non aprono nemmeno i file; registrano solo i metadati.
Il write.parquet(...) è l’action. È la riga in cui Spark va davvero su disco, legge entrambi i CSV, esegue il join, il filter, il withColumn, il groupBy, l’aggregazione e il sort, e scrive l’output Parquet. Un solo job, innescato da una sola action.
Se aprissi la Spark UI subito dopo aver eseguito questo script, vedresti esattamente un job elencato. La pagina del job mostrerebbe diversi stage (uno per shuffle boundary, ne parliamo nella prossima lezione) e la riga del tuo codice che lo ha sottomesso: il write.parquet.
Quando ci sono diverse action, conta i job
df = (spark.read.parquet("./big_table")
.where(col("Country") == "IT")
.withColumn("Year", F.year("OrderDate")))
print("Row count:", df.count()) # job 1
df.show(5) # job 2
df.write.parquet("./out/it_orders") # job 3
Tre action, tre job. La lettura Parquet avviene tre volte se non fai prima .cache() su df. Ogni job ricammina il piano da capo. Questo è il problema “ho chiamato .count() e ho aspettato tre minuti” della lezione scorsa, moltiplicato per tre.
Se stai scrivendo codice di produzione, conta le action nella tua pipeline. Ognuna è un’esecuzione completa. Due action sullo stesso DataFrame intermedio sono due pipeline complete se non cachi.
La categoria intermedia scomoda
Alcuni metodi non si incastrano bene. Vale la pena conoscerli perché compaiono nel codice vero e confondono la gente.
cache() e persist()
Queste sono tecnicamente transformation. Restituiscono un DataFrame e non eseguono niente da sole. Quello che fanno è impostare un flag sul DataFrame: “la prossima volta che un’action gira contro questo DataFrame, dopo aver calcolato, tieni il risultato in memoria (o memoria + disco, per persist) così le action successive possono riusarlo.”
expensive = (spark.read.parquet("./big_table")
.where(col("Country") == "IT")
.join(other_big_table, "CustomerId"))
expensive.cache() # transformation: marks for caching, runs nothing
expensive.count() # action: runs the pipeline, materializes the cache
expensive.show() # action: reads from cache, fast
expensive.write... # action: reads from cache, fast
La chiamata .cache() da sola non fa nessun lavoro. La prossima action esegue la pipeline e memorizza il risultato. Le action successive saltano la ricomputazione.
A volte vedrai codice che fa df.cache().count(): il count() è un’action deliberata di “scaldare la cache” che forza la materializzazione. Idioma comune in pipeline ottimizzate per le performance. Copriremo l’intero playbook del caching nella lezione 23.
checkpoint()
Taglia il lineage salvando fisicamente il DataFrame su disco e facendo partire un nuovo lineage da quello snapshot. Restituisce un DataFrame. Tecnicamente una transformation, ma in pratica innesca un job per scrivere il checkpoint. Trattalo come un’action ai fini del costo.
createOrReplaceTempView(name) e createGlobalTempView(name)
Queste restituiscono None, quindi per la regola empirica sembrano action. Non lo sono. Registrano il piano del DataFrame sotto un nome nel catalogo SQL, in modo che tu possa referenziarlo da spark.sql(...). Non viene eseguito niente. La view è solo un altro nodo nel grafo della query.
printSchema(), columns, dtypes, schema
Tutti metadati. Nessuno di loro tocca i dati. Gratis. (Non farti ingannare dal fatto che printSchema() è una chiamata di metodo con le parentesi: legge dal piano logico, non dai dati.)
explain()
Stampa il piano della query senza eseguirlo. Gratis. Usalo a volontà.
Come la Spark UI racconta la storia
Apri la Spark UI (default http://localhost:4040 per una sessione locale). Il tab Jobs elenca ogni action eseguita, con la riga di codice che l’ha innescata e gli stage in cui si è spezzata. La UI è il tuo strumento di debug primario da qui in avanti. Due abitudini che vale la pena costruire:
- Dopo aver eseguito una pipeline, dai un’occhiata al tab Jobs. Se il numero di job è superiore al numero di action che intendevi, hai un’action nascosta da qualche parte: cerca chiamate
.count()o.show()vaganti. - Clicca dentro lo stage lento. Se un task è 10 volte più lungo degli altri, hai skew. Se la dimensione di input è 100 GB ma ti aspettavi 10 MB, il tuo filtro non è stato spinto giù.
Alcune conseguenze pratiche
Il posizionamento delle action conta per la memoria. collect() e toPandas() tirano tutto sul driver. Su un DataFrame da 50 GB questo manda in OOM il tuo driver e fa crashare la sessione. Usa take(n) per sbirciare, show(n) per ispezionare, e collect() solo quando hai già filtrato a piccoli set di risultato.
Le action ripetute ripetono tutto il lavoro. Tre action sullo stesso DataFrame intermedio sono tre pipeline complete se non cachi.
limit(n).show() è economico. limit è una transformation; show esegue l’action con il limit nel piano. Spark non leggerà l’intero input: leggerà partition finché non ha n righe e si ferma. Buono per prototipare contro tabelle grandi.
count() dopo un filtro non è gratis. Cammina comunque l’intero file di input. Se hai bisogno solo di sapere “ci sono righe?”, df.limit(1).take(1) è più economico.
I due failure mode che ho visto più spesso: il “logger che costa 300 euro al giorno” (qualcuno aggiunge print(f"rows: {df.count()}") a metà di una pipeline da 2 TB per visibilità, raddoppiandone il costo), e il “i test passano, la prod fallisce” (un .collect() funziona benissimo su 6 righe in memoria nei test e va in OOM sul driver con 80 GB in produzione). Entrambi sono problemi di conoscenza del catalogo. Se sai che .count() è un’action e che .collect() materializza sul driver, non scrivi nessuno dei due.
Esegui questo sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
spark = (SparkSession.builder.appName("Catalog").master("local[*]").getOrCreate())
orders = spark.createDataFrame(
[(1, "IT", 59.0), (2, "IT", 29.0), (3, "NL", 149.0),
(4, "IT", 89.5), (5, "DE", 14.0), (6, "IT", 42.42)],
"OrderId INT, Country STRING, Total DOUBLE",
)
# Pure transformation chain — instant
chain = (orders
.where(col("Country") == "IT")
.withColumn("WithVat", col("Total") * 1.22)
.groupBy("Country")
.agg(F.sum("WithVat").alias("Total"),
F.count("*").alias("N")))
# 1. Inspect the plan without running anything
chain.explain()
# 2. Action: show() runs the whole chain
chain.show()
# 3. Multiple actions => repeated work
print("count:", chain.count()) # full re-execution
print("count:", chain.count()) # full re-execution again
# 4. Cache, then multiple actions => one execution + cache reads
chain.cache()
chain.count() # materializes the cache
chain.count() # cache hit, fast
chain.show() # cache hit, fast
# 5. Open http://localhost:4040 and look at the Jobs tab.
# You should see one job per action and only one full pipeline execution
# after the cache warmup.
Apri la UI accanto a questo e guarda ogni action aggiungere un job alla lista. Poi sperimenta: commenta la riga .cache() e riesegui; conta i job di nuovo. Il numero di job è esattamente il numero di action, ogni volta.
Prossima lezione: il DAG. Cosa costruisce davvero Spark quando un’action innesca, perché un job ha più stage, dove sono i confini, e come il lineage graph sopravvive ai fallimenti degli executor.