Shuffle-ul este cea mai costisitoare operație din Spark. Când faci join între două DataFrame-uri mari, fiecare rând din fiecare parte trebuie să fie hash-uit după cheia de join și trimis peste rețea către executorul responsabil pentru acel bucket de hash. Zeci de gigaocteți zburând între mașini, scriși și citiți de pe disc local între timp. Un join care durează zece minute ar putea petrece opt din ele cu shuffle-ul.
Un broadcast join sare peste tot circul. Dacă o parte a join-ului este suficient de mică, chiar mică, încape-în-memorie de mică, Spark trimite tot DataFrame-ul mic către fiecare executor din cluster. Fiecare executor are deja felia lui din DataFrame-ul mare. Acum fiecare executor poate face join între felia lui și partea mică complet local. Fără shuffle. Fără rețea pentru partea mare. Partea mare nu se mișcă deloc.
Aceasta este cea mai mare optimizare de performanță din join-urile Spark și se întâmplă automat, de cele mai multe ori. Lecția asta este despre când se întâmplă automat, cum să o forțezi când Spark o ratează și când să nu o forțezi pentru că vei face OOM driver-ul.
Imaginea
Join-ul standard (sort-merge sau shuffle hash) arată așa:
big_df (300 GB) small_df (50 MB)
| |
[shuffle by key] [shuffle by key]
| |
\________ ___________/
\/
[executors do local joins]
Două shuffle-uri. 300 GB de trafic de rețea plus 50 MB de trafic de rețea, în mare parte primul.
Un broadcast join arată așa:
small_df (50 MB)
|
[collect to driver]
|
[send to every executor]
|
big_df (300 GB) ---------------+
| |
+---- local join on each executor's slice
Partea mare nu intră niciodată în shuffle. Partea mică călătorește exact o dată per executor. Dacă ai un tabel de lookup de 50 MB și un tabel de fapte de 300 GB, broadcast-ul e undeva între de 10x și 100x mai rapid.
Când alege Spark broadcast automat
Spark are un parametru de configurare numit spark.sql.autoBroadcastJoinThreshold. Implicit: 10485760 (10 MB). Orice DataFrame a cărui dimensiune estimată e sub acest prag e eligibil pentru broadcast.
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
# '10485760'
10 MB e conservator. Majoritatea cluster-elor de producție ridică asta la 100 MB, 200 MB, uneori 1 GB, în funcție de memoria driver-ului și a executorilor:
# 200 MB threshold — typical for a cluster with 16 GB executors
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 200 * 1024 * 1024)
Trucul este cuvântul estimată. Spark estimează dimensiunea unui DataFrame din statisticile lui dacă există (trebuie să rulezi ANALYZE TABLE pentru ele), sau din dimensiunea fișierului dacă citește direct un fișier, sau din dimensiunea de intrare dacă e rezultatul unei operații anterioare. Nu rulează interogarea ca să afle. Asta este sursa majorității surprizelor „de ce nu a făcut Spark broadcast la asta?”.
Un fișier Parquet de 5 GB pe care îl filtrezi până la 50 MB e, în ochii lui Spark, tot un DataFrame de 5 GB pentru planificarea join-ului. Filtrul se întâmplă la runtime; planner-ul ia decizia înainte. Așa că filtrul greu nu declanșează broadcast automat chiar dacă datele alăturate efectiv ar încăpea ușor.
Soluția este broadcast hint-ul.
Broadcast hint-ul
from pyspark.sql.functions import broadcast
big_facts.join(broadcast(small_dim), on="dim_id", how="inner")
broadcast(small_dim) îi spune lui Spark: „crede-mă, partea asta e mică, fă-i broadcast.” Spark execută, fără întrebări, chiar dacă propria estimare de dimensiune zice altceva.
Aceasta este cea mai utilă funcție PySpark pentru performanța de join. Dacă ai identificat un join intens și o parte e cu adevărat mică la runtime, chiar dacă nu pare mică static, pune un broadcast() în jurul ei și uită-te cum scade timpul de execuție al job-ului.
Un mic exemplu concret. Voi crea un tabel de fapte cu comenzi și un tabel de dimensiuni cu țări, voi rula join-ul în două moduri și voi privi planul.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
spark = (SparkSession.builder
.appName("BroadcastJoins")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
# Pretend this is huge — millions of rows
orders = spark.createDataFrame(
[(i, f"IT" if i % 3 == 0 else "NL" if i % 3 == 1 else "DE", i * 1.5)
for i in range(10_000)],
"order_id INT, country STRING, total DOUBLE",
)
# Tiny lookup table — three rows
countries = spark.createDataFrame(
[("IT", "Italy", 22.0),
("NL", "Netherlands", 21.0),
("DE", "Germany", 19.0)],
"country STRING, country_name STRING, vat_rate DOUBLE",
)
# Without explicit hint — at this size Spark picks broadcast on its own
orders.join(countries, on="country").explain()
Output-ul planului (tăiat la părțile relevante) va arăta cam așa:
== Physical Plan ==
*(2) BroadcastHashJoin [country#5], [country#15], Inner, BuildRight
:- *(2) Filter isnotnull(country#5)
: +- *(2) Scan ExistingRDD[order_id#4, country#5, total#6]
+- BroadcastExchange HashedRelationBroadcastMode([country#15])
+- *(1) Filter isnotnull(country#15)
+- *(1) Scan ExistingRDD[country#15, country_name#16, vat_rate#17]
Cele două cuvinte la care să te uiți: BroadcastHashJoin și BroadcastExchange. Dacă le vezi, partea mică e trimisă prin broadcast. Dacă vezi SortMergeJoin în loc, partea mică e trimisă prin shuffle.
Acum dezactivează auto-threshold-ul și rulează din nou:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
orders.join(countries, on="country").explain()
# == Physical Plan ==
# *(5) SortMergeJoin [country#5], [country#15], Inner
# :- *(2) Sort [country#5 ASC NULLS FIRST], false, 0
# : +- Exchange hashpartitioning(country#5, 8) ...
# ...
Același join, alt plan. Ambele fac shuffle pentru că broadcast-ul e oprit. Zeci de secunde versus sub o secundă pe date reale.
Forțează broadcast-ul înapoi chiar și cu threshold-ul oprit:
orders.join(broadcast(countries), on="country").explain()
# Back to BroadcastHashJoin.
Hint-ul învinge threshold-ul. Hint-ul e suprascrierea.
Hint-ul opus: forțarea unui sort-merge
Există cazuri în care Spark vrea să facă broadcast și ai prefera să n-o facă, de obicei pentru că estimarea de dimensiune a lui Spark e prea mică și broadcast-ul va face OOM. Hint-ul este merge:
df1.join(df2.hint("merge"), on="key").explain()
# Forces SortMergeJoin
Sau shuffle_hash dacă vrei specific strategia de shuffle hash join:
df1.join(df2.hint("shuffle_hash"), on="key")
Vei recurge la astea mai rar decât la broadcast, dar când Spark greșește strategia, contează să știi că toate trei hint-urile există.
Cum să dezactivezi broadcast-ul global
Uneori depanezi un job instabil și vrei să scoți broadcast-ul din ecuație:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
-1 înseamnă „niciodată auto-broadcast.” Hint-urile explicite broadcast() tot funcționează. Asta e util pentru a testa dacă broadcast-ul automat cauzează probleme, dar nu ar trebui să-l lași setat la -1 în producție: ai renunța la cea mai bună optimizare de join pe care o are Spark.
Modul de eșec: OOM la driver
Lucrul care face broadcast joins rapide, trimiterea întregii părți mici către fiecare executor, e și lucrul care le face periculoase. Mecanismul este:
- Spark apelează
.collect()pe DataFrame-ul mic, aducând toate rândurile lui înapoi la driver. - Driver-ul materializează tot în memorie.
- Driver-ul îl expediază către fiecare executor peste rețea.
Dacă partea „mică” e de fapt 4 GB iar driver-ul tău are 2 GB de memorie, pasul 2 face OOM. Driver-ul moare. Job-ul moare. Vezi un stack trace care se termină cu ceva de genul OutOfMemoryError: Java heap space sau Total size of serialized results of N tasks (X GB) is bigger than spark.driver.maxResultSize.
Soluția e fie:
- Nu face broadcast la acel DataFrame. Renunță la hint, lasă Spark să facă sort-merge.
- Mărește memoria driver-ului (
spark.driver.memory) șispark.driver.maxResultSize. - Filtrează sau agregă partea mică înainte de broadcast.
Regulă generală: nu face niciodată broadcast la ceva mai mare de ~1 GB nici măcar pe un driver puternic. Zona confortabilă este intervalul 100-200 MB. Mai mare de atât și flirtezi cu OOM și pierzi și avantajul broadcast-ului: la dimensiunea aia, costul de rețea al replicării către N executori începe să rivalizeze cu costul unui simplu shuffle.
Două pattern-uri practice
Pattern 1: filtrează, apoi broadcast. Dacă știi că partea mică va fi minusculă după un filtru, materializeaz-o și pune hint-ul:
active_users = (users
.where(F.col("status") == "active")
.select("user_id", "name", "tier"))
events.join(broadcast(active_users), on="user_id", how="left")
Dacă active_users e 5% din users și originalul era de 200 MB, faci broadcast la 10 MB. Câștig ușor.
Pattern 2: broadcast la dim, sort-merge la fact. Pattern-ul „star schema”. Tabel de fapte mare, tabele de dimensiuni mici. Broadcast la fiecare dim:
(facts
.join(broadcast(dim_country), on="country_id")
.join(broadcast(dim_product), on="product_id")
.join(broadcast(dim_date), on="date_id"))
Fiecare dim evită un shuffle. Trei join-uri, zero shuffle-uri pe partea de fapte. Asta e optimizarea de bază în orice workload de analytics.
Ce nu poate repara broadcast-ul
Broadcast joins rezolvă problema „o parte e mică”. Nu rezolvă problema „o cheie e uriașă”. Dacă DataFrame-ul tău mare are 100 de milioane de rânduri pentru country = 'US' și 1 milion de rânduri pentru toate celelalte la un loc, niciun broadcast nu ajută: partea de fapte tot e procesată local pe fiecare executor, iar un executor tot ajunge cu toate rândurile US. Asta e data skew. Asta e lecția 28.
Citirea lui .explain() ca un inspector
Odată ce începi să vânezi probleme de performanță, .explain() devine instrument zilnic. Output-ul implicit e doar text; .explain(True) îți dă planurile parsate, analizate, optimizate și fizice toate odată. Pentru întrebările de broadcast, planul fizic e cel care contează. Cuvintele cheie pe care să le cauți:
BroadcastHashJoin: calea rapidă, broadcast-ul a funcționat.BroadcastNestedLoopJoin: broadcast-ul a funcționat dar condiția de join nu e o egalitate, așa că Spark face o verificare per rând. Lent dacă partea de broadcast nu e trivială.SortMergeJoin: ambele părți shuffle-uite și sortate. Implicit pentru join-uri big-vs-big.ShuffleHashJoin: ambele părți shuffle-uite, partea mai mică hash-uită în memorie. Spark îl alege mai rar implicit; îl poți forța cudf.hint("shuffle_hash").BroadcastExchange: partea mică fiind colectată și trimisă prin broadcast. Mereu împerecheat cuBroadcastHashJoin(sauBroadcastNestedLoopJoin).
Dacă ai rulat job-ul, te așteptai la broadcast și .explain() arată SortMergeJoin, vinovatul cel mai probabil e estimarea de dimensiune: planner-ul crede că partea mică e mai mare decât threshold-ul. Fie ridică threshold-ul, fie repară statisticile, fie adaugă hint-ul explicit broadcast().
O mică notă despre AQE
Adaptive Query Execution din Spark 3.x poate converti un SortMergeJoin într-un BroadcastHashJoin la runtime dacă dimensiunea efectivă a uneia dintre părți se dovedește suficient de mică după ce rulează stage-urile inițiale. Ăsta e un răspuns parțial la problema „estimarea de dimensiune era greșită”: Spark ia o a doua privire odată ce datele s-au materializat. AQE este pornit implicit în Spark 3.2+, dar threshold-ul de broadcast pentru conversia AQE este o setare separată (spark.sql.adaptive.autoBroadcastJoinThreshold). Lecția 59 acoperă AQE complet; pentru moment, doar reține că „Spark nu a făcut broadcast la momentul planificării” nu înseamnă neapărat „Spark nu va face broadcast deloc.”
Catalyst, optimizatorul de interogări al Spark, ia majoritatea acestor decizii pentru tine când are statistici bune. Lecția 41 acoperă Catalyst de la cap la coadă și explică cum să-i citești planurile. Pentru moment, broadcast hint când știi că partea mică e mică, ai grijă la threshold și nu face broadcast la un DataFrame de 4 GB de pe un driver de 2 GB.
Referințe: documentația Apache Spark SQL despre join hints și autoBroadcastJoinThreshold; articole de pe blogul de inginerie Databricks despre broadcast joins și selecția strategiei de join. Consultat 2026-05-01.