PySpark, de la zero Lecția 36 / 60

Bucketing: cand partitioning nu e suficient

Hash-partitioning intr-un numar fix de bucket-uri la write time, optimizarea de bucket join si de ce bucketing e subutilizat.

Am văzut acum ambele concepte de partitioning: partiții in-memory controlate de repartition și coalesce, și partitioning pe disc via partitionBy. Ambele sunt grozave când coloana care te interesează are cardinalitate mică: year, month, country. Ambele cad când coloana are cardinalitate mare: user_id, account_id, transaction_id.

Dar coloanele cu cardinalitate mare sunt exact cele după care faci join. Dacă tabela ta de fapte are 500 de milioane de comenzi și o joinezi cu o tabelă de dimensiuni pe user_id de zece ori pe zi, fiecare dintre acele join-uri declanșează un shuffle al unei jumătăți de miliard de rânduri. Chiar și cu partiții sănătoase și fără skew, plătești zeci de secunde de muncă de shuffle per interogare.

Există o a treia unealtă, și e cea pe care aproape nimeni n-o folosește: bucketing. Lecția asta e despre de ce există bucketing, ce face, când merită costul operațional și capcana care îi prinde pe toți la prima încercare.

Ideea

Când faci bucket pe o tabelă, îi spui Spark la write time să hash-partitioneze fiecare partiție de input într-un număr fix de bucket-uri și să înregistreze specificația de bucket, coloana, numărul de bucket-uri, funcția de hash, în metadatele tabelei.

(orders
 .write
 .bucketBy(64, "user_id")
 .sortBy("user_id")               # optional but recommended
 .saveAsTable("warehouse.orders_bucketed"))

Două lucruri s-au schimbat față de o scriere normală cu partitionBy:

  1. Fișierele de output sunt numite după bucket. Fiecare task care scrie date produce 64 de fișiere: part-00000-...-bucket=0.snappy.parquet, bucket=1, …, bucket=63. Fiecare rând al cărui hash(user_id) % 64 == 17 ajunge în bucket=17, indiferent din ce partiție de input a venit.
  2. Ai folosit saveAsTable, nu save. Asta e nenegociabil pentru bucketing. Specificația de bucket trăiește în metastore (Hive metastore, AWS Glue, Databricks Unity Catalog, orice ai configurat). Spark o caută la read time. Fără o tabelă bazată pe metastore, layout-ul e doar o grămadă de fișiere Parquet pe disc și Spark n-are cum să știe ce înseamnă.

Al doilea punct e motivul principal pentru care bucketing e subutilizat: multe echipe scriu Parquet pe căi S3 simple și nu au un metastore. Revenim la asta.

Optimizarea de bucket join

Tot motivul pentru care bucketing există e optimizarea de join. Imaginează-ți două tabele, ambele cu bucket pe user_id cu 64 de bucket-uri:

(orders.write
 .bucketBy(64, "user_id")
 .sortBy("user_id")
 .saveAsTable("warehouse.orders_bucketed"))

(users.write
 .bucketBy(64, "user_id")
 .sortBy("user_id")
 .saveAsTable("warehouse.users_bucketed"))

Acum le joinezi:

joined = (spark.table("warehouse.orders_bucketed")
          .join(spark.table("warehouse.users_bucketed"), "user_id"))

joined.explain()
# == Physical Plan ==
# *(3) Project [...]
# +- *(3) SortMergeJoin [user_id#...], [user_id#...], Inner
#    :- *(1) Sort [user_id#... ASC NULLS FIRST], false, 0
#    :  +- *(1) Filter isnotnull(user_id#...)
#    :     +- *(1) ColumnarToRow
#    :        +- FileScan parquet warehouse.orders_bucketed[...] Bucketed: true, ...
#    +- *(2) Sort [user_id#... ASC NULLS FIRST], false, 0
#       +- *(2) Filter isnotnull(user_id#...)
#          +- *(2) ColumnarToRow
#             +- FileScan parquet warehouse.users_bucketed[...] Bucketed: true, ...

Observă ce nu e în planul ăla: un nod Exchange înainte de join. Shuffle-ul a fost eliminat. Fiecare task citește bucket i din orders_bucketed și bucket i din users_bucketed: aceeași funcție de hash, același număr de bucket-uri, deci cheile potrivite sunt garantate a fi în același bucket. Sort-merge join-ul rulează local per bucket. Niciun transfer prin rețea al unei jumătăți de miliard de rânduri.

Pentru comparație, același join pe tabele fără bucket ar avea un Exchange hashpartitioning(user_id, 200) pe fiecare parte înainte de sort-merge. Acela e shuffle-ul pe care bucketing îl elimină. Pe un join real fapte-cu-fapte peste sute de milioane de rânduri, diferența e minute față de secunde.

Când se declanșează efectiv optimizarea

