PySpark, dalle fondamenta Lezione 28 / 60

Il problema dello skew: quando una chiave ha 100 volte le righe

Come lo skew dei dati rallenta i job anche quando il lavoro totale è poco, come individuarlo nella Spark UI, e che aspetto hanno i sintomi in produzione.

Uno stage di Spark è veloce solo quanto il suo task più lento. Quella frase è tutta la lezione. Rileggila, falla tua, e la maggior parte del resto sono conseguenze.

In uno stage Spark sano, ogni task processa all’incirca la stessa quantità di dati. 200 partition, ognuna che tiene magari 50 MB di righe, ognuna che finisce in 12 secondi. L’intero stage finisce in forse 14 secondi, perché i task girano in parallelo e il più lento è a malapena più lento della mediana.

In uno stage skewed, 199 task finiscono in 12 secondi e un task, lo sfortunato, gira per 30 minuti. L’intero stage ci mette 30 minuti. Aggiungere più core non aiuta. Aggiungere più memoria non aiuta. Il collo di bottiglia è un task, su un executor, che processa una partition oscenamente grossa. Quello è lo skew dei dati.

Questa lezione parla di riconoscere lo skew, trovarlo nella Spark UI, e capire perché è un problema in primo luogo. Il rimedio, il salting, ha la sua lezione dedicata, la prossima.

Come succede lo skew

Spark partiziona i dati dopo uno shuffle facendo l’hash della chiave di join o di group-by:

partition_for(row) = hash(row.key) % num_partitions

Se le tue chiavi sono distribuite uniformemente, le righe si spalmano più o meno equamente sulle partition. Se non lo sono, e nei dati veri quasi mai lo sono, alcune partition prendono molte più righe di altre.

Tre scenari comuni dal codice di produzione:

Power user. Una tabella di eventi con chiave user_id. L’utente in cima ha 100 milioni di eventi. L’utente mediano ne ha 12. Dopo uno shuffle di groupBy("user_id"), ogni evento dell’utente in cima atterra nella stessa partition. Quella partition fa 100 MB, le altre 1 KB.

Concentrazione geografica. Transazioni con chiave country. Il 60% del traffico viene dagli US, il 30% da una lunga coda, il 10% da “tutto il resto”. Dopo il group-by per paese, la partition US è 6 volte la dimensione di tutte le altre messe insieme.

Chiavi nulle o vuote. Una colonna dove la maggior parte delle righe ha un valore vero ma il 30% ha null o "". Tutti i null fanno hash sullo stesso bucket e si accumulano. Questo è il più comune e il più insidioso perché nessuno lo prevede: si fa spallucce e si va avanti, e il job è misteriosamente lento.

Il pattern è identico in tutti e tre: la chiave di join o di group-by è sbilanciata, lo shuffle preserva quello sbilanciamento, e un task finisce per fare molto più lavoro degli altri.

Perché “tutto lo stage aspetta”

Gli stage Spark hanno una barriera. Lo stage successivo non può iniziare finché ogni task dello stage corrente non è completato e non ha scritto il suo output di shuffle su disco. Se 199 dei tuoi 200 task finiscono in 12 secondi e uno gira per 30 minuti, lo stage successivo sta in ozio per 29 minuti e 48 secondi, in attesa di quel task.

Lo puoi vedere nella timeline degli stage della Spark UI come una barra lunga e sottile: quasi tutto finisce presto, e c’è un singolo task strascicato in fondo. Vedremo che aspetto ha qui sotto.

Vuol dire anche: il lavoro totale nel cluster non è il modo giusto di pensare a un job skewed. Il cluster nel suo insieme potrebbe essere utilizzato solo al 5% per 29 di quei 30 minuti. Un core è inchiodato, gli altri si annoiano. I dashboard di CPU mentono in proposito.

Un dataset chiaramente skewed

Costruiamo un po’ di skew per poterlo guardare. Costruisco un DataFrame di eventi da 1,1 milioni di righe in cui un utente (user_id = 1) conta per 1 milione di righe e le altre 100.000 righe sono distribuite su 999 utenti.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("TheSkewProblem")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "200")
         .getOrCreate())

# 1 million rows for user 1
big_user = spark.range(0, 1_000_000).select(
    F.lit(1).alias("user_id"),
    F.col("id").alias("event_id"),
)

# 100k rows distributed across user_ids 2..1000
other_users = spark.range(0, 100_000).select(
    ((F.col("id") % 999) + 2).alias("user_id"),
    F.col("id").alias("event_id"),
)

