Lecția anterioară te-a lăsat cu privirea fixată pe un Spark UI unde un task rula de treizeci de minute, în timp ce celelalte 199 terminaseră în douăsprezece secunde. Gâtuirea era o singură cheie fierbinte, user_id = 1, sau country = 'US', sau orice altceva domină datele tale. Aruncatul de mai mult cluster nu ajută, fiindcă munca e pe un singur task, pe un singur core. Trebuie să schimbăm forma cheii.
Asta e salting.
Ideea într-o singură propoziție
Iei cheia fierbinte, îi lipești un număr aleator mic și acum ce era o singură partiție devine N partiții. Cheia dominantă e împărțită pe mai multe task-uri, stage-ul se termină în aproximativ 1/N din timp și gata.
Capcana, fiindcă există mereu o capcană, e că cealaltă parte a join-ului trebuie acum replicată, fiindcă fiecare variantă sărată a cheii trebuie să-și găsească match-ul. Vom vedea exact cum funcționează mai jos, inclusiv versiunea mai inteligentă în care sărezi doar rândurile care au nevoie.
Tiparul în patru pași
Salting devine mecanic odată ce l-ai văzut. Patru pași:
Pasul 1: pe partea cu skew (fact), adaugi o coloană de salt. Alegi un interval de salt N, de obicei 4, 8 sau 16. Pentru fiecare rând pe partea cu skew, generezi un întreg aleator salt = floor(rand() * N). Cheia efectivă devine (original_key, salt) în loc de doar original_key. Cheia fierbinte, care înainte se hashuia într-o singură partiție, acum se hashuiește în până la N partiții.
Pasul 2: pe cealaltă parte (dimension), replici fiecare rând de N ori. Pentru fiecare rând din tabelul de dimension, emiți N copii, una pentru fiecare valoare de salt posibilă 0, 1, ..., N-1. Cheia efectivă pe această parte e tot (original_key, salt), dar acoperă fiecare salt posibil pentru fiecare cheie.
Pasul 3: faci join pe (original_key, salt). Fiecare rând de fact își găsește unicul rând dim corespunzător. Join-ul se distribuie acum pe N partiții pentru cheia fierbinte, uniform.
Pasul 4: arunci coloana de salt după. Și-a făcut treaba în timpul shuffle-ului. Rândurile de output sunt aceleași pe care le-ai fi obținut dintr-un join obișnuit, fără duplicate, fiindcă fiecare rând de fact a făcut match cu exact unul dintre cele N rânduri dim replicate.
Asta e tot. Pașii 2 și 4 sunt unde se împiedică majoritatea începătorilor. Replică partea dim, dar fă join pe salt, ca fiecare rând de fact să facă match cu exact un rând dim replicat. Nu agrega înainte de salting decât dacă vrei asta. Nu uita să elimini saltul înainte să numeri lucruri.
Exemplu lucrat
Hai s-o facem împotriva unui set de date cu o formă realistă. Un tabel de fact cu tranzacții unde 60% din rânduri au country = 'US', plus un mic tabel de dimension cu metadate de țară.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("Salting")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "200")
.config("spark.sql.adaptive.enabled", "false") # disable AQE so we see raw skew
.getOrCreate())
# Fact table: 1M rows, 60% are US, the rest spread across 9 other countries
us_rows = spark.range(0, 600_000).select(
F.lit("US").alias("country"),
F.col("id").alias("txn_id"),
(F.rand() * 1000).alias("amount"),
)
others = ["IT", "FR", "DE", "ES", "UK", "JP", "BR", "IN", "CA"]
other_rows = spark.range(0, 400_000).select(
F.element_at(F.array(*[F.lit(c) for c in others]),
(F.col("id") % 9 + 1).cast("int")).alias("country"),
F.col("id").alias("txn_id"),
(F.rand() * 1000).alias("amount"),
)
facts = us_rows.unionByName(other_rows)
# Dim table: one row per country
dim = spark.createDataFrame(
[(c, c + " full name", c + "-region") for c in ["US"] + others],
"country STRING, country_name STRING, region STRING",
)
Un join clasic pe country:
joined = facts.join(dim, on="country", how="inner")
joined.write.mode("overwrite").parquet("/tmp/skew-vanilla")
Dacă urmărești Spark UI, stage-ul de shuffle arată tiparul de manual din lecția 28: 199 de task-uri rapide, un task lent pe partiția care ține toate cele 600k de rânduri US. Pe un cluster real cu numere mai mari, acel task ar putea rula minute în șir cât restul termină în secunde.
Acum versiunea sărată. Varianta deșteaptă sărează doar US, cheia fierbinte, nu fiecare țară, fiindcă a săra cheile fără skew doar multiplică rândurile dim degeaba.
N = 8 # salt range
# Step 1: salt the fact side, but ONLY for the hot key
facts_salted = facts.withColumn(
"salt",
F.when(F.col("country") == "US",
(F.rand() * N).cast("int"))
.otherwise(F.lit(0))
)
# Step 2: replicate the dim side, but only for the hot key
us_dim = dim.filter(F.col("country") == "US")
others_dim = dim.filter(F.col("country") != "US")
# Build [0, 1, ..., N-1]
salts = spark.range(0, N).withColumnRenamed("id", "salt").withColumn(
"salt", F.col("salt").cast("int")
)
us_dim_salted = us_dim.crossJoin(salts) # N copies of the US row, one per salt
others_dim_salted = others_dim.withColumn("salt", F.lit(0))
dim_salted = us_dim_salted.unionByName(others_dim_salted)
# Step 3: join on (country, salt)
joined_salted = facts_salted.join(dim_salted, on=["country", "salt"], how="inner")
# Step 4: drop the salt column
joined_salted = joined_salted.drop("salt")
joined_salted.write.mode("overwrite").parquet("/tmp/skew-salted")
Rândurile US pe partea de fact se distribuie acum uniform în 8 buckets de salt. Rândul US de pe partea dim apare de 8 ori, o dată pentru fiecare valoare de salt, deci fiecare rând de fact face match cu exact un rând dim. Celelalte țări sunt neatinse: saltul e 0 pe ambele părți și se face join ca înainte.
În Spark UI, versiunea sărată arată complet diferit. Unde versiunea clasică avea un task de 30 de secunde și 199 rapide, versiunea sărată are 8 task-uri care procesează fiecare cam 75k rânduri US în aproximativ 4 secunde, în paralel cu restul. Wall clock-ul stage-ului scade de la „limitat de task-ul lent” la „limitat de task-ul median”, exact ce vrei.
Costul
Salting nu e gratis. Trei lucruri de ținut minte:
Rândurile dim replicate sunt rânduri reale. Dacă N = 8 și cheia fierbinte are, să zicem, 50 de rânduri pe partea dim (unele chei chiar au, gândește-te la chei compuse fierbinți), ai transformat 50 de rânduri în 400. Datele suplimentare trebuie shuffluite. Pentru părți dim minuscule precum tabelul de țări de mai sus, costul e o eroare de rotunjire. Pentru părți dim mai mari, un tabel de atribute per produs unde „SKU-ul fierbinte” are 200 de variante de atribute, multiplicarea poate aduna. Verifică.
Alege N cât mai mic posibil. O greșeală frecventă e să alegi N = 100 fiindcă „mai multă răspândire e mai bine.” Nu e. Cheia fierbinte de pe partea de fact avea doar 600k rânduri; răspândindu-le pe 8 partiții obții cam 75k pe partiție, ceea ce e suficient paralelism. Mergând la 100 face fiecare partiție minusculă, dar ai și multiplicat rândurile fierbinți de pe partea dim de 100 de ori. Pentru majoritatea workload-urilor, N între 4 și 16 e zona dulce. Pornește de la 8, măsoară, ajustează.
Sărează doar cheile fierbinți. Exemplul de mai sus e tiparul deștept: un WHEN ... OTHERWISE care sărează US și lasă restul în pace. Versiunea naivă, sărarea fiecărui rând de fact și replicarea fiecărui rând dim, funcționează, dar plătește costul replicării pe tot tabelul dim degeaba. Dacă skew-ul tău e concentrat pe una sau două chei cunoscute, alege-le. Dacă nu știi care chei sunt fierbinți, rulează diagnosticul din lecția 28 mai întâi.
Când să NU faci salting
Trei cazuri în care salting e unealta greșită:
Broadcast join funcționează. Dacă partea dim încape în memorie (lecția 27), fă-i broadcast și gata. Fără shuffle, fără skew, fără salt. Abordarea cu salting contează doar când partea dim e prea mare pentru broadcast iar partea de fact are skew.
AQE se ocupă pentru tine. Adaptive Query Execution din Spark 3.x (lecția 59) are suport pentru skew-join. Cu spark.sql.adaptive.enabled = true și spark.sql.adaptive.skewJoin.enabled = true, Spark detectează partițiile cu skew în runtime și le împarte automat, fără modificări de cod. AQE se ocupă doar de sort-merge join-uri și se activează doar peste un prag de mărime configurabil, deci nu înlocuiește salting-ul în orice caz, dar pe Spark 3.4+ rezolvă mult skew înainte să știi măcar că ai skew. Verifică mereu dacă AQE e pornit înainte să te apuci de salt.
„Skew-ul” e de fapt blând. Dacă cheia de top e de 3x mediana, n-ai o problemă de skew, ai un workload ușor neuniform. Mărirea spark.sql.shuffle.partitions (lecția 32) e o soluție mai ieftină. Salting e pentru cazurile unde o cheie are de 100x volumul restului, unde matematica spune că nimic altceva nu va merge.
Salting pentru group-by, nu doar join
Același truc funcționează pentru agregări. Dacă groupBy("user_id").sum("amount") are un user fierbinte, poți:
- Adaugi o coloană de salt pe input.
- Faci group by
(user_id, salt), agregare parțială, distribuită pe N partiții. - Faci group by
user_iddin nou pe rezultatele parțiale, mic, ieftin, fără skew.
salted = events.withColumn("salt", (F.rand() * 8).cast("int"))
partial = (salted.groupBy("user_id", "salt")
.agg(F.sum("amount").alias("partial_sum")))
final = (partial.groupBy("user_id")
.agg(F.sum("partial_sum").alias("total")))
Agregarea parțială face munca grea în paralel; agregarea finală vede doar N rânduri per user, ceea ce încape într-un singur task cu loc de manevră. Același tipar ca combiner / reduce în MapReduce: distribui munca, apoi colapsezi.
Ce urmează
Lecția 30 încheie modulul de join-uri și shuffle-uri parcurgând cum se citește planul fizic al unui join și cum prezici runtime-ul lui înainte să apeși go: broadcast-uri, sort-merge, shuffle hash și cum AQE rescrie planul când vede o problemă. După aia, modulul 6 (lecțiile 31 și 32) se depărtează de „repararea stage-ului lent” către „proiectarea partițiilor intenționat de la început”, fiindcă majoritatea problemelor de skew sunt probleme de partiții deghizate.
Două lucruri care merită memorate din lecția asta:
- Rețeta de salting: sărezi partea de fact, replici partea dim, faci join, arunci saltul. Patru pași. Nu sări niciunul.
- Alege
Nmic, sărează doar cheile fierbinți și verifică dacă AQE deja face asta pentru tine înainte să scrii o linie de cod.
Salting e genul de truc care pare deștept prima dată și rutină a zecea oară. Ține snippet-ul de mai sus pe undeva; data viitoare când un stage atârnă pe un task, vei ști exact ce să faci.
Referințe: documentația Apache Spark despre strategiile de join și Adaptive Query Execution; postări de pe blog-ul de inginerie Databricks despre tiparele de remediere a skew-ului. Consultat 2026-05-01.