PySpark, dalle fondamenta Lezione 10 / 60

Show, count, collect: le action che ogni principiante esegue per prime

Le tre action con cui inizia ogni notebook PySpark, la differenza tra loro, e perché confonderle a scala è pericoloso.

Nella lezione precedente abbiamo portato i dati dentro Spark. Oggi li guardiamo. I tre comandi che digiterai nei primi dieci secondi di ogni notebook sono .show(), .count() e .collect(). Sembrano innocui. Due lo sono. Il terzo è responsabile di più crash per out-of-memory di qualunque altra chiamata PySpark che abbia mai visto.

Questa è anche la lezione in cui ci infiliamo dentro il modello mentale più importante di Spark: le transformation sono lazy, le action sono eager. Puoi concatenare cento .filter() e .select() e Spark non fa niente. Nel momento in cui chiami una action, ovvero .show(), .count(), .collect(), .write, Spark si sveglia, pianifica l’intera pipeline e la esegue. Sbaglia questa distinzione e passerai un pomeriggio a chiederti perché il tuo codice “veloce” è lento.

Setup

Stesso starter SparkSession. Riusiamo i dati orders/customers che abbiamo generato nella lezione 9:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("ShowCountCollect")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")
         .getOrCreate())

spark.sparkContext.setLogLevel("WARN")

orders = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("./data/orders.csv"))

Se non hai ./data/orders.csv in giro dalla scorsa lezione, torna alla lezione 9 e rilancia lo snippet di generazione dei dati. Useremo lo stesso file da sei righe per tutta la lezione.

.show(): quella che userai mille volte

.show() stampa righe in console come una tabella ASCII formattata. Tutto qui. Nessun valore di ritorno, nessun DataFrame restituito, nessun oggetto Pandas. È un side effect.

orders.show()
+-------+----------+-----+-------+----------+
|OrderId|CustomerId|Total|Country| OrderDate|
+-------+----------+-----+-------+----------+
|   1001|         1| 59.0|     NL|2026-03-05|
|   1002|         1| 29.0|     NL|2026-03-18|
|   1003|         2|149.0|     IT|2026-02-15|
|   1004|         2| 89.5|     IT|2026-03-22|
|   1005|         3|199.0|     DE|2026-03-10|
|   1006|         4|42.42|     RO|2026-03-28|
+-------+----------+-----+-------+----------+

La firma completa è show(n=20, truncate=True, vertical=False). Tre manopole, tutte utili.

n è quante righe stampare. Default 20. Passa qualunque intero positivo:

orders.show(3)         # Prime 3 righe
orders.show(1000)      # Prime 1000, ma Spark non recupera più righe di quante ne esistano

truncate controlla le colonne con stringhe lunghe. Default True, che taglia a 20 caratteri e mostra ... per il resto. Passa False per vedere tutto, oppure passa un intero per una larghezza personalizzata:

orders.show(truncate=False)   # Stringhe complete
orders.show(truncate=50)      # Taglia a 50 caratteri

Se hai mai chiamato .show() su un DataFrame pieno di blob JSON da 5KB e hai visto il tuo terminale piangere, ecco perché truncate=True è il default.

vertical ribalta il layout da colonne-come-colonne a colonne-come-righe, il che è meraviglioso per tabelle larghe:

orders.show(2, vertical=True)
-RECORD 0--------------
 OrderId    | 1001
 CustomerId | 1
 Total      | 59.0
 Country    | NL
 OrderDate  | 2026-03-05
-RECORD 1--------------
 OrderId    | 1002
 ...

Uso la modalità verticale ogni volta che un DataFrame ha più di 8-10 colonne. È la differenza tra leggere e strizzare gli occhi.

Cosa fa davvero .show() sotto il cofano: Spark pianifica la tua pipeline, ne esegue solo quanto basta per produrre n righe, poi raccoglie quelle n righe sul driver e le stampa. È una action, ma piccola. Chiedere 20 righe vuol dire che Spark potrebbe aver bisogno di leggere una sola partizione del file prima di averne abbastanza.

.count(): quella che esegue tutta la tua pipeline

.count() restituisce il numero di righe come int Python:

n = orders.count()
print(n)   # 6

Sembra innocente. Non lo è. Per contare le righe Spark deve eseguire l’intera pipeline fino a questo punto. Ogni read, ogni filter, ogni join. Non ci sono scorciatoie: non puoi sapere quante righe sopravvivono a un join senza fare il join.

Quindi questo:

big = (spark.read.parquet("./data/huge_orders.parquet")
       .filter("OrderDate >= '2026-01-01'")
       .join(customers, "CustomerId")
       .filter("Country = 'IT'"))

print(big.count())

