Il tuo job Spark impiega 3 ore. Raddoppi la dimensione del cluster. Ora impiega 2 ore e 50 minuti. Cosa è andato storto? Quasi sicuramente, il partitioning.
Cosa sono davvero le partizioni
Un DataFrame Spark non è un’unica grande tabella. È suddiviso in partizioni — pezzi di dati distribuiti tra gli executor. Ogni partizione viene elaborata in modo indipendente e in parallelo. Se hai 200 partizioni e 20 executor, ogni executor gestisce circa 10 partizioni, una alla volta.
L’intera promessa di Spark — “elabora i dati in parallelo” — si riduce a quanto bene i tuoi dati sono distribuiti tra le partizioni. Se una partizione ha 10 milioni di righe e le altre ne hanno 1.000 ciascuna, complimenti: un executor fa tutto il lavoro mentre 19 stanno con le mani in mano. Questo si chiama data skew, ed è il problema di performance più comune in Spark.
Il default è quasi sempre sbagliato
Il conteggio delle partizioni predefinito di Spark viene da una di queste fonti:
spark.sql.shuffle.partitions— di default 200 per qualsiasi operazione che fa shuffle (join, group by, ecc.)- Il numero di file o blocchi nei dati sorgente (per le letture)
200 partizioni era un default ragionevole nel 2014, quando i cluster avevano 50 core. Oggi, con migliaia di core e terabyte di dati, 200 è quasi sempre sbagliato — di solito troppo poche, il che genera partizioni troppo grandi e causa pressione sulla memoria, o occasionalmente troppe, il che genera partizioni minuscole con alto overhead di scheduling.
Come controllare le partizioni
df.rdd.getNumPartitions()
Più utile — controllare la dimensione delle partizioni:
from pyspark.sql.functions import spark_partition_id, count
df.groupBy(spark_partition_id().alias("partition_id")) \
.agg(count("*").alias("row_count")) \
.orderBy("row_count", ascending=False) \
.show(10)
Se la partizione più grande è 100× più grande della più piccola, hai un problema di skew. Se ogni partizione ha 50 righe, hai troppe partizioni.
repartition() vs coalesce()
Questi sono i tuoi due strumenti per sistemare il numero di partizioni:
repartition(n) — shuffle completo. Ogni riga viene riassegnata a una delle n nuove partizioni. Costoso (muove dati attraverso la rete) ma produce partizioni di dimensioni uguali.
df = df.repartition(500) # 500 partizioni approssimativamente uguali
coalesce(n) — unisce le partizioni senza un shuffle completo. Può solo ridurre il conteggio, mai aumentarlo. Economico ma può creare partizioni sbilanciate.
df = df.coalesce(50) # accorpa da 200 a 50
La regola empirica
- Stai andando su con il numero di partizioni? →
repartition() - Stai andando giù (es. prima di scrivere i file di output)? →
coalesce() - Dati sbilanciati dopo un join? →
repartition()sulla colonna sbilanciata, o usa il salting
Linee guida sulle dimensioni delle partizioni
Non esiste una risposta universalmente corretta, ma queste regole funzionano nella maggior parte dei job reali:
- Punta a 128–256 MB per partizione. Questo bilancia parallelismo e overhead.
- Punta a 2–4 partizioni per core. Con 100 core, mira a 200–400 partizioni.
- Dopo una trasformazione wide (join, groupBy), controlla le partizioni. Il default di 200 potrebbe non essere più corretto.
- Prima di scrivere su disco, fai coalesce. Scrivere 200 partizioni in Parquet ti dà 200 file minuscoli. Chi leggerà quei dati dopo di te non sarà contento. Fai coalesce fino a un numero che produce file da 128–512 MB.
Partizionamento per colonna (per le scritture)
df.write.partitionBy("year", "month").parquet("s3://bucket/data/")
Questo crea una struttura di directory tipo year=2025/month=03/part-00001.parquet. Le query downstream che filtrano per anno e mese possono saltare le cartelle irrilevanti — il partition pruning. Il miglioramento è spesso di 10–100× su dataset grandi.
Scegli colonne di partizionamento che:
- Vengono usate frequentemente nelle clausole WHERE
- Hanno cardinalità bassa o media (anno, paese, stato — non user_id)
- Non creano milioni di directory minuscole
La via di fuga per lo skew: il salting
Quando una chiave di join domina (ad esempio, l’80% degli ordini viene da un singolo cliente), i dati di quella chiave finiscono tutti in una partizione e tutto si blocca. La soluzione: aggiungere una colonna random di “salt”, joinare su salt + chiave, e unire i risultati.
from pyspark.sql.functions import lit, rand, floor, col, concat
salt_buckets = 10
# Aggiungi il salt alla tabella sbilanciata (grande)
large = large.withColumn("salt", floor(rand() * salt_buckets).cast("int"))
# Esplodi la tabella piccola per matchare tutti i valori di salt
from pyspark.sql.functions import explode, array
small = small.withColumn("salt", explode(array([lit(i) for i in range(salt_buckets)])))
# Joina su chiave originale + salt
result = large.join(small, ["join_key", "salt"]).drop("salt")
Brutto? Sì. Efficace? Enormemente. Se ti ritrovi spesso a combattere con lo skew, questo pattern vale la pena di memorizzarlo.
La versione in una frase
La maggior parte dei problemi di performance in Spark sono problemi di partitioning. Prima di aggiungere hardware, controlla il numero e le dimensioni delle partizioni — la soluzione di solito è una singola riga di codice.