Trei condiții trebuie să fie adevărate ca Spark să sară peste shuffle:

  1. Ambele părți cu bucket pe aceleași coloane. Bucket pe user_id pentru o tabelă și (user_id, date) pentru cealaltă? Nu merge.
  2. Ambele părți au același număr de bucket-uri. 64 vs 64 merge. 64 vs 128 nu (deși Spark 3.x le poate uneori coalesce-a; nu te baza pe asta).
  3. Ambele părți folosesc aceeași funcție de hash. În interiorul Spark e automat; dacă partajezi date cu Hive sau alt sistem care hash-uiește diferit, optimizarea se rupe în liniște.

Dacă vreuna dintre acestea e greșită, Spark cade înapoi pe un shuffle obișnuit ca și când n-ai fi făcut bucket deloc. Nu primești o eroare, doar nu primești speedup-ul. Verifică mereu planul fizic după un bucketed join ca să confirmi că Exchange a dispărut.

E și recomandarea cu sortBy. Bucketing singur hash-partitionează rândurile; adăugând .sortBy("user_id") le scrie sortate în interiorul fiecărui bucket. Sort-merge join-urile (default-ul pentru join-uri mare-cu-mare) au nevoie ca ambele părți să fie sortate; dacă fișierele de input sunt deja sortate, Spark poate sări și peste pasul de sortare. Fără sortBy, economisești shuffle-ul dar plătești tot pentru sortare la read time. Sortează mereu când faci bucket: aceeași coloană, același efort, câștig mare.

De ce bucketing e subutilizat

Bucketing e în Spark de la 2.0 și majoritatea echipelor nu l-au folosit niciodată. Există motive reale:

Ai nevoie de un metastore. bucketBy funcționează doar cu saveAsTable. Dacă data lake-ul tău e „fișiere Parquet în S3, fără Hive metastore, doar arată Spark spre cale”, și multe sunt așa, bucketing nu e disponibil fără muncă de infrastructură.

Era doar pentru Hive. Până la Spark 3.0, Spark putea citi tabele cu bucket scrise de Hive, dar sursa de date din file system nu păstra optimizarea bucket-aware singură. Layout-ul implicit Spark nu era optimizat pentru bucket joins. Asta e reparat în 3.x dar reputația a persistat ani.

Schemele se schimbă. Numărul de bucket-uri nu poate. Asta e capcana operațională. Odată ce ai scris o tabelă cu bucket cu 64 de bucket-uri, asta e ce are. Vrei să schimbi la 128? Rescriere completă. Vrei să adaugi o coloană la specificația de bucket? Rescriere completă. Numărul de bucket-uri face parte din layout-ul fizic, nu metadata pe care o poți modifica pe loc. Pentru o tabelă de fapte de 50 TB, „rescriere completă” e o decizie de mai multe ore, cu costuri S3 de mai mulți TB.

Interacționează ciudat cu alte feature-uri. Dynamic partition overwrite + bucketing are probleme cunoscute. Unele versiuni Delta Lake nu suportă bucket joins (Iceberg are propria sa transformare de bucket). Workload-uri de streaming cu sink-uri cu bucket sunt dureroase. Fiecare „dar” face propunerea de valoare mai tulbure.

Echipele cad pe broadcast joins. Când o parte a unui join e suficient de mică să fie broadcasted, n-ai nevoie de bucketing: broadcast face aceeași treabă gratis. Lecția 27 acoperă când se aplică broadcast. Bucketing e pentru cazul ambele-părți-mari, care e mai rar decât crede lumea.

Rezultatul: bucketing stă într-o nișă unde e răspunsul potrivit poate 5% din timp, iar majoritatea echipelor nu ajung niciodată acolo.

Când este răspunsul potrivit

Cazul de utilizare clasic e o tabelă de fapte joinată repetat cu una sau două dimensiuni mari pe o cheie cu cardinalitate mare. Exemple:

  • Detectare de fraudă. O tabelă de tranzacții de 2 miliarde de rânduri joinată cu o tabelă de conturi de 200 de milioane pe account_id, rulată la fiecare 15 minute pentru scoring de fraudă în streaming.
  • Sisteme de recomandări. O tabelă de events de utilizator joinată cu features de profil de utilizator pe user_id, rulată zilnic pentru training și orar pentru serving.
  • Analytics web. O tabelă de page-views joinată cu o tabelă de sesiuni pe session_id, rulată zeci de ori în interogări downstream diferite.

Pattern-ul: aceeași cheie de join, mare pe ambele părți, rulează de multe ori pe zi. Plătește costul de bucketing o dată la write, economisește costul de shuffle la fiecare read ulterior. Chiar și la un save vs zece reads, ești în câștig.

Dacă faci join o dată pe săptămână, doar fă shuffle. Dacă o parte e mică, broadcast. Dacă cheile tale de join diferă între interogări, nu poți face bucket util (ar trebui să faci bucket o dată per coloană de join, ceea ce rareori merită).

Un exemplu rezolvat

Construiește două tabele pe care le vom joina repetat. Vom folosi un dataset mic pentru demo:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("BucketingDemo")
         .master("local[*]")
         .config("spark.sql.warehouse.dir", "/tmp/warehouse")
         .enableHiveSupport()                  # required for saveAsTable + bucketing
         .getOrCreate())

