PySpark, de la zero Lecția 41 / 60

Catalyst: creierul din spatele fiecarui DataFrame

Cum transforma Spark codul tau intr-un query plan, cele patru faze de optimizare si cum sa citesti .explain(True).

Fiecare operație DataFrame pe care ai scris-o în acest curs a trecut prin Catalyst. Fiecare .filter, .join, .groupBy, fiecare expresie de coloană, fiecare string de Spark SQL: Catalyst le-a văzut pe toate, le-a rescris pe majoritatea și a decis cum să le ruleze efectiv. Motivul pentru care Spark cu DataFrame e cu mult mai rapid decât Spark cu RDD e în mare parte Catalyst (cu Tungsten făcând restul, despre care e lecția următoare).

Până acum am dat din mână vag cu „Spark își dă seama de planul corect”. E momentul să ne uităm cum. Odată ce poți citi un query plan, poți prezice performanța din cod, poți depana surprize și poți scrie Spark cu intenție în loc să speri.

Ce este Catalyst

Catalyst e optimizatorul de interogări al Spark, scris în Scala și livrat ca parte din spark-catalyst. E stratul dintre „codul tău DataFrame” și „task-urile care rulează pe executori”. De fiecare dată când apelezi o acțiune, .show(), .count(), .write(), Catalyst ia operațiile pe care le-ai descris, le transformă prin patru faze și predă rezultatul motorului de execuție Spark.

Fazele:

  1. Parsed logical plan: codul tău, sintactic valid, încă fără verificări semantice.
  2. Analyzed logical plan: referințele de coloană sunt rezolvate față de schemă; tipurile sunt verificate; totul e concret.
  3. Optimized logical plan: rescrieri bazate pe reguli: predicate pushdown, column pruning, constant folding, projection collapse, join reordering, zeci altele.
  4. Physical plan: alegeri de strategie efectivă de execuție: care algoritm de join, care implementare de scan, care strategie de exchange.

În Spark modern există și un strat runtime peste faza 4, Adaptive Query Execution, care ajustează physical plan-ul în mijlocul interogării pe baza statisticilor de shuffle observate. Vom ajunge acolo la final.

Întreaga conductă rulează de fiecare dată când declanșezi o acțiune, iar costul e neglijabil față de rularea efectivă a interogării.

O interogare mică și planurile ei

Hai să luăm ceva concret. Două tabele, un filtru, o agregare, un join.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("CatalystDemo").master("local[*]").getOrCreate()

orders = spark.createDataFrame(
    [(1, 100, 50.0, "2024-01-01"),
     (2, 100, 75.0, "2024-01-02"),
     (3, 200, 30.0, "2024-01-03")],
    ["order_id", "user_id", "amount", "dt"]
)

users = spark.createDataFrame(
    [(100, "alice", "IT"),
     (200, "bob", "DE"),
     (300, "carol", "FR")],
    ["user_id", "name", "country"]
)

q = (orders
     .filter(F.col("amount") > 40)
     .join(users, "user_id")
     .groupBy("country")
     .agg(F.sum("amount").alias("total"))
     .filter(F.col("country") == "IT"))

q.explain(True)

.explain(True) afișează toate cele patru faze. Hai să parcurgem ce arată fiecare.

Faza 1: parsed logical plan

