Prima dată când m-am așezat lângă un coleg care migra un script Pandas la PySpark, exact asta s-a întâmplat. A scris un lanț: read, filter, select, group, aggregate, a apăsat Shift+Enter, iar celula s-a întors în vreo 12 milisecunde. S-a uitat la mine. „A mers?” Apoi a tastat df.show() și a așteptat 90 de secunde, în timp ce un job pe care îl credea deja rulat se apuca, în sfârșit, să ruleze efectiv.
Asta e lazy evaluation într-un paragraf. Bun venit la Modulul 4.
Lucrul care îi încurcă pe toți cei care vin din Pandas
În Pandas, fiecare linie de cod face muncă. df = df[df.country == "IT"] citește o bucată de memorie, o scanează și produce un DataFrame nou în RAM, chiar acum. Linia următoare operează pe rezultat. Pandas e eager: fiecare apel calculează imediat.
PySpark e lazy. Transformările nu calculează nimic. Construiesc un plan.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
spark = (SparkSession.builder
.appName("LazyDemo")
.master("local[*]")
.getOrCreate())
orders = spark.createDataFrame(
[
(1, "IT", 59.0, "2026-03-05"),
(2, "IT", 29.0, "2026-03-18"),
(3, "NL", 149.0, "2026-02-15"),
(4, "IT", 89.5, "2026-03-22"),
(5, "DE", 14.0, "2026-03-10"),
(6, "IT", 42.42, "2026-03-26"),
],
"OrderId INT, Country STRING, Total DOUBLE, OrderDate STRING",
)
# Build a chain of transformations
result = (orders
.where(col("Country") == "IT")
.withColumn("Total", col("Total") * 1.22) # add VAT
.groupBy("Country")
.agg(F.sum("Total").alias("TotalWithVat"),
F.count("*").alias("OrderCount")))
print(result)
# DataFrame[Country: string, TotalWithVat: double, OrderCount: bigint]
Acel print(result) s-a întors în microsecunde. Nimic nu a fost filtrat. Nimic nu a fost înmulțit. Nimic nu a fost grupat. Spark nici măcar nu a deschis datele, iar orders aici e în memorie, deci nu există nimic de deschis. Tot ce este result în acest moment e o descriere a muncii de făcut.
Dacă n-ai mai văzut asta, pare stricat. Nu este. E exact ideea.
De ce lazy: optimizatorul are nevoie de tot planul
Motivul e simplu și merită luat în serios: un optimizator care vede o singură operație pe rând nu poate lua decât decizii locale. Un optimizator care vede întregul pipeline înainte ca vreuna să ruleze poate rearanja lucrurile.
Exemple concrete despre ce face Spark odată ce poate vedea tot planul:
- Filter pushdown. Ai scris
read.parquet(...).select("a","b","c").where(col("country")=="IT"). Spark observă filtrul și îl împinge în interiorul citirii Parquet, astfel încât doar grupurile de rânduri cu țara IT sunt încărcate de pe disc. Dacă lanțul ar fi fost eager, citirea s-ar fi întâmplat deja până să apară filtrul. - Column pruning. Citești 80 de coloane din Parquet, dar folosești doar 4 mai jos în lanț. Spark încarcă doar acele 4. Același truc: optimizatorul trebuie să vadă utilizarea ulterioară înainte să decidă ce să citească.
- Combinare de predicate. Ai scris
.where(col("a") > 10).where(col("b") < 100)ca două apeluri. Spark le fuzionează într-o singură trecere de filtrare. - Alegerea strategiei de join. Spark poate alege între broadcast joins, shuffle hash joins și sort-merge joins. Alegerea corectă depinde de dimensiunile tabelelor și de cheile de join, lucruri pe care le poate ști doar după ce întreg planul e asamblat.
- Fuziunea operațiilor. Mai multe apeluri
withColumnșiselectsunt compilate într-o singură trecere peste date, în loc de N treceri.
Primești toate astea gratis, dar numai pentru că nimic nu se execută până când o action nu o forțează. Eager evaluation ar fi nevoită să se angajeze la o strategie pe fiecare linie; lazy evaluation poate aștepta imaginea completă și o poate alege pe cea mai bună.
E aceeași idee ca un SQL query planner. Scrii un SELECT cu un WHERE, iar baza de date decide dacă să folosească un index, să scaneze un heap sau să facă ceva inteligent cu statistici. Transformările PySpark sunt echivalentul pentru DataFrame: tu descrii ce, motorul alege cum.
Transformările descriu; actions execută
Modelul mental de internalizat:
- O transformation întoarce un DataFrame nou și adaugă un nod la planul logic. Niciun calcul. Exemple:
select,where,withColumn,join,groupBy.agg,orderBy,distinct,union. - O action întoarce un rezultat (o valoare Python, o scriere pe disc, un efect secundar) și declanșează execuția a tot din plan de care depinde. Exemple:
show,count,collect,take,first,toPandas,write.parquet,write.csv,foreach.
Lecția 20 are catalogul complet. Deocamdată testul e: „întoarce această metodă un DataFrame?” Dacă da, aproape sigur e o transformation. Dacă întoarce orice altceva (un număr, o listă de obiecte Row, None după scriere pe disc) e o action.
# Transformations: instant. The chain is just being described.
print("Building plan...")
plan = (orders
.where(col("Country") == "IT")
.withColumn("Total", col("Total") * 1.22)
.groupBy("Country")
.agg(F.sum("Total").alias("TotalWithVat")))
print("Plan built. Type:", type(plan).__name__)
# DataFrame
# Action: this is when work happens
print("Calling show()...")
plan.show()
# Now the executors actually compute. The first .show() on a non-trivial
# pipeline is the moment your laptop's fan spins up.
Dacă pui un time.time() în jurul lanțului de transformări, vei vedea numere în microsecunde. În jurul lui show() vei vedea cât costă efectiv calculul: milisecunde pentru date mici în memorie, minute sau ore pentru fișiere mari.
Surpriza „am chemat .count() și am așteptat trei minute”
Asta e experiența canonică a începătorului. Ai construit un lanț cu 12 pași care citește un tabel Parquet de 200 GB, îl unește cu două tabele de referință, filtrează, agregă, iar tu vrei să te uiți la rezultat. Tastezi result.count(). Aștepți. Și aștepți. Și fierbe ibricul, și începe ședința, iar tu încă aștepți.
Ce s-a întâmplat e exact ce trebuia să se întâmple: count() e prima action din lanț. Până în acel moment, niciunul dintre cei 12 pași nu rulase. Stăteau toți în planul logic. count() e cel care a făcut motorul să se ducă efectiv să deschidă tabelul de 200 GB, să facă join-urile și să numere rândurile.
Nu e un bug. Bug-ul e așteptarea ta că count() e ieftin. În Pandas chiar este: datele sunt deja în RAM, numărarea rândurilor e o căutare de lungime. În Spark, count() rulează din nou întreg pipeline-ul care a produs DataFrame-ul.
Anti-pattern-ul de notebook
Acum, varianta și mai rea. Prototipezi. Construiești lanțul pas cu pas, aruncând un ochi la fiecare etapă:
# Build the chain piece by piece, with .show() at each step (DON'T)
df1 = orders.where(col("Country") == "IT")
df1.show() # action #1: runs the read + filter
df2 = df1.withColumn("Total", col("Total") * 1.22)
df2.show() # action #2: re-runs read + filter + withColumn
df3 = df2.groupBy("Country").agg(F.sum("Total").alias("TotalWithVat"))
df3.show() # action #3: re-runs everything from scratch
print(df3.count()) # action #4: re-runs everything AGAIN
print(df3.count()) # action #5: re-runs everything ONE MORE TIME
Cinci actions. De cinci ori a rulat tot pipeline-ul. Spark nu cache-uiește rezultatele între actions în mod implicit: fiecare action pornește de la sursa originală și parcurge din nou tot planul.
Pe 6 rânduri de date în memorie nu contează. Pe un pipeline real care citește din S3 sau HDFS, tocmai ai taxat echipa cu de 5 ori mai mult compute și de 5 ori mai mult network IO ca să te uiți la aceleași numere din cinci unghiuri diferite.
Soluția e .cache() (sau .persist()), pe care o vom acoperi în lecția 23. Versiunea scurtă: .cache() e un marker care spune „după următoarea action, păstrează rezultatul acestui DataFrame în memorie, ca actions ulterioare să-l poată reutiliza.” Aplică-l pe intermediare scumpe la care urmează să te uiți de mai multe ori în timpul dezvoltării.
Versiunea și mai scurtă: cât timp înveți, nu presăra apeluri .show() și .count() între fiecare linie. Construiește tot lanțul. Rulează o singură action la sfârșit. Dacă vrei vizibilitate intermediară, adaugă un .cache() înainte de apelurile de vizibilitate.
Vezi cu ochii tăi
Trei experimente concrete care merită rulate o dată.
Experimentul 1: un lanț fără action se afișează instant.
import time
t0 = time.time()
big_chain = (orders
.where(col("Country") == "IT")
.withColumn("Total", col("Total") * 1.22)
.withColumn("VatRate", F.lit(0.22))
.withColumn("PreVat", col("Total") / (1 + col("VatRate")))
.groupBy("Country")
.agg(F.sum("Total").alias("TotalWithVat"),
F.sum("PreVat").alias("PreVatTotal"),
F.count("*").alias("OrderCount"))
.orderBy(F.desc("TotalWithVat")))
print(f"Building chain took {(time.time() - t0)*1000:.2f} ms")
# Building chain took 4.13 ms
Opt transformări. Sub 10 ms. Nimic calculat.
Experimentul 2: action-ul declanșează tot lanțul.
t0 = time.time()
big_chain.show()
print(f"show() took {(time.time() - t0)*1000:.2f} ms")
# show() took 850 ms (or whatever — depends on your machine)
Acele 850 ms sunt citirea, filtrul, cele patru operații withColumn, grupul, agregarea, sortarea și network shuffle pentru groupBy. Toate împachetate într-un singur job, pentru că nicio action n-a venit înainte.
Experimentul 3: .explain() îți arată planul fără să-l ruleze.
big_chain.explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- Sort [TotalWithVat#... DESC NULLS LAST], true, 0
# +- Exchange rangepartitioning(TotalWithVat#... DESC NULLS LAST, 200), ...
# +- HashAggregate(keys=[Country#...], functions=[sum(Total#...), ...])
# +- Exchange hashpartitioning(Country#..., 200), ENSURE_REQUIREMENTS, ...
# +- HashAggregate(keys=[Country#...], functions=[partial_sum(...), ...])
# +- Project [Country#..., (Total#... * 1.22) AS Total#..., ...]
# +- Filter (isnotnull(Country#...) AND (Country#... = IT))
# +- Scan ExistingRDD[OrderId#..., Country#..., Total#..., ...]
Citește-l de jos în sus. Scanează sursa, filtrează la rândurile IT, proiectează coloanele noi, agregare parțială per partiție, shuffle (Exchange), agregare finală, încă un shuffle pentru sortare, sortare. Acela e tot planul fizic și nu a rulat niciodată. explain() e cel mai rapid mod de a-ți dezvolta intuiția pentru ce urmează să facă Spark, iar îl vei vedea din nou în lecția următoare, când vom acoperi DAG-ul.
Ce schimbă asta în felul în care scrii cod
Trei obiceiuri care vin odată cu internalizarea lazy evaluation:
- Construiește lanțuri lungi; rezistă tentației de a trage cu ochiul. Fiecare privire e o reluare completă. Lasă inspecția pentru sfârșit, sau cache-uiește o dată și inspectează ieftin după.
- Ai încredere că filtrele sunt împinse în jos. Nu trebuie să micro-optimizezi ordinea clauzelor
where. Optimizatorul le va muta cât mai aproape de citire posibil. .explain()devreme când ceva e lent. Dacă un job durează mai mult decât te aștepți, planul fizic îți spune de ce mult mai repede decât ghicitul. Caută noduriExchangeneașteptate (shuffles), filter pushdown lipsă sau scanări complete de tabel acolo unde așteptai partition pruning.
Asta înseamnă și că ai o nouă mică capcană de debug: codul poate părea că funcționează când nu funcționează, pentru că erorile nu apar decât la action. O greșeală de tipar într-un nume de coloană în withColumn nu va da eroare pe linia unde ai scris-o; dă eroare când chemi în sfârșit .show(), posibil cu sute de linii mai târziu. Citește atent stack trace-ul când se întâmplă asta; AnalysisException va numi coloana lipsă, iar linia reală e cea care a referit-o.
Lecția următoare: catalogul complet de transformări și actions, inclusiv categoria stângace de mijloc (cache, persist) care arată ca actions, dar tehnic sunt transformări. După aceea, vom săpa în DAG-ul pe care Spark îl construiește din planul tău și, în sfârșit, lecția pe care pivotează restul cursului: diferența dintre transformări narrow (ieftine) și wide (scumpe). Acea distincție e cel mai important concept Spark și e la trei lecții distanță.