Abbiamo visto entrambi i concetti di partitioning: le partizioni in memoria controllate da repartition e coalesce, e il partitioning su disco via partitionBy. Entrambi sono ottimi quando la colonna che ti interessa ha bassa cardinalità: year, month, country. Entrambi crollano quando la colonna è ad alta cardinalità: user_id, account_id, transaction_id.
Ma le colonne ad alta cardinalità sono esattamente quelle su cui fai join. Se la tua fact table ha 500 milioni di ordini e la unisci a una tabella di dimensione su user_id dieci volte al giorno, ognuno di quei join innesca uno shuffle di mezzo miliardo di righe. Anche con partizioni in salute e nessuno skew, stai pagando decine di secondi di lavoro di shuffle per query.
C’è un terzo strumento, ed è quello che quasi nessuno usa: il bucketing. Questa lezione parla del perché il bucketing esiste, di cosa fa, di quando vale il costo operativo, e del footgun che becca tutti la prima volta.
L’idea
Quando fai il bucketing di una tabella, in fase di scrittura dici a Spark di hash-partizionare ogni partizione di input in un numero fisso di bucket, e di registrare la spec del bucket (la colonna, il numero di bucket, la funzione di hash) nei metadati della tabella.
(orders
.write
.bucketBy(64, "user_id")
.sortBy("user_id") # optional but recommended
.saveAsTable("warehouse.orders_bucketed"))
Due cose sono cambiate rispetto a una normale scrittura partitionBy:
- I file di output sono nominati per bucket. Ogni task che scrive dati produce 64 file:
part-00000-...-bucket=0.snappy.parquet,bucket=1, …,bucket=63. Ogni riga il cuihash(user_id) % 64 == 17finisce inbucket=17, indipendentemente dalla partizione di input da cui veniva. - Hai usato
saveAsTable, nonsave. Questo non è negoziabile per il bucketing. La spec del bucket vive nel metastore (Hive metastore, AWS Glue, Databricks Unity Catalog, qualunque cosa tu abbia configurato). Spark la cerca lì in fase di lettura. Senza una tabella backata da metastore, il layout è solo un mucchio di file Parquet su disco e Spark non ha modo di sapere cosa significhino.
Quel secondo punto è il motivo principale per cui il bucketing è sottoutilizzato: un sacco di team scrivono Parquet su path S3 nudi e non hanno un metastore. Ci torneremo.
L’ottimizzazione del bucket join
L’intero motivo per cui il bucketing esiste è l’ottimizzazione dei join. Immagina due tabelle, entrambe con bucket su user_id con 64 bucket:
(orders.write
.bucketBy(64, "user_id")
.sortBy("user_id")
.saveAsTable("warehouse.orders_bucketed"))
(users.write
.bucketBy(64, "user_id")
.sortBy("user_id")
.saveAsTable("warehouse.users_bucketed"))
Adesso le unisci:
joined = (spark.table("warehouse.orders_bucketed")
.join(spark.table("warehouse.users_bucketed"), "user_id"))
joined.explain()
# == Physical Plan ==
# *(3) Project [...]
# +- *(3) SortMergeJoin [user_id#...], [user_id#...], Inner
# :- *(1) Sort [user_id#... ASC NULLS FIRST], false, 0
# : +- *(1) Filter isnotnull(user_id#...)
# : +- *(1) ColumnarToRow
# : +- FileScan parquet warehouse.orders_bucketed[...] Bucketed: true, ...
# +- *(2) Sort [user_id#... ASC NULLS FIRST], false, 0
# +- *(2) Filter isnotnull(user_id#...)
# +- *(2) ColumnarToRow
# +- FileScan parquet warehouse.users_bucketed[...] Bucketed: true, ...
Nota cosa non c’è in quel piano: un nodo Exchange prima del join. Lo shuffle è stato eliminato. Ogni task legge il bucket i da orders_bucketed e il bucket i da users_bucketed: stessa funzione di hash, stesso numero di bucket, quindi le chiavi corrispondenti sono garantite essere nello stesso bucket. Il sort-merge join gira localmente per bucket. Nessun trasferimento di rete di mezzo miliardo di righe.
Per confronto, lo stesso join su tabelle senza bucket avrebbe un Exchange hashpartitioning(user_id, 200) su ogni lato prima del sort-merge. Quello è lo shuffle che il bucketing elimina. Su un join fact-to-fact reale su centinaia di milioni di righe, la differenza è minuti contro secondi.
Quando l’ottimizzazione si attiva davvero
Tre condizioni devono essere vere perché Spark salti lo shuffle:
- Entrambi i lati con bucket sulle stesse colonne. Bucket su
user_idper una tabella e su(user_id, date)per l’altra? Niente da fare. - Entrambi i lati hanno lo stesso numero di bucket. 64 vs 64 funziona. 64 vs 128 no (anche se Spark 3.x a volte può fare il coalesce; non contarci).
- Entrambi i lati usano la stessa funzione di hash. Dentro Spark è automatico; se stai condividendo dati con Hive o un altro sistema che hasha diversamente, l’ottimizzazione si rompe in silenzio.
Se una qualunque di queste è sbagliata, Spark torna a uno shuffle normale come se non avessi fatto il bucketing. Non ottieni un errore, semplicemente non ottieni lo speedup. Controlla sempre il piano fisico dopo un join con bucket per confermare che l’Exchange sia sparito.
C’è anche la raccomandazione di sortBy. Il bucketing da solo hash-partiziona le righe; aggiungere .sortBy("user_id") le scrive ordinate dentro ogni bucket. I sort-merge join (il default per i join big-to-big) hanno bisogno di entrambi i lati ordinati; se i file di input sono già ordinati, Spark può saltare anche lo step di sort. Senza sortBy, risparmi lo shuffle ma paghi comunque il sort in fase di lettura. Ordina sempre quando fai il bucketing: stessa colonna, stesso sforzo, grosso guadagno.
Perché il bucketing è sottoutilizzato
Il bucketing è in Spark dalla 2.0 e la maggior parte dei team non l’ha mai usato. Ci sono motivi reali:
Ti serve un metastore. bucketBy funziona solo con saveAsTable. Se il tuo data lake è “file Parquet su S3, nessun Hive metastore, basta puntare Spark al path”, e molti lo sono, il bucketing non è disponibile senza lavoro di infrastruttura.
Era solo Hive. Fino a Spark 3.0, Spark poteva leggere tabelle bucketate scritte da Hive, ma la data source di file system non preservava l’ottimizzazione bucket-aware da sola. Il layout Spark di default non era ottimizzato per i bucket join. È sistemato in 3.x ma la reputazione si è trascinata per anni.
Gli schemi cambiano. I numeri di bucket no. Questo è il footgun operativo. Una volta scritta una tabella con 64 bucket, è quello che ha. Vuoi cambiare a 128? Riscrittura completa. Vuoi aggiungere una colonna alla bucket spec? Riscrittura completa. Il numero di bucket fa parte del layout fisico, non sono metadati che puoi alterare sul posto. Per una fact table da 50 TB, “riscrittura completa” è una decisione da molte ore e molti TB di costo S3.
Interagisce in modo strano con altre feature. L’overwrite dinamico delle partizioni + bucketing ha problemi noti. Alcune versioni di Delta Lake non supportano i bucket join (Iceberg ha la sua propria bucket transform). I workload streaming con sink bucketati sono dolorosi. Ogni “ma” rende la value proposition più torbida.
I team vanno di default sui broadcast join. Quando un lato del join è abbastanza piccolo da fare broadcast, non ti serve il bucketing: il broadcast fa lo stesso lavoro gratis. La lezione 27 copre quando il broadcast si applica. Il bucketing è per il caso entrambi-i-lati-grossi, che è più raro di quanto la gente pensi.
Il risultato: il bucketing sta in una nicchia in cui è la risposta giusta forse il 5% delle volte, e la maggior parte dei team non ci arriva mai.
Quando è la risposta giusta
Il caso d’uso classico è una fact table unita ripetutamente a una o due grosse dimensioni su una chiave ad alta cardinalità. Esempi:
- Rilevamento frodi. Una tabella di transazioni da 2 miliardi di righe unita a una tabella di account da 200 milioni di righe su
account_id, eseguita ogni 15 minuti per scoring frodi in streaming. - Sistemi di raccomandazione. Una tabella di eventi utente unita a feature di profilo utente su
user_id, eseguita giornalmente per il training e oraria per il serving. - Web analytics. Una tabella di page-views unita a una tabella di sessioni su
session_id, eseguita decine di volte attraverso diverse query a valle.
Il pattern: stessa chiave di join, grosso su entrambi i lati, gira molte volte al giorno. Paga il costo del bucketing una volta in scrittura, risparmia il costo dello shuffle a ogni lettura successiva. Anche con un solo save contro dieci read, sei in vantaggio.
Se fai il join una volta a settimana, fa’ lo shuffle e basta. Se un lato è piccolo, fa’ broadcast. Se le tue chiavi di join differiscono tra le query, non puoi fare bucketing in modo utile (dovresti fare bucketing una volta per ogni colonna di join, raramente ne vale la pena).
Un esempio pratico
Costruiamo due tabelle che uniremo ripetutamente. Useremo un dataset piccolo per la demo:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("BucketingDemo")
.master("local[*]")
.config("spark.sql.warehouse.dir", "/tmp/warehouse")
.enableHiveSupport() # required for saveAsTable + bucketing
.getOrCreate())
# Fact table — 1M orders
orders = spark.range(0, 1_000_000).select(
F.col("id").alias("order_id"),
(F.col("id") % 100_000).alias("user_id"),
(F.rand() * 100).alias("total"),
)
# Dim table — 100K users
users = spark.range(0, 100_000).select(
F.col("id").alias("user_id"),
F.concat(F.lit("user_"), F.col("id")).alias("name"),
F.lit("IT").alias("country"),
)
spark.sql("CREATE DATABASE IF NOT EXISTS demo")
# Bucket both tables on user_id, 16 buckets, sorted within bucket
(orders.write
.mode("overwrite")
.bucketBy(16, "user_id")
.sortBy("user_id")
.saveAsTable("demo.orders_bucketed"))
(users.write
.mode("overwrite")
.bucketBy(16, "user_id")
.sortBy("user_id")
.saveAsTable("demo.users_bucketed"))
Adesso confronta i piani di join, bucketato contro non bucketato:
# Bucketed join — no Exchange
b = spark.table("demo.orders_bucketed").join(
spark.table("demo.users_bucketed"), "user_id")
b.explain()
# Unbucketed equivalent — has Exchange
u = orders.join(users, "user_id")
u.explain()
Nel piano bucketato vedrai Bucketed: true su ogni FileScan e nessun Exchange tra le scan e il SortMergeJoin. Nel piano non bucketato c’è un Exchange hashpartitioning(user_id, 200) su ogni lato. La versione bucketata sta facendo lo stesso join con I/O e CPU sensibilmente inferiori.
Alcune versioni di Spark hanno bisogno di una config esplicita per abilitare l’ottimizzazione:
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")
Entrambe sono true di default sulle versioni recenti, ma se non vedi lo speedup, controlla queste per prima cosa.
Stratificare con partitionBy
Bucketing e partitioning non sono mutuamente esclusivi. Puoi fare entrambi, e per una tabella interrogata per data e unita per utente, entrambi sono la risposta giusta:
(orders.write
.mode("overwrite")
.partitionBy("year", "month")
.bucketBy(64, "user_id")
.sortBy("user_id")
.saveAsTable("warehouse.orders"))
Il layout su disco diventa year=2024/month=3/part-00000-bucket=17.snappy.parquet. Una query che filtra su year e month ottiene il partition pruning. Una query che fa join su user_id ottiene l’ottimizzazione di bucket join. Le query che fanno entrambe le cose, la maggior parte di quelle interessanti, ottengono entrambe.
Il costo: adesso hai (num_partitions × num_buckets) file per scrittura, che possono essere tanti. Per 24 mesi × 64 bucket, sono 1.536 file anche per un dataset piccolo. Riduci il numero di bucket per dataset piccoli, alzalo per quelli enormi.
Falla girare sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("BucketingDemo")
.master("local[*]")
.config("spark.sql.warehouse.dir", "/tmp/warehouse")
.enableHiveSupport()
.getOrCreate())
spark.sql("CREATE DATABASE IF NOT EXISTS demo")
orders = spark.range(0, 100_000).select(
F.col("id").alias("order_id"),
(F.col("id") % 10_000).alias("user_id"),
(F.rand() * 100).alias("total"),
)
users = spark.range(0, 10_000).select(
F.col("id").alias("user_id"),
F.concat(F.lit("user_"), F.col("id")).alias("name"),
)
(orders.write
.mode("overwrite")
.bucketBy(8, "user_id")
.sortBy("user_id")
.saveAsTable("demo.orders_b"))
(users.write
.mode("overwrite")
.bucketBy(8, "user_id")
.sortBy("user_id")
.saveAsTable("demo.users_b"))
# Bucketed join — confirm no Exchange in the plan
spark.table("demo.orders_b").join(spark.table("demo.users_b"), "user_id").explain()
# For comparison — DataFrame join with shuffle
orders.join(users, "user_id").explain()
# Listing files shows the bucket layout
import os
for path, _, files in os.walk("/tmp/warehouse/demo.db/orders_b"):
for f in files:
if f.endswith(".parquet"):
print(os.path.join(path, f))
Confronta i due output di .explain(). Quello bucketato dice Bucketed: true e non ha Exchange. Quello non bucketato ha lo shuffle. Quello è tutto il punto del capitolo, in due piani fisici.
Questo chiude il Modulo 6. Adesso hai un modello mentale completo per il partitioning su entrambi gli assi, in memoria e su disco, più l’ottimizzazione in fase di join che il bucketing ti compra. Il Modulo 7 inizia con la prossima lezione e va un livello più in profondità: Spark SQL e Catalyst, l’ottimizzatore che trasforma il tuo codice DataFrame nei piani fisici che abbiamo letto. Una volta capito come Catalyst riscrive il tuo codice, puoi prevedere (e influenzare) la maggior parte di queste decisioni prima di premere run.
Riferimenti: documentazione di Apache Spark SQL sulle data source (https://spark.apache.org/docs/latest/sql-data-sources.html) e post di engineering di Databricks su bucketing e Delta Lake. Recuperato il 2026-05-01.