Catalyst rescrie interogarea ta în cel mai bun plan pe care îl poate găsi. Asta e jumătate din poveste. Cealaltă jumătate e cum rulează Spark acel plan, iar răspunsul s-a schimbat dramatic în 2015 când a aterizat Project Tungsten. RDD Spark și DataFrame Spark sunt tehnic același motor, dar în practică se comportă ca produse diferite, iar Tungsten e majoritatea motivului. Lecția asta e despre ce face de fapt Tungsten, de ce codul DataFrame îl bate atât de des pe codul RDD cu 5-10x și setul mic de markeri de plan pe care ar trebui să înveți să-i recunoști.
Ce a venit Tungsten să repare
Până în 2015, Spark avea o problemă pe care o aveau multe motoare in-memory: era blocat pe JVM. CPU-urile deveniseră dramatic mai rapide, lățimea de bandă a RAM-ului se îmbunătățise, dar codul bazat pe JVM nu profita. Blocajele erau specifice:
Overhead de memorie. O valoare Long într-un rând Object[] JVM ia 16 octeți pentru Long-ul boxed plus 8 octeți pentru pointer, plus padding, să zicem 32 de octeți, ca să stocheze 8 octeți de date efective. Un rând de zece long-uri ocupa 320 de octeți când datele erau 80. Pentru un sistem care se presupune că e in-memory, asta era catastrofal.
Garbage collection. Execuția RDD din Spark aloca milioane de obiecte Java mici per task. Timpii de pauză GC care rezultau dominau timpul executorului pe workload-uri mari. Tunarea GC-ului JVM era o parte permanentă din rularea Spark în producție și niciodată n-a funcționat cu adevărat bine.
Pattern-uri de acces neprietenoase cu cache-ul. Obiectele Java trăiesc oriunde le-a pus alocatorul de heap, ceea ce înseamnă că iterarea peste un „rând” e o secvență de pointer chases, fiecare un cache miss probabil L2 sau L3. CPU-urile petreceau majoritatea timpului așteptând memorie.
Overhead de virtual function call între operatori. Fiecare operator în physical plan era un obiect separat cu o metodă iterator.next(). Ca să producă un rând din rezultatul final, motorul apela next() pe operatorul de sus, care apela next() pe copilul lui și așa mai departe în jos pe arbore. Fiecare apel era un virtual dispatch pe care JIT-ul nu putea mereu să-l facă inline. Pentru un milion de rânduri printr-o etapă cu cinci operatori, ăștia sunt cinci milioane de apeluri virtuale.
Tungsten le-a atacat pe toate patru. Rezultatul e motorul pe care l-ai folosit.
Piesa 1: memorie off-heap gestionată și UnsafeRow
Tungsten a introdus un format de rând binar numit UnsafeRow. În loc de Object[], un rând e dispus ca un header de mărime fixă (null bitmap, offset-uri de câmp) urmat de valori binare strâns împachetate. Câmpurile cu lățime fixă precum int și double merg inline; câmpurile cu lungime variabilă precum string-urile își stochează datele într-o coadă contiguă. Un rând de zece long-uri ia 88 de octeți, cei 80 de octeți de date plus un mic header, în loc de 320.
Acest format trăiește în memorie pe care Spark o gestionează singur, adesea off-heap (configurat prin spark.memory.offHeap.enabled și spark.memory.offHeap.size), alocată în plăci mari pe care Spark le împarte. Garbage collection nu o atinge. Nu există boxing: o coloană de int e cu adevărat patru octeți per valoare.
Nu interacționezi direct cu UnsafeRow în Python. Vezi efectele lui: presiune de memorie dramatic mai scăzută per executor, fără bătăi de cap de tunare GC și capacitatea de a păstra sute de milioane de rânduri în memorie pe o singură mașină fără să cadă.
Piesa 2: calcul conștient de cache
Odată ce datele sunt în această formă binară densă, le poți așeza pentru cache-ul CPU. Tungsten plasează rândurile unei partiții în regiuni de memorie contiguă. Iterarea unei partiții devine o plimbare secvențială peste un buffer, ceea ce iubește prefetcher-ul CPU. Compară cu RDD Spark, unde iterarea unei partiții însemna parcurgerea unei liste de obiecte Java împrăștiate pe heap.
Pentru citiri columnare (Parquet, ORC) câștigul e și mai mare. În loc să materializeze rândurile imediat, Spark citește chunks de coloană în loturi columnare gestionate de Tungsten, de obicei 4.096 de valori dintr-o singură coloană, împachetate împreună. Operațiile care ating o coloană pe rând, filtre, proiecții, agregări, citesc un loop strâns pe un buffer, rămân în cache L2 și permit unităților SIMD ale CPU-ului să facă efectiv muncă. Un cititor Parquet vectorizat poate fi de 5-10x mai rapid decât un cititor rând-cu-rând pe același fișier.
Vezi asta în planuri ca noduri ColumnarToRow: marchează granița unde Spark trece de la procesare columnară pe loturi (folosită în scanări și unii operatori) la procesare rând-cu-rând (folosită în alții). Spark modern face o cantitate tot mai mare de muncă în mod columnar pur, iar ColumnarToRow apare mai târziu în plan decât obișnuia.
Piesa 3: whole-stage code generation
Asta e capul de afiș. În loc să execute physical plan-ul ca un arbore de obiecte operator, Tungsten generează bytecode Java la runtime care fuzionează operatorii adiacenți într-un singur loop strâns, apoi îl JIT-compilează.
O etapă care scanează, filtrează, proiectează și agregă în vechiul model are patru operatori cu patru iteratori și patru apeluri virtuale next() per rând. Whole-stage codegen rescrie asta în aproximativ:
while (input.hasNext()) {
UnsafeRow row = input.next();
// Filtru inline-uit
if (row.getDouble(2) <= 40.0) continue;
// Proiectie inline-uita
long userId = row.getLong(1);
double amount = row.getDouble(2);
// Agregare partiala inline-uita
aggBuffer.update(...);
}
O metodă, un loop, fără virtual dispatch, prietenos cu JIT-ul. JIT-ul poate scoate verificările de null, vectoriza părți și inline-ui agresiv. Rezultatul e adesea în limita a 2-3x față de C strâns scris de mână, pe Java.
Semnătura vizibilă în physical plan e asteriscul:
*(2) HashAggregate(keys=[country#X], functions=[partial_sum(amount#Y)])
+- *(2) Project [country#X, amount#Y]
+- *(2) Filter (amount#Y > 40)
+- *(2) Scan ExistingRDD[user_id#A, amount#Y, country#X]
Fiecare operator cu *(2) e parte din codul generat al etapei 2. Numărul e id-ul stage-ului de codegen. Când vezi un subarbore contiguu de operatori *(N) cu același N, ăla e un singur loop fuzionat.
Când asteriscul lipsește, codegen-ul nu s-a activat. Cele mai comune motive:
- Un Python UDF obișnuit în plan. Nodul
BatchEvalPythonși tot ce e în jurul lui nu pot fi fuzionate cu codegen: Spark trebuie să rupă etapa ca să trimită rândurile workerului Python. Lecția 40 a spus că UDF-urile sunt scumpe; ăsta e motivul cel mai profund. - Operatori care nu suportă codegen. O mână de operatori fizici (unele variante de join, unele căi de agregare, unele funcții de fereastră) nu au implementări de codegen. Spark cade pe iterația în stil volcano de altădată.
- Codul generat a depășit limita de mărime de bytecode JVM. JVM-ul limitează o singură metodă la 64KB de bytecode. Pentru scheme foarte largi (sute de coloane) sau expresii generate foarte mari, codegen-ul Spark poate trece de asta. Când se întâmplă, Spark prinde eroarea și cade pe calea interpretată. Vei vedea asta în log-urile executorului ca un avertisment despre
JaninoRuntimeExceptionsau „method too large.” Dacă o vezi repetat, soluția e să-ți spargi interogarea în etape mai mici sau să reduci numărul de coloane.
Există un buton de debug:
spark.conf.set("spark.sql.codegen.wholeStage", "false")
Asta dezactivează whole-stage codegen pentru sesiune. Aproape nimeni n-ar trebui vreodată să-l setezi. Singurul caz în care e util e depanarea unui bug suspectat de codegen: dezactivează-l, vezi dacă comportamentul tău ciudat dispare, depune un bug report Spark. În operare normală ar trebui să fie pornit, iar pe versiuni recente de Spark e implicit pornit.
Cititori vectorizați
Strâns legat de Tungsten e cititorul columnar vectorizat pentru Parquet și ORC. În loc să decodifice un rând odată, cititorul scoate un lot de valori dintr-o coloană, aplică decodarea (RLE, dictionary) pe întregul lot și predă un buffer columnar următorului operator. Filtrele care se potrivesc cu forma columnară rulează și ele vectorizat.
Configurațiile relevante:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true") # implicit true
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true") # implicit true
Ambele sunt implicit true. Motivul pentru care merită să știi că există e depanarea: dacă vezi un nod Scan parquet care alimentează un ColumnarToRow foarte devreme în plan, citirea vectorizată e pornită. Dacă vezi citiri simple rând-cu-rând, ceva e dezactivat, de obicei din cauza unui tip nesuportat (unele tipuri imbricate complexe) sau a unei suprascrieri de config.
Un benchmark concret
Ia o agregare simplă și compară RDD vs DataFrame pe aceleași date. Numerele sunt ilustrative, nu benchmark-uri pe care să le citezi, dar forma e ce a văzut fiecare echipă prima dată când au încercat asta:
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("TungstenDemo").master("local[*]").getOrCreate()
# 50M de randuri, doua coloane
df = spark.range(0, 50_000_000).select(
(F.col("id") % 1000).alias("group"),
(F.rand() * 100).alias("value"),
)
# Materializeaza o data ca sa fie costul in agregare, nu in generare
df.write.mode("overwrite").parquet("/tmp/tungsten_demo")
df = spark.read.parquet("/tmp/tungsten_demo")
# Calea DataFrame: codegen, citire vectorizata, loturi columnare
t0 = time.time()
df.groupBy("group").agg(F.sum("value")).collect()
print("DataFrame:", time.time() - t0, "seconds")
# Calea RDD: fara codegen, fara beneficii UnsafeRow, obiecte Python
t0 = time.time()
(df.rdd
.map(lambda r: (r["group"], r["value"]))
.reduceByKey(lambda a, b: a + b)
.collect())
print("RDD:", time.time() - t0, "seconds")
Pe laptopul meu varianta DataFrame rulează în aproximativ 3 secunde, iar varianta RDD în aproximativ 30. Aproape întreaga diferență e Tungsten: codegen-ul fuzionând filter/project/partial-agg, cititorul Parquet vectorizat, loturile columnare, lipsa overhead-ului de obiect Python în JVM-ul executorului. Query plan-ul spune aceeași poveste:
df.groupBy("group").agg(F.sum("value")).explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- HashAggregate(keys=[group#X], functions=[sum(value#Y)])
# +- Exchange hashpartitioning(group#X, 200), ENSURE_REQUIREMENTS
# +- *(1) HashAggregate(keys=[group#X], functions=[partial_sum(value#Y)])
# +- *(1) ColumnarToRow
# +- FileScan parquet [group#X, value#Y]
# Batched: true,
# PushedFilters: [],
# ReadSchema: struct<group:bigint,value:double>
*(1) acoperă întreaga muncă de dinainte de shuffle: scan, columnar-to-row, agregare parțială, totul fuzionat. Batched: true confirmă citirea vectorizată. Shuffle-ul e singurul lucru care nu e în codegen, iar asta pentru că traversarea executorilor nu e fundamental o problemă de fuziune de cod per rând.
Varianta RDD nu are nimic din asta. Fiecare rând trece în Python (pentru că lambda e Python) și înapoi, reducerea parțială rulează în Python, iar optimizatorul nu vede nimic ce ar putea rescrie. E o conductă pură de operații, fără fuziune, fără batching, fără codegen.
De aceea aproape fiecare întrebare „ar trebui să folosesc RDD-uri?” are același răspuns: nu. Performanța motorului DataFrame vine din Tungsten și Catalyst lucrând împreună, iar le pierzi pe amândouă în momentul în care cobori la RDD.
Ce să reții
Tungsten înseamnă trei lucruri, memorie off-heap gestionată cu formatul binar UnsafeRow, layout-uri columnare conștiente de cache și whole-stage codegen, și ele explică majoritatea motivului pentru care Spark cu DataFrame e rapid. Citește query plan-uri pentru prefixele *(N) ca să confirmi că codegen-ul se angajează; asteriscurile lipsă sunt semnalul de alarmă zgomotos că ceva (de obicei un UDF) sparge fuziunea. Citirile vectorizate Parquet/ORC sunt drogul de inițiere: column pruning-ul și predicate pushdown-ul pe care le primești de la Catalyst alimentează direct în citirile columnare pe loturi, care alimentează direct în execuția fuzionată de codegen. Întreaga stivă e proiectată să mențină datele dense curgând prin loop-uri strânse pe CPU-uri moderne.
Asta închide Modulul 7. Acum poți citi un plan Spark, prezice ce va face optimizatorul cu codul tău și recunoaște când superputerile motorului se angajează sau nu. Modulul 8 începe lecția următoare și se întoarce spre exterior: surse de date. Parquet, ORC, Avro, JDBC, metastore-ul Hive, Delta Lake, Iceberg, ce sunt, prin ce diferă și care se potrivește workload-ului tău.
Surse: „Project Tungsten: Bringing Spark Closer to Bare Metal” (https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html) și ghidul Apache Spark SQL performance tuning (https://spark.apache.org/docs/latest/sql-performance-tuning.html). Consultat 2026-05-01.