…legge l’intero dataset Parquet, applica entrambi i filtri, esegue il join e conta ciò che resta. Su un dataset da 200GB, è un job di diversi minuti, anche se volevi solo un numero.

L’errore che fanno tutti (me compreso, ripetutamente) è spargere chiamate .count() in un notebook per “sanità”:

df = read_orders()
print("Dopo read:",   df.count())          # Job 1
df = df.filter(...)
print("Dopo filter:", df.count())          # Job 2: rilegge tutto
df = df.join(customers)
print("Dopo join:",   df.count())          # Job 3: ri-legge + ri-filtra
df.write.parquet("...")                     # Job 4: ri-legge + ri-filtra + ri-joina

Ognuna di quelle chiamate .count() esegue tutta la pipeline fino a quel punto. Da zero. Spark non mette in cache i risultati tra le action a meno che non glielo dica. Quindi un “rapido sanity check” può silenziosamente quadruplicare il runtime del tuo job.

C’è una soluzione, ovvero .cache() / .persist(), ma è un argomento da lezione 23. Per ora la disciplina è: non fare .count() in mezzo a una pipeline a meno che tu non voglia pagarlo.

.collect(): quella che non dovresti quasi mai chiamare

.collect() restituisce ogni riga del DataFrame, come lista Python di oggetti Row, sul driver:

rows = orders.collect()

print(type(rows))      # <class 'list'>
print(len(rows))       # 6
print(rows[0])         # Row(OrderId=1001, CustomerId=1, Total=59.0, ...)
print(rows[0].Total)   # 59.0
print(rows[0]["Total"])# 59.0: funzionano sia accesso ad attributo che a dict

Per un DataFrame da sei righe, va bene. È una lista di sei tuple. Forse un kilobyte di memoria.

Per un DataFrame da 100GB, è un disastro. .collect() tira ogni riga su una singola JVM, il driver, e cerca di farle entrare tutte nella memoria del driver. Il tuo driver ha, quanto, 4GB? 16GB? 64GB se sei fortunato? Andrà in OOM. Rumorosamente. Spesso dopo aver girato per dieci minuti, quindi aspetterai pure prima di prenderti il crash.

La regola che scrivo su ogni lavagna:

Mai .collect() su un DataFrame di cui non sai già che è piccolo.

Se vieni da Pandas, questa è la singola differenza comportamentale più grossa. pandas.DataFrame è sempre in memoria per definizione. I DataFrame PySpark di solito non sono in memoria e potrebbero non entrare nemmeno nella memoria di una singola macchina. .collect() è l’unica operazione che finge che Spark sia Pandas, e Spark ti punisce per questo.

Quando .collect() va davvero bene

  • Tabelle di lookup minuscole (paesi, codici valuta, mappe di configurazione).
  • Aggregazioni dove hai già ridotto il numero di righe, tipo df.groupBy("Country").count().collect() su 30 paesi.
  • Test, dove gli input sono deliberatamente piccoli.

Per tutto il resto, usa una delle alternative più sicure.

Le alternative sicure: .take(), .first(), .head()

Tre action che tirano alcune righe sul driver senza fingere che possa contenere tutto:

first_three = orders.take(3)        # lista di 3 oggetti Row
print(first_three)

one_row = orders.first()            # singolo Row, oppure None se vuoto
print(one_row)

head_three = orders.head(3)         # lista di 3 Row (alias di .take())
print(head_three)

.take(n) è .collect() al guinzaglio. Tira n righe e si ferma. Spark è abbastanza intelligente da leggere solo quante partizioni servono per ottenere n righe, quindi su un dataset da 100GB, .take(5) legge grosso modo una partizione e torna in pochi secondi.

.first() è .take(1)[0], tranne che restituisce None invece di crashare su un DataFrame vuoto. Usalo quando vuoi davvero una riga, ad esempio per dare un’occhiata a un record di esempio per lo schema.

.head(n) e .take(n) sono alias. Alcuni preferiscono .head() perché è quello che usa Pandas; altri preferiscono .take() perché è come si chiamava sempre negli RDD. Scegli uno.

Se ti ritrovi a cercare .collect(), chiediti “mi serve davvero ogni riga?”. La risposta onesta è quasi sempre no, e .take(100) ti darà più che abbastanza per fare debug.

I compagni: .printSchema(), .describe(), .summary()

Altri tre metodi “guarda i dati” che userai costantemente. Nessuno di questi è pericoloso.

.printSchema() stampa lo schema. Economico, istantaneo: Spark conosce già lo schema, nessun dato viene letto:

orders.printSchema()
root
 |-- OrderId:    integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Total:      double  (nullable = true)
 |-- Country:    string  (nullable = true)
 |-- OrderDate:  date    (nullable = true)