# Fact table — 1M orders
orders = spark.range(0, 1_000_000).select(
    F.col("id").alias("order_id"),
    (F.col("id") % 100_000).alias("user_id"),
    (F.rand() * 100).alias("total"),
)

# Dim table — 100K users
users = spark.range(0, 100_000).select(
    F.col("id").alias("user_id"),
    F.concat(F.lit("user_"), F.col("id")).alias("name"),
    F.lit("IT").alias("country"),
)

spark.sql("CREATE DATABASE IF NOT EXISTS demo")

# Bucket both tables on user_id, 16 buckets, sorted within bucket
(orders.write
 .mode("overwrite")
 .bucketBy(16, "user_id")
 .sortBy("user_id")
 .saveAsTable("demo.orders_bucketed"))

(users.write
 .mode("overwrite")
 .bucketBy(16, "user_id")
 .sortBy("user_id")
 .saveAsTable("demo.users_bucketed"))

Acum compară planurile de join, cu bucket vs fără:

# Bucketed join — no Exchange
b = spark.table("demo.orders_bucketed").join(
        spark.table("demo.users_bucketed"), "user_id")
b.explain()

# Unbucketed equivalent — has Exchange
u = orders.join(users, "user_id")
u.explain()

În planul cu bucket vei vedea Bucketed: true pe fiecare FileScan și niciun Exchange între scan-uri și SortMergeJoin. În planul fără bucket există un Exchange hashpartitioning(user_id, 200) pe fiecare parte. Versiunea cu bucket face același join cu semnificativ mai puțin I/O și CPU.

Unele versiuni Spark au nevoie de un config explicit ca să activeze optimizarea:

spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")

Ambele sunt implicit true pe versiuni recente, dar dacă nu vezi speedup-ul, verifică-le pe astea întâi.

Stratificarea cu partitionBy

Bucketing și partitioning nu se exclud reciproc. Le poți face pe amândouă, iar pentru o tabelă interogată după dată și joinată pe utilizator, ambele e răspunsul potrivit:

(orders.write
 .mode("overwrite")
 .partitionBy("year", "month")
 .bucketBy(64, "user_id")
 .sortBy("user_id")
 .saveAsTable("warehouse.orders"))

Layout-ul de disc devine year=2024/month=3/part-00000-bucket=17.snappy.parquet. O interogare care filtrează pe year și month primește partition pruning. O interogare care face join pe user_id primește optimizarea de bucket-join. Interogările care fac ambele, majoritatea celor interesante, primesc ambele.

Costul: ai acum (num_partitions × num_buckets) fișiere per write, ceea ce poate fi mult. Pentru 24 de luni × 64 de bucket-uri sunt 1.536 de fișiere chiar și pentru un dataset mic. Tunează numărul de bucket-uri în jos pentru dataset-uri mici, în sus pentru cele uriașe.

Rulează asta pe propria mașină

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("BucketingDemo")
         .master("local[*]")
         .config("spark.sql.warehouse.dir", "/tmp/warehouse")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("CREATE DATABASE IF NOT EXISTS demo")

orders = spark.range(0, 100_000).select(
    F.col("id").alias("order_id"),
    (F.col("id") % 10_000).alias("user_id"),
    (F.rand() * 100).alias("total"),
)
users = spark.range(0, 10_000).select(
    F.col("id").alias("user_id"),
    F.concat(F.lit("user_"), F.col("id")).alias("name"),
)

(orders.write
 .mode("overwrite")
 .bucketBy(8, "user_id")
 .sortBy("user_id")
 .saveAsTable("demo.orders_b"))

(users.write
 .mode("overwrite")
 .bucketBy(8, "user_id")
 .sortBy("user_id")
 .saveAsTable("demo.users_b"))

# Bucketed join — confirm no Exchange in the plan
spark.table("demo.orders_b").join(spark.table("demo.users_b"), "user_id").explain()

# For comparison — DataFrame join with shuffle
orders.join(users, "user_id").explain()

# Listing files shows the bucket layout
import os
for path, _, files in os.walk("/tmp/warehouse/demo.db/orders_b"):
    for f in files:
        if f.endswith(".parquet"):
            print(os.path.join(path, f))

Compară cele două output-uri .explain(). Cel cu bucket zice Bucketed: true și nu are Exchange. Cel fără bucket are shuffle-ul. Asta e tot rostul capitolului, în două planuri fizice.

Cu asta se închide Modulul 6. Ai acum un model mental complet pentru partitioning pe ambele axe, in-memory și pe disc, plus optimizarea de la join time pe care bucketing o cumpără. Modulul 7 începe cu lecția următoare și merge un nivel mai adânc: Spark SQL și Catalyst, optimizator-ul care transformă codul tău DataFrame în planurile fizice pe care le-am citit. Odată ce înțelegi cum rescrie Catalyst codul tău, poți prezice (și influența) majoritatea acestor decizii înainte să apeși run.


Referințe: Documentația Apache Spark SQL data sources (https://spark.apache.org/docs/latest/sql-data-sources.html) și articole de inginerie Databricks despre bucketing și Delta Lake. Consultat 2026-05-01.

Caută