Le due lezioni precedenti hanno preparato il terreno: una partizione è un blocco di righe assegnato a un task, e a Spark importa parecchio quante ne hai e quanto sono distribuite in modo uniforme. Questa lezione parla dei due operatori che ti permettono di cambiare quel numero di proposito: repartition e coalesce.
Sembrano intercambiabili. Non lo sono. Uno è un martello da fabbro che innesca un full shuffle e ti dà esattamente quello che hai chiesto. L’altro è un cacciavite che combina partizioni sul posto, economico, ma con un lato tagliente che può silenziosamente serializzare l’intera pipeline a monte su un solo task. La gente ricorre a coalesce(1) per “scrivere un solo file” e poi si chiede perché il job che prima finiva in due minuti adesso ne impiega quaranta.
Questa lezione spiega entrambi, quando usarli, e la trappola che becca tutti la prima volta.
repartition(N): il full shuffle
df.repartition(N) ricostruisce il DataFrame in esattamente N partizioni, distribuite in modo uniforme. Lo fa con un full shuffle: ogni riga passa per la rete (o per il disco locale in modalità single-machine), viene hashata in un bucket di destinazione, e finisce in una nuova partizione.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("RepartitionVsCoalesce")
.master("local[*]")
.getOrCreate())
events = spark.range(0, 1_000_000).select(
F.col("id").alias("event_id"),
(F.col("id") % 1000).alias("user_id"),
F.lit("data").alias("payload"),
)
print(events.rdd.getNumPartitions()) # likely 8 on a local[*] machine
big = events.repartition(50)
print(big.rdd.getNumPartitions()) # 50
Due cose da notare. Primo, repartition funziona in entrambe le direzioni: puoi passare da 8 a 50, oppure da 200 a 10. Secondo, le partizioni risultanti sono uniformi: all’incirca lo stesso numero di righe in ognuna, indipendentemente da quanto fosse sbilanciato l’input. Quell’uniformità è esattamente il motivo per cui repartition è costoso: per ottenerla, ogni riga deve essere reinstradata attraverso uno shuffle.
Guarda il piano:
big.explain()
# == Physical Plan ==
# Exchange RoundRobinPartitioning(50), REPARTITION_BY_NUM, [plan_id=...]
# +- Range (0, 1000000, ...)
Exchange è la parola di Spark per “shuffle qui”. RoundRobinPartitioning(50) significa che le righe sono distribuite su 50 bucket in modalità round-robin: nessuna chiave, semplicemente le si spande in modo uniforme. È il default per repartition(N).
repartition(N, *keys): hash-partitioning per colonna
C’è un overload che prende delle colonne:
hashed = events.repartition(50, "user_id")
hashed.explain()
# == Physical Plan ==
# Exchange hashpartitioning(user_id#3L, 50), REPARTITION_BY_NUM, ...
# +- Range ...
Adesso Spark fa l’hash di user_id per ogni riga, modulo 50, e usa quel valore come numero di partizione. Stesso lavoro totale di repartition(50), sempre un full shuffle, ma le righe sono ora raggruppate: ogni evento per user_id = 42 finisce nella stessa partizione.
Questo è utile subito prima di un join o di una window function su quella chiave. Spark farebbe comunque uno shuffle per colocare le chiavi corrispondenti; se lo fai esplicitamente prima, controlli il numero di partizioni e puoi mettere il risultato in cache e riusarlo tra più query. La lezione 36 mostrerà come la stessa idea, persistita su disco, diventa il bucketing.
Una trappola: hash-partitioning su una chiave skewed riproduce esattamente lo skew. Se user_id = 1 ha un milione di righe e l’utente mediano ne ha cento, repartition(50, "user_id") farà l’hash di tutte le righe dell’utente 1 in una singola partizione. La diagnosi dello skew della lezione 28 si applica qui: repartition(N, key) non è una soluzione allo skew, è un controllo del numero di partizioni e della distribuzione delle chiavi.
repartitionByRange(N, *keys): output ordinato
Usato meno spesso, utile quando vuoi un output approssimativamente ordinato:
ranged = events.repartitionByRange(50, "event_id")
Spark campiona i dati, sceglie 50 confini di range su event_id, e instrada le righe verso il bucket di range in cui ricadono. Il risultato sono partizioni ordinabili individualmente e globalmente in ordine approssimativo. È quello che usa df.sort() sotto il cofano quando ordini su tutto il DataFrame. Usalo quando vuoi scrivere file Parquet in cui ogni file copre un range contiguo, comodo per il partition pruning quando lo storage layer non supporta partitionBy su colonne continue.
coalesce(N): la riduzione economica
df.coalesce(N) è l’altro strumento. Non fa shuffle. Invece, fonde le partizioni esistenti sul posto:
shrunk = events.coalesce(2)
print(shrunk.rdd.getNumPartitions()) # 2
shrunk.explain()
# == Physical Plan ==
# Coalesce 2
# +- Range (0, 1000000, ...)
Nessun Exchange nel piano. Spark decide semplicemente che, invece di far girare 8 task ciascuno sulla propria partizione, ne farà girare 2 in cui ogni task legge da 4 delle partizioni originali in sequenza. I dati non si sono mossi; sono i confini ad essersi mossi.
Questo è molto più economico di repartition. È anche più debole:
coalescepuò solo ridurre il numero di partizioni. Chiederecoalesce(50)su un DataFrame con 8 partizioni ti restituisce 8 partizioni, senza errore. Per aumentare, ti serverepartition.- Le partizioni risultanti possono essere disuniformi. Se le tue 8 partizioni di input erano sbilanciate,
coalesce(2)produce 2 partizioni le cui dimensioni sono la somma delle 4 originali che si sono fuse insieme. Non avviene nessun ribilanciamento. - E quella grossa: la trappola a monte.
La trappola di coalesce(1) a monte
Questa è la trappola. df.coalesce(1) non mette solo il risultato finale in una partizione: spinge quel vincolo su, lungo il DAG, rendendo spesso l’intera pipeline a monte single-threaded.
Considera:
result = (spark.read.parquet("/data/big-input") # 200 partitions
.filter(F.col("country") == "IT")
.withColumn("year", F.year("dt"))
.groupBy("year").count()
.coalesce(1) # because we want 1 file
.write.parquet("/data/output"))
L’intuizione dice: leggi 200 partizioni in parallelo, filtra, raggruppa, poi fondi il risultato in un solo file alla fine. Quello che succede davvero: Spark vede coalesce(1) e propaga il parallelismo all’indietro più che può senza attraversare un confine di shuffle. Il filter, la withColumn e la read girano tutti con parallelismo 1. Duecento partizioni di input vengono lette in modo seriale da un singolo task. Il job che prima impiegava due minuti adesso ne impiega quaranta.
Il motivo: coalesce è una narrow transformation (lezione 21). Spark non inserisce uno shuffle per soddisfarla; invece assorbe il nuovo numero di partizioni nello stage precedente. Se fai coalesce(1), lo stage precedente adesso gira con un solo task. Se c’è uno shuffle più a monte, come la groupBy nell’esempio, la propagazione all’indietro si ferma lì, perché lo shuffle è un confine di stage. Quindi in questo caso la groupBy e quanto sta prima potrebbero ancora parallelizzare, a seconda di come Spark pianifica gli stage, ma il lavoro dopo la groupBy è single-threaded.
La soluzione è usare repartition(1) invece di coalesce(1) ogni volta che il costo di uno shuffle finale è inferiore al costo di far girare la parte a monte in modo seriale:
result = (spark.read.parquet("/data/big-input")
.filter(F.col("country") == "IT")
.withColumn("year", F.year("dt"))
.groupBy("year").count()
.repartition(1) # full shuffle, but upstream stays parallel
.write.parquet("/data/output"))
Adesso la read, il filter e la groupBy usano tutto il parallelismo che Spark vuole, e lo shuffle finale verso una singola partizione è un costo una tantum su un risultato aggregato minuscolo. Wall clock totale: di nuovo due minuti.
La regola empirica: coalesce(N) è sicuro quando N è vicino al numero di partizioni esistente e N è abbastanza grande perché lo stage a monte sia ancora contento con quel livello di parallelismo. coalesce(1) non è quasi mai quello che vuoi, a meno che la parte a monte non sia già economica.
Esempio pratico: il problema dei file piccoli
Un motivo comune per cambiare il numero di partizioni è il problema dei file piccoli al momento della scrittura. Immagina un job il cui stage finale ha 5.000 partizioni a causa di uno shuffle a monte. Se lo scrivi in Parquet, ottieni 5.000 file minuscoli: terribile per i lettori a valle, lento da listare, e un sacco di costi di S3 PUT.
Tu ne vuoi forse 50 di dimensione ragionevole. Due opzioni:
# Option A: coalesce — cheap, but watch upstream
final.coalesce(50).write.parquet("/data/out")
# Option B: repartition — more expensive, but uniform and parallelism-safe
final.repartition(50).write.parquet("/data/out")
L’opzione A salta lo shuffle. I task fondono 100 partizioni di input ciascuno (5.000 / 50). Lo stage precedente gira con 50 task invece di 5.000, di solito ancora bene, a volte un problema se la parte a monte era già CPU-bound e 50 non sono abbastanza core di parallelismo.
L’opzione B fa un full shuffle. Più costosa, ma lo stage a monte gira con il suo parallelismo originale di 5.000 partizioni, e le partizioni di output sono uniformi.
Per “ridurre da N a N/100” la risposta di solito è coalesce. Per “ridurre da N a un singolo file o due” la risposta è quasi sempre repartition. Per “ridurre da N a N/10 ma sono preoccupato che lo stage a monte sia lento”, misura entrambe. L’AQE di Spark 3.x può anche fare il coalesce delle partizioni di shuffle in automatico, il che spesso elimina la necessità di farlo a mano. La lezione 59 copre AQE.
Riferimento rapido
| Vuoi questo | Usa | Innesca lo shuffle? |
|---|---|---|
| Esattamente N partizioni uniformi | repartition(N) | Sì |
| N partizioni hashate per chiave | repartition(N, "key") | Sì |
| N partizioni approssimativamente ordinate | repartitionByRange(N, "key") | Sì |
| Ridurre il numero di partizioni a basso costo | coalesce(N) | No |
| Un solo file di output, parte a monte grossa | repartition(1) | Sì |
| Un solo file di output, parte a monte minuscola | coalesce(1) | No |
Quella riga di repartition(1) è quella che la maggior parte della gente sbaglia al primo contatto.
Falla girare sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("RepartitionDemo")
.master("local[*]")
.getOrCreate())
events = spark.range(0, 1_000_000).select(
F.col("id").alias("event_id"),
(F.col("id") % 1000).alias("user_id"),
)
print("default:", events.rdd.getNumPartitions())
# repartition up
up = events.repartition(50)
print("after repartition(50):", up.rdd.getNumPartitions())
up.explain()
# repartition by key
hashed = events.repartition(20, "user_id")
print("after repartition(20, user_id):", hashed.rdd.getNumPartitions())
hashed.explain()
# coalesce down — no shuffle
down = events.coalesce(2)
print("after coalesce(2):", down.rdd.getNumPartitions())
down.explain()
# coalesce up doesn't actually go up
no_op = events.coalesce(50)
print("after coalesce(50):", no_op.rdd.getNumPartitions()) # still 8
# Simulate the small-files write
(events.repartition(10)
.write.mode("overwrite").parquet("/tmp/repartition-out"))
Fai girare ogni .explain() e cerca la riga Exchange. repartition ce l’ha sempre. coalesce non ce l’ha mai. È la differenza, in una parola.
Prossima lezione: scritture partizionate, partitionBy su disco, il layout di directory che produce, e come Spark lo usa per saltare file in fase di lettura. Più la trappola della cardinalità che rende partitionBy("user_id") peggio di nessun partizionamento.
Riferimenti: documentazione di Apache Spark SQL (https://spark.apache.org/docs/latest/sql-data-sources.html) e post di engineering di Databricks sul tuning delle partizioni. Recuperato il 2026-05-01.