Stampo lo schema dopo ogni read. Becca subito gli errori di inferenza dei tipi: nel momento in cui vedi OrderDate: string sai che inferSchema non ha funzionato e ti serve uno schema esplicito.

.describe() calcola count / mean / stddev / min / max per ogni colonna numerica:

orders.describe().show()
+-------+------------------+------------------+------------------+-------+
|summary|           OrderId|        CustomerId|             Total|Country|
+-------+------------------+------------------+------------------+-------+
|  count|                 6|                 6|                 6|      6|
|   mean|            1003.5|               2.5| 94.65333333333334|   null|
| stddev|1.8708286933869707|1.2247448713915890| 65.42818134849872|   null|
|    min|              1001|                 1|              29.0|     DE|
|    max|              1006|                 4|             199.0|     RO|
+-------+------------------+------------------+------------------+-------+

Nota: restituisce un DataFrame, non una tabella stampata. Quindi ti serve comunque .show() per vederla. (Una di quelle piccole stranezze di PySpark che fanno inciampare tutti almeno una volta.)

.summary() è il fratello più potente. Di default aggiunge i quartili:

orders.summary().show()

Ottieni count, mean, stddev, min, 25%, 50%, 75%, max. Puoi anche passare percentili personalizzati:

orders.summary("count", "min", "25%", "50%", "75%", "99%", "max").show()

Per un primo passaggio esplorativo su un nuovo dataset, .summary() è la one-liner più utile in PySpark. È l’equivalente di describe(include='all', percentiles=[...]) di Pandas.

Sia .describe() che .summary() sono operazioni full-pass: leggono tutto il DataFrame. Su un CSV minuscolo è gratis; su un lake da 200GB, è un job vero. Lanciali con parsimonia su dati di dimensione produttiva, oppure lanciali prima su un .sample(0.01).

Action contro transformation: l’anteprima eager/lazy

Lo faremo per bene nella lezione 19, ma ecco subito il titolo così il resto del corso ha senso.

Una transformation descrive un nuovo DataFrame in termini di uno esistente. select, filter, withColumn, join, groupBy, orderBy, union: sono tutte transformation. Restituiscono un nuovo DataFrame senza fare alcun lavoro. Spark si limita a registrare la ricetta.

Una action chiede a Spark un risultato concreto: una tabella stampata, un numero, una lista di Row, o una scrittura su disco. Le action triggerano l’esecuzione. Spark guarda la ricetta, pianifica un’esecuzione fisica ottimale, e la lancia.

In altre parole:

# Queste quattro righe non fanno assolutamente niente, per ora.
filtered  = orders.filter("Country = 'IT'")
projected = filtered.select("OrderId", "Total")
ordered   = projected.orderBy("Total")
big_only  = ordered.filter("Total > 100")

# Questa riga esegue tutto quanto sopra, più la show.
big_only.show()

Spark ottimizza l’intera catena prima di eseguire. Potrebbe spingere i filtri dentro la scansione del file (“non leggere nemmeno le righe non-IT”), riordinare le operazioni, sfoltire colonne. Il piano di query che ottieni è spesso radicalmente diverso da quello che hai scritto.

Questo è anche il motivo per cui un .show() a metà di un notebook può sembrare sospettosamente veloce: Spark ha calcolato solo righe a sufficienza per riempire il display, mentre .count() sullo stesso DataFrame impiega minuti. Hanno fatto quantità di lavoro diverse.

Uno script di tour

Mettendo insieme la lezione, ecco uno script che esercita ogni action che abbiamo coperto, più o meno nell’ordine in cui le useresti su un dataset fresco:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("ActionsTour")
         .master("local[*]")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

orders = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("./data/orders.csv"))

# 1. Che aspetto ha?
orders.printSchema()
orders.show(5)

# 2. Statistiche rapide
orders.summary("count", "min", "25%", "50%", "75%", "max").show()

# 3. Conteggio righe economico (dataset piccolo, va bene)
print("Total rows:", orders.count())

# 4. Sbircia dati filtrati senza collect-are tutto
italian = orders.filter(col("Country") == "IT")
italian.show()                             # sicuro
print("IT rows:", italian.count())         # sicuro su dati piccoli

sample_rows = italian.take(2)              # sempre sicuro
print(sample_rows)

# 5. Collect: solo perché sappiamo che è minuscolo
country_totals = (orders.groupBy("Country")
                        .sum("Total")
                        .collect())
for row in country_totals:
    print(f"{row['Country']}: {row['sum(Total)']:.2f}")

spark.stop()

Sei righe di input. Sei righe di output significativo. Nessun OOM. Quello è il workflow.

Nella prossima lezione andiamo nella direzione opposta: scrivere i dati indietro, save mode, write partizionate, e il problema del numero di file che prima o poi morde ogni team.

Cerca