Dacă ai venit la PySpark din SQL, deja știi join-urile. Modelul mental se transferă aproape perfect: INNER, LEFT, RIGHT, FULL OUTER, semi, anti, cross, aceleași șapte arome, aceeași semnificație combinatorică, același „așa transformi două tabele într-unul singur”. Ce se schimbă în Spark este costul. Un join în PostgreSQL pe un tabel cu un milion de rânduri durează câteva sute de milisecunde. Un join în Spark pe un tabel cu un miliard de rânduri poate dura trei minute sau trei ore, în funcție de cum decide Spark să-l execute. Despre acea distincție este Modulul 5.
Lecția de față acoperă sintaxa și semantica: cele șapte tipuri de join, cele trei stiluri de condiție de join și capcana coloanelor duplicate care îi prinde pe toți exact o dată. Lecția 27 acoperă broadcast joins (calea ieftină). Lecția 28 acoperă data skew (calea lentă). Lecția 29 acoperă salting (ieșirea din calea lentă).
Forma canonică
Orice join Spark se reduce la:
result = df1.join(df2, on=<condition>, how=<type>)
on este ce contează drept potrivire. how este ce să faci cu rândurile care n-au potrivire. Atât. Valorile implicite sunt how="inner" și niciun on, ceea ce îți dă un cross join cu un avertisment dacă uiți condiția.
Un exemplu mic recurent
Voi folosi două DataFrame-uri minuscule pentru ca output-ul fiecărui join să fie suficient de mic ca să poată fi citit:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("JoinsInPySpark")
.master("local[*]")
.getOrCreate())
customers = spark.createDataFrame(
[
(1, "Anna", "IT"),
(2, "Bjorn", "NL"),
(3, "Chiara","IT"),
(4, "Diego", "ES"), # never ordered
],
"customer_id INT, name STRING, country STRING",
)
orders = spark.createDataFrame(
[
(1001, 1, 59.00),
(1002, 1, 29.00),
(1003, 2, 149.00),
(1004, 3, 89.50),
(1005, 99, 12.00), # ghost customer — id 99 doesn't exist
],
"order_id INT, customer_id INT, total DOUBLE",
)
Patru clienți (unul fără comenzi), cinci comenzi (una pentru un client care nu există). Toate cele șapte join-uri de mai jos rulează pe acestea.
Inner join: doar potrivirile, nimic altceva
Implicit. how="inner" este ce primești dacă nu transmiți deloc how.
customers.join(orders, on="customer_id", how="inner").show()
# +-----------+-------+-------+--------+------+
# |customer_id| name|country|order_id| total|
# +-----------+-------+-------+--------+------+
# | 1| Anna| IT| 1001| 59.00|
# | 1| Anna| IT| 1002| 29.00|
# | 2| Bjorn| NL| 1003|149.00|
# | 3| Chiara| IT| 1004| 89.50|
# +-----------+-------+-------+--------+------+
Diego (fără comenzi) a dispărut. Comanda 1005 (fără client corespunzător) a dispărut. Inner join înseamnă „rânduri care se potrivesc pe ambele părți, punct.”
Observă că Anna apare de două ori: are două comenzi. Join-urile multiplică. Un rând din stânga poate produce N rânduri în dreapta dacă există N potriviri. Asta este iar combinatorica de rânduri. Ușor de uitat când scrii repede.
Left (left_outer) join: păstrezi tot din stânga
customers.join(orders, on="customer_id", how="left").show()
# +-----------+-------+-------+--------+------+
# |customer_id| name|country|order_id| total|
# +-----------+-------+-------+--------+------+
# | 1| Anna| IT| 1001| 59.00|
# | 1| Anna| IT| 1002| 29.00|
# | 2| Bjorn| NL| 1003|149.00|
# | 3| Chiara| IT| 1004| 89.50|
# | 4| Diego| ES| null| null|
# +-----------+-------+-------+--------+------+
Diego e înapoi, cu coloane null pentru tot ce vine din orders. how="left" și how="left_outer" sunt aliasuri: ambele funcționează, ambele înseamnă același lucru.
Întrebarea „clienții fără comenzi” devine un filtru trivial pe deasupra:
(customers
.join(orders, on="customer_id", how="left")
.where(F.col("order_id").isNull())
.select("customer_id", "name", "country")
.show())
Right (right_outer) join: păstrezi tot din dreapta
customers.join(orders, on="customer_id", how="right").show()
# +-----------+-------+-------+--------+------+
# |customer_id| name|country|order_id| total|
# +-----------+-------+-------+--------+------+
# | 1| Anna| IT| 1001| 59.00|
# | 1| Anna| IT| 1002| 29.00|
# | 2| Bjorn| NL| 1003|149.00|
# | 3| Chiara| IT| 1004| 89.50|
# | 99| null| null| 1005| 12.00|
# +-----------+-------+-------+--------+------+
Clientul fantomă al comenzii 1005 trece cu null-uri pe partea de client. Right join-urile există de dragul completitudinii. În practică, nu le scrie nimeni: schimbă operanzii și scrie un left join. Un revizor de cod va citi customers.join(orders, ..., how="left") mai repede decât versiunea cu right join, de fiecare dată.
Full outer join: apar toți
customers.join(orders, on="customer_id", how="full_outer").show()
# Diego appears (no orders), order 1005 appears (no customer),
# everyone else matches.
how="full", how="outer" și how="full_outer" sunt toate identice. Util pentru rapoarte de reconciliere: „ce e în sistemul A care nu e în sistemul B, ce e în B care nu e în A și ce se potrivește” totul într-o singură interogare. Îl folosesc cam la fel de des cât FULL OUTER JOIN în SQL: rar, dar când îți trebuie nu se potrivește nimic altceva.
Left semi: un filtru, nu un join
left_semi păstrează rândurile din partea stângă care au cel puțin o potrivire în dreapta. Nu aduce nicio coloană din dreapta. Nu multiplică rânduri.
# Customers who have placed at least one order
customers.join(orders, on="customer_id", how="left_semi").show()
# +-----------+-------+-------+
# |customer_id| name|country|
# +-----------+-------+-------+
# | 1| Anna| IT|
# | 2| Bjorn| NL|
# | 3| Chiara| IT|
# +-----------+-------+-------+
Anna apare o singură dată chiar dacă are două comenzi. Semi join-ul este doar pentru apartenență: un rând per potrivire stângă, fără duplicare. Este echivalentul Spark al lui WHERE EXISTS (...) din SQL și aproape mereu este ce vrei când folosești un join doar pe post de filtru.
Dacă ai scris vreodată în cod de producție df.join(other, "key", "inner").select(df["*"]).distinct(): oprește-te. left_semi e mai rapid, mai curat și nu va exploda accidental rândurile când partea dreaptă are duplicate.
Left anti: inversul
left_anti păstrează rândurile din partea stângă care nu au potrivire în dreapta.
# Customers with no orders
customers.join(orders, on="customer_id", how="left_anti").show()
# +-----------+-----+-------+
# |customer_id| name|country|
# +-----------+-----+-------+
# | 4|Diego| ES|
# +-----------+-----+-------+
Acesta este WHERE NOT EXISTS (...) din SQL. E cel mai curat mod de a răspunde la orice întrebare „lucruri în stânga fără lucruri în dreapta”: clienți fără comenzi, produse care nu s-au vândut niciodată, utilizatori care nu s-au logat niciodată. Mai curat decât pattern-ul left join + where col is null, iar optimizatorul Spark îl tratează mai direct.
Cross join: produsul cartezian
Fiecare rând din stânga împerecheat cu fiecare rând din dreapta. 4 clienți ori 5 comenzi fac 20 de rânduri, dintre care niciunul nu înseamnă nimic.
# Spark refuses to run this unless you explicitly ask:
customers.crossJoin(orders).count() # 20
Motivul pentru care crossJoin are propria lui metodă (în loc de how="cross") este că un produs cartezian accidental pe date reale poate fi catastrofal: un milion de rânduri pe fiecare parte devine un trilion. Spark vrea să tastezi crossJoin ca să nu te împiedici în el.
Când e util un cross join? Pentru construirea de grile de dimensiuni („fiecare produs × fiecare zi din ultimele 90 de zile, ca LEFT JOIN-ul la vânzări să nu aibă goluri de date”), generarea de date de test combinatorice și cazuri rare de raportare. În munca de zi cu zi, aproape niciodată.
Trei moduri de a scrie condiția de join
Parametrul on acceptă trei stiluri și pe toate trei le vei vedea în cod de producție:
# 1. String — works when both sides have the column with the same name
customers.join(orders, on="customer_id")
# 2. List of strings — same idea, multiple equality keys
events.join(sessions, on=["user_id", "session_date"])
# 3. Explicit column expression — most flexible, required if names differ
customers.join(orders, on=customers["customer_id"] == orders["customer_id"])
Primele două stiluri au un beneficiu ascuns: Spark elimină coloana de join duplicată din rezultat. Stilul cu expresie nu face asta. Privește ce se întâmplă:
result = customers.join(
orders,
on=customers["customer_id"] == orders["customer_id"],
how="inner",
)
result.columns
# ['customer_id', 'name', 'country', 'order_id', 'customer_id', 'total']
# ^^ two columns named customer_id ^^
Două coloane numite customer_id, ambele legale, ambele ambigue de referit. Încearcă result.select("customer_id") și Spark aruncă AMBIGUOUS_REFERENCE. Bun venit la capcana coloanelor duplicate.
Capcana coloanelor duplicate și cum o eviți
Trei rezolvări, în ordinea preferinței.
Rezolvarea 1: folosește forma string sau listă când poți. E cea mai curată și Spark deduplichează pentru tine:
customers.join(orders, on="customer_id").select("customer_id", "name", "total")
Rezolvarea 2: dă alias DataFrame-urilor înainte de join. Acum nu mai există ambiguitate, fiindcă coloanele sunt calificate:
c = customers.alias("c")
o = orders.alias("o")
(c.join(o, F.col("c.customer_id") == F.col("o.customer_id"))
.select("c.customer_id", "c.name", "o.total")
.show())
Rezolvarea 3: redenumește sau aruncă o parte înainte de join. Mai puțin elegant, uneori pragmatic:
orders_renamed = orders.withColumnRenamed("customer_id", "cust_id")
customers.join(orders_renamed, customers["customer_id"] == orders_renamed["cust_id"])
Alege o convenție și ține-te de ea în tot codul. Amestecul de stiluri creează bug-urile.
Condiții dincolo de simpla egalitate
Expresia on e doar un boolean: orice returnează adevărat sau fals pe o pereche de rânduri e valid. Range join-uri, join-uri cu mai multe condiții, join-uri pe inegalitate, toate funcționează:
# Range join: events that happened during a user's session
sessions.join(
events,
on=(
(sessions["user_id"] == events["user_id"]) &
(events["timestamp"] >= sessions["start"]) &
(events["timestamp"] < sessions["end"])
),
how="inner",
)
Util, dar atenție: doar predicatele de egalitate pot folosi calea rapidă de hash join a lui Spark. Adaugă o inegalitate și Spark cade înapoi pe un sort-merge sau, mai rău, pe un broadcast nested loop. Job-ul tot rulează, doar mai lent. Vom vedea asta în output-ul .explain() în lecția 27.
Câteva pattern-uri practice
Trei pattern-uri pe care merită să le știi pentru că apar constant:
Enrichment. Un tabel larg de evenimente alăturat unui tabel subțire de dimensiune: fiecare eveniment trebuie să știe numele țării, nu doar codul țării. Inner join pe cheia primară a dim-ului, proiectează coloanele de care chiar ai nevoie:
events.join(
dim_country.select("country_code", "country_name"),
on="country_code",
how="left",
)
Observă select-ul pe partea dreaptă. Limitează coloanele pe care Spark trebuie să le poarte prin shuffle. Obiceiuri de genul ăsta contează la scară: fiecare coloană pe care o cari sunt octeți pe rețea.
De-duplicare prin anti-join. Ai un tabel „lucruri procesate ieri” și „lucruri de procesat azi”. Vrei azi minus ieri:
today.join(yesterday.select("id"), on="id", how="left_anti")
Mai curat decât echivalentul WHERE NOT IN și e instrumentul potrivit. NOT IN cu null-uri în partea dreaptă dă rezultate surprinzătoare (orice null returnează zero rânduri); left_anti nu are capcana asta.
Verificare de existență prin semi. Un utilizator e „activ” dacă apare și în logins_30d și în purchases_30d:
active = (logins_30d.select("user_id").distinct()
.join(purchases_30d.select("user_id"), on="user_id", how="left_semi"))
Două distinct-uri, un semi join. Fără multiplicare accidentală de rânduri, fără null-uri de care să-ți faci griji.
Ce urmează
Astea sunt cele șapte join-uri. Semantica are formă de SQL și e necontroversată. Întrebarea interesantă, cea care decide dacă job-ul tău rulează în 30 de secunde sau în 30 de minute, e cum execută Spark fizic join-ul. Există trei strategii principale (broadcast hash join, sort-merge join, shuffle hash join), iar Spark o alege pe una în funcție de mărimea fiecărei părți, tipul de join și configurare.
Lecția 27 intră în broadcast joins: când o parte e suficient de mică să fie trimisă peste tot, sari complet peste shuffle, iar join-urile devin ieftine. Lecția 28 acoperă ce se întâmplă când o parte are o cheie cu de 100 de ori mai multe rânduri decât celelalte, problema temută a skew-ului. Lecția 29 acoperă salting, soluția standard.
Pentru moment, modelul mental potrivit este: scrie join-ul așa cum ai scrie SQL, alege how-ul potrivit și evită capcana coloanelor duplicate. Performanța vine după.
Referințe: documentația Apache Spark SQL despre tipurile de join și API-ul de join al DataFrame-urilor; articole de pe blogul de inginerie Databricks despre strategiile de join. Consultat 2026-05-01.