== Parsed Logical Plan ==
'Filter ('country = IT)
+- Aggregate [country#X], [country#X, sum(amount#Y) AS total#Z]
   +- Join Inner, (user_id#A = user_id#B)
      :- Filter (amount#Y > 40)
      :  +- LogicalRDD [order_id#W, user_id#A, amount#Y, dt#V], false
      +- LogicalRDD [user_id#B, name#U, country#X], false

Ăsta e codul tău, reflectat fidel ca arbore. Observă apostroafele, 'Filter, 'country: marchează referințe nerezolvate. La momentul parsării, Spark încă nu știe dacă country există sau ce tip are. E doar un nume. Frunzele LogicalRDD sunt placeholder-e pentru DataFrame-urile de intrare.

Faza asta e despre validitatea sintactică: ai folosit operații care există, cu aritatea corectă? O greșeală de tipar într-un nume de metodă pică aici. O greșeală de tipar într-un nume de coloană nu pică: aceea e treaba fazei următoare.

Faza 2: analyzed logical plan

== Analyzed Logical Plan ==
country: string, total: double
Filter (country#X = IT)
+- Aggregate [country#X], [country#X, sum(amount#Y) AS total#Z]
   +- Project [user_id#A, order_id#W, amount#Y, dt#V, name#U, country#X]
      +- Join Inner, (user_id#A = user_id#B)
         :- Filter (amount#Y > 40)
         :  +- LogicalRDD [order_id#W, user_id#A, amount#Y, dt#V], false
         +- LogicalRDD [user_id#B, name#U, country#X], false

Apostroafele au dispărut. Fiecare coloană are un exprId unic (numerele #X, #Y), prin care Catalyst urmărește identitatea pe parcursul rescrierilor ulterioare. Schema de output e tipărită sus.

Analizatorul e și locul în care afli că ai greșit. Referențiezi o coloană care nu există, încerci să compari un string cu un struct, alias către un nume care există deja în scope: toate aceste erori ies din această fază, cu mesajul „cannot resolve foo given input columns …”. Când oamenii se plâng că „erorile Spark sunt ilizibile”, de obicei se referă la erori de analizator, iar trucul e să citești planul ca să vezi unde s-a stricat efectiv rezoluția.

Faza 3: optimized logical plan

Aici Catalyst își câștigă pâinea:

== Optimized Logical Plan ==
Aggregate [country#X], [country#X, sum(amount#Y) AS total#Z]
+- Project [amount#Y, country#X]
   +- Join Inner, (user_id#A = user_id#B)
      :- Project [user_id#A, amount#Y]
      :  +- Filter ((isnotnull(amount#Y) AND (amount#Y > 40)) AND isnotnull(user_id#A))
      :     +- LogicalRDD [order_id#W, user_id#A, amount#Y, dt#V], false
      +- Project [user_id#B, country#X]
         +- Filter ((country#X = IT) AND isnotnull(country#X) AND isnotnull(user_id#B))
            +- LogicalRDD [user_id#B, name#U, country#X], false

Compară-l cu planul analizat. Mai multe lucruri s-au schimbat:

  • Filter (country = IT) exterior din codul original a fost împins în jos prin agregare și join, până la scanarea users. Optimizatorul a observat că country vine doar din users și că filtrarea ei înainte de join e logic echivalentă și fizic mult mai ieftină.
  • Deasupra fiecărui scan vezi acum noduri Project care selectează doar coloanele de care are nevoie restul planului. order_id, dt, name, niciodată referențiate după scan, sunt aruncate imediat. Asta e column pruning.
  • Filtre isnotnull au fost adăugate implicit. Inner join-urile pe chei null nu se potrivesc niciodată, așa că Catalyst inserează filtrul de null ca să reducă input-ul join-ului. Câștig gratis.
  • Ordinea originală dintre filter(amount > 40) și join a fost păstrată aici, dar în planuri mai mari Catalyst va reordona filtrele și proiecțiile ca să împingă cât mai multă muncă spre frunze.

Lista regulilor optimizatorului e lungă: PushDownPredicates, ColumnPruning, ConstantFolding, ReorderJoin, EliminateOuterJoin, CollapseProject, zeci altele. Nu trebuie să le memorezi. Trebuie să recunoști efectele lor în planul optimizat și să ai încredere că optimizatorul face alegeri rezonabile.

Partea cost-based (CBO), introdusă în Spark 2.2, intervine când ai colectat statistici pe un tabel (ANALYZE TABLE ... COMPUTE STATISTICS). Cu statistici, Catalyst poate estima numărul de rânduri în fiecare nod și să aleagă ordinea de join mai bună, să decidă când o parte e suficient de mică pentru broadcast, să aleagă din două planuri echivalente pe cel mai ieftin. Fără statistici cade pe euristici. Cele mai multe echipe de producție nu rulează niciodată ANALYZE, ceea ce înseamnă că majoritatea deciziilor optimizatorului sunt euristice. Merită știut: CBO devine vizibil mai bun când îi dai statistici.

Faza 4: physical plan

== Physical Plan ==
*(5) HashAggregate(keys=[country#X], functions=[sum(amount#Y)])
+- Exchange hashpartitioning(country#X, 200), ENSURE_REQUIREMENTS
   +- *(4) HashAggregate(keys=[country#X], functions=[partial_sum(amount#Y)])
      +- *(4) Project [amount#Y, country#X]
         +- *(4) BroadcastHashJoin [user_id#A], [user_id#B], Inner, BuildRight
            :- *(4) Project [user_id#A, amount#Y]
            :  +- *(4) Filter ((isnotnull(amount#Y) AND (amount#Y > 40)) AND isnotnull(user_id#A))
            :     +- *(4) Scan ExistingRDD[order_id#W, user_id#A, amount#Y, dt#V]
            +- BroadcastExchange HashedRelationBroadcastMode(...), [plan_id=...]
               +- *(2) Project [user_id#B, country#X]
                  +- *(2) Filter ((country#X = IT) AND isnotnull(country#X) AND isnotnull(user_id#B))
                     +- *(2) Scan ExistingRDD[user_id#B, name#U, country#X]

Fiecare operator logic a fost mapat la o implementare fizică. Trei lucruri de citit aici:

Numele operatorilor îți spun algoritmul. BroadcastHashJoin pentru că users era mic după filtru: Catalyst a ales să-l difuzeze (broadcast) în loc să-l facă shuffle. Dacă ambele părți erau mari, ai vedea SortMergeJoin, cu două noduri Exchange hashpartitioning(...) precedându-l. HashAggregate e un GROUP BY bazat pe hash; pentru interogări ordonate sau cu rollup vezi uneori SortAggregate în schimb. BroadcastNestedLoopJoin e antipatternul: apare când ai scris un join pe care Catalyst nu-l poate exprima altfel (un join non-equi pe date mari, de obicei) și aproape întotdeauna înseamnă că ar trebui să rescrii.

Nodurile Exchange sunt marcatorii de shuffle. Oriunde vezi Exchange, Spark va scrie fișiere de shuffle și le va citi înapoi. HashAggregate de aici are un Exchange hashpartitioning(country, 200) între agregările parțială și finală, pattern-ul standard de agregare în două faze. Numărarea nodurilor Exchange e cea mai rapidă cale de a estima cât shuffle va face o interogare.

Prefixele *(N) indică whole-stage codegen. E un lucru Tungsten, lecția următoare.

PushedFilters apare pe scanări de fișiere. Când citești Parquet sau alt format columnar, vei vedea ceva de genul:

+- FileScan parquet warehouse.orders[user_id#A, amount#Y]
   PushedFilters: [IsNotNull(amount), GreaterThan(amount,40)],
   ReadSchema: struct<user_id:int,amount:double>

Acelea sunt predicate care au fost împinse până jos în cititorul formatului de fișier. Parquet le evaluează în timp ce citește și sare peste row groups întregi care nu se pot potrivi. Combinat cu column pruning (ReadSchema arată doar coloanele pe care le folosești efectiv), de aceea e Parquet atât de mai rapid decât CSV: nu doar compresia, ci capacitatea optimizatorului de a citi mai puține date.

Citirea planurilor în practică

.explain(True) afișează toate cele patru faze și e ce vrei când se întâmplă ceva surprinzător. .explain() (fără argument) afișează doar physical plan-ul, care e cel mai mult ce-ți trebuie zi de zi. Există și .explain("formatted"), care randează physical plan-ul ca o listă numerotată cu detalii defalcate dedesubt: mai ușor pe ochi pentru planuri mari, mai greu de scanat rapid.

Un workflow care tinde să fie util când depanezi o interogare lentă:

  1. Rulează .explain() și numără nodurile Exchange. Fiecare e un shuffle. Trei sau mai puține sunt de obicei OK; șase e suspect; zece înseamnă că ceva nu e în regulă.
  2. Uită-te la algoritmii de join. Orice BroadcastNestedLoopJoin e steag roșu.
  3. Verifică scanările frunză. Apar filtrele pe care le-ai scris în PushedFilters? E ReadSchema setul minim de coloane de care ai nevoie?
  4. Dacă ceva pare în neregulă, comută la .explain(True) și parcurge optimized logical plan ca să vezi de ce a avut sau nu loc rescrierea.

Ăsta e și momentul potrivit să menționez Adaptive Query Execution. AQE (activat implicit din Spark 3.2) adaugă o a cincea fază care rulează în timpul execuției: după fiecare shuffle, Spark se uită la mărimile reale ale partițiilor și poate dinamic să unească partiții mici, să comute un sort-merge join la un broadcast join dacă o parte s-a dovedit mai mică decât se aștepta sau să spargă partiții skew. AQE apare în plan ca AdaptiveSparkPlan la rădăcină și noduri AQEShuffleRead după exchange-uri. Dacă physical plan-ul tău arată ciudat pentru că s-a schimbat la runtime, de aia.

Puncte de extensie

Poți să-ți cuplezi propriile reguli în Catalyst: Spark Session Extensions îți permite să înregistrezi reguli de optimizator, strategii de planner, extensii de parser și reguli de analiză la pornirea sesiunii. Iceberg, Delta Lake și Hudi folosesc toate asta ca să injecteze propriile rescrieri pentru lucruri precum predicate pushdown în transaction logs. Să-ți scrii propriile reguli e rar și avansat; mai degrabă vei întâlni această suprafață ca utilizator, când o bibliotecă de data source îți spune să adaugi un config precum spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension. Acum știi ce face.

Ce să reții

Catalyst rescrie codul tău prin patru faze, iar .explain(True) e fereastra ta către toate. Planurile optimizate îți spun ce a gândit Spark despre interogarea ta; planurile fizice îți spun ce va face. Citește-le suficient de des încât numele operatorilor să devină familiare. Odată ce poți citi un plan, restul muncii de performanță Spark, join-uri, partiționare, caching, tot Modulul 6, se leagă ca alegeri care apar direct în plan.

Lecția următoare: stratul de sub Catalyst. Tungsten, generare de cod, formatul de memorie binar și de ce Spark cu DataFrame e atât de mai rapid decât RDD-ul pe care l-a înlocuit.


Surse: „Deep Dive into Spark SQL’s Catalyst Optimizer” (https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html) și ghidul Apache Spark SQL performance tuning (https://spark.apache.org/docs/latest/sql-performance-tuning.html). Consultat 2026-05-01.

Caută