events = big_user.unionByName(other_users)
events.count()  # 1,100,000

Adesso lancia il diagnostico: group by user_id e count, ordinato per count discendente:

(events
 .groupBy("user_id")
 .count()
 .orderBy(F.desc("count"))
 .show(10))

# +-------+-------+
# |user_id|  count|
# +-------+-------+
# |      1|1000000|
# |      2|    101|
# |      3|    101|
# |      4|    101|
# |      5|    101|
# |      6|    101|
# |      7|    101|
# |      8|    101|
# |      9|    101|
# |     10|    101|
# +-------+-------+

L’utente 1 ha 1.000.000 di righe. Tutti gli altri utenti ne hanno circa 100. È un rapporto di 10.000 a uno. Qualunque cosa con chiave su user_id manderà tutte le righe dell’utente 1 a una sola partition.

Che aspetto ha nella Spark UI

Lancia qualsiasi operazione che fa shuffle su user_id:

result = events.groupBy("user_id").agg(F.count("*").alias("n"))
result.write.mode("overwrite").parquet("/tmp/skew-demo")

Apri la Spark UI (http://localhost:4040 quando giri in locale), entra nello stage che fa lo shuffle, e guarda la tabella Tasks in fondo. Le colonne da osservare:

  • Duration: tempo wall clock per task
  • Shuffle Read Size / Records: quanti dati ogni task ha tirato dal monte

In uno stage sano queste colonne saranno raggruppate strette attorno a una mediana. In uno stage skewed vedrai qualcosa tipo:

Min:    Median:   Max:
50 ms   80 ms     45 s    <- duration
2 KB    4 KB      40 MB   <- shuffle read size

Quando la durata massima è più di 5 volte la mediana, hai skew. Quando è 100 volte la mediana, hai skew severo. La Spark UI evidenzia anche questo con una riga “summary” che mostra min/25esimo/mediana/75esimo/max: ci passi sopra l’occhio e la risposta è lì.

L’altra spia è la vista stage timeline. Gli stage sani sembrano un mattone strettamente impilato di barre orizzontali tutte che finiscono più o meno nello stesso momento. Gli stage skewed hanno una barra lunga che sporge, a volte tirata fuori 10 volte oltre il resto. Una volta che hai visto una timeline skewed, la riconosci a colpo d’occhio.

In produzione: il sintomo della long-tail

Non avrai sempre la Spark UI a portata di mano: a volte stai facendo debug solo dai log. La firma dello skew lì è il sintomo della long-tail:

[INFO] Stage 14: 195/200 tasks finished in 28s
[INFO] Stage 14: 198/200 tasks finished in 32s
[INFO] Stage 14: 199/200 tasks finished in 35s
[INFO] Stage 14: 199/200 tasks finished in 2m 14s
[INFO] Stage 14: 199/200 tasks finished in 5m 31s
[INFO] Stage 14: 199/200 tasks finished in 12m 09s
[INFO] Stage 14: 200/200 tasks finished in 24m 58s

195 task finiscono in 28 secondi. Il duecentesimo finisce 25 minuti dopo. Quel buco è un task che processa una partition troppo grassa. Il wall clock complessivo del job è dominato da quell’unico task, anche se la maggior parte del lavoro era stata fatta nel primo mezzo minuto.

Perché nient’altro aiuta

Quando gli sviluppatori vedono questo per la prima volta, l’istinto è tirargli addosso risorse. Nessuna delle mosse ovvie funziona:

  • più executor? No. Il collo di bottiglia è un singolo task che gira su un singolo executor. Gli executor in più stanno in ozio.
  • più core per executor? No. Un singolo task usa un singolo core. Il multi-core aiuta solo se ci sono più task da eseguire.
  • più memoria? A volte: se il task skewed stava facendo spill su disco, più memoria lo rende più veloce. Ma il task sta comunque girando da solo, e sei comunque inchiodato a lui.
  • più partition di shuffle? Solo se lo skew è lieve. Aumentare spark.sql.shuffle.partitions da 200 a 2000 spalma le chiavi più leggere su più partition, ma ogni riga con la chiave pesante fa comunque hash sulla stessa partition unica.
  • Repartition? Una semplice repartition(2000) è uniforme e non aiuterà: farà solo shuffle dello stesso risultato sbilanciato in un conto di partition uniforme diverso, e la chiave pesante atterrerà comunque in un posto solo.

La cosa che funziona è cambiare la forma della chiave stessa, in modo che la chiave pesante venga spezzata su più partition. Quello è il salting. Quella è la lezione 29.

Soluzioni, in breve

Una mappa completa dei rimedi allo skew, classificati grossolanamente per facilità:

  1. Broadcast join, quando un lato è piccolo. La lezione 27 ha coperto questo. Se la tabella che ha la chiave pesante viene joinata contro una piccola lookup, fai broadcast della lookup e il join diventa locale: niente shuffle, niente skew.

  2. Filtra l’offender ovvio. Se il 30% delle tue righe ha user_id = null e dei null nel join non ti importa, filtrali prima di fare il join. Vittoria gratis.

  3. Salting. Aggiungi un suffisso casuale alle chiavi pesanti in modo che facciano hash su più partition, fai il join, poi ricomponi. Funziona per il caso “entrambi i lati sono grossi”. Copertura completa nella lezione 29.

  4. Gestione skew di AQE. Spark 3.x viene con l’Adaptive Query Execution, che può rilevare lo skew a runtime e spezzare automaticamente le partition pesanti. Si abilita con spark.sql.adaptive.enabled = true e spark.sql.adaptive.skewJoin.enabled = true. Non è magia: aiuta solo per i sort-merge join, scatta solo oltre una soglia configurabile, e funziona solo sul join in se’ (non su group-by arbitrari). Ma su Spark 3.4+ risolve molto skew senza modifiche al codice. La lezione 59 va a fondo su AQE.

  5. Pre-aggrega prima del join. Se la chiave pesante è pesante perché ha molti duplicati che andrai comunque ad aggregare, fai prima l’aggregazione. Una .groupBy("user_id").agg(...) davanti al join restringe il conto di righe dell’utente 1 da un milione a uno.

L’ordine delle operazioni in una sessione di debug vera di solito è: (1) conferma che è skew con la query diagnostica, (2) controlla se AQE è attivo e ha una soglia adeguata per i tuoi dati, (3) se un lato è una piccola lookup, fanne broadcast, (4) se no, filtra le chiavi null/spazzatura, (5) se no, salta.

Cosa arriva dopo

Adesso sai cos’è lo skew, perché è un problema a livello di stage piuttosto che a livello di cluster, e come individuarlo dalla UI o dai log. La lezione 29 cammina attraverso il salting da capo a fondo con codice, inclusa la fregatura in cui un salting implementato male peggiora le cose. Dopodiché, la lezione 30 chiude il modulo join-e-shuffle legando tutto: come leggere il piano fisico di un join e prevedere il runtime prima di premere go.

La query diagnostica per lo skew, df.groupBy(key).count().orderBy(F.desc("count")).show(20), vale la pena impararla a memoria. Lanciala su qualunque DataFrame su cui stai per fare qualcosa con chiave. Se la chiave in cima fa più di ~10 volte la mediana, pianifica per lo skew prima di fare debug del job lento.

Un diagnostico più stretto per la produzione

Il semplice groupBy().count() funziona su dataset piccoli. Su dati di produzione veri, fare un group-by completo solo per controllare lo skew è di per se’ uno shuffle costoso. Un approccio più veloce è fare prima un sample:

sample = events.sample(fraction=0.01, seed=42)

(sample
 .groupBy("user_id")
 .count()
 .orderBy(F.desc("count"))
 .show(20))

Un sample dell’1% di solito basta per individuare le chiavi pesanti, e salta lo shuffle completo. Se l’utente 1 domina nel sample, domina nell’insieme completo.

Un altro one-liner utile è il rapporto di skew: chiave in cima contro mediana.

counts = events.groupBy("user_id").count()
top    = counts.agg(F.max("count")).first()[0]
median = counts.approxQuantile("count", [0.5], 0.01)[0]
print(f"top={top:,}  median={median:,}  ratio={top/median:.1f}x")

Un rapporto sopra 100 è skew severo. Sopra 10 vale la pena pensarci. Sotto 5 va bene.

Entrambi i diagnostici sono abbastanza economici da buttarli in una pipeline come sanity check prima di qualunque shuffle costoso. Il tu del futuro ringrazierà il tu del presente la prima volta in cui l’alternativa è fare debug di uno stage da 4 ore a partire dai log.


Riferimenti: documentazione Apache Spark sul comportamento dello shuffle e sull’Adaptive Query Execution; post sul blog di engineering di Databricks sull’identificazione e il rimedio dello skew dei dati. Recuperato il 2026-05-01.

Cerca