PySpark, de la zero Lecția 20 / 60

Transformations vs actions: dihotomia si catalogul

Fiecare operatie PySpark e fie o transformation, fie o action. Sa stii care e care e jumatate din debugging.

Lecția trecută a stabilit regula: transformations descriu munca, actions o declanșează. Lecția asta e catalogul. Merită memorat, fiindcă de fiecare dată când debug-ezi un job Spark lent vei pune aceeași întrebare: care linie din scriptul ăsta de 80 de linii a fost action-ul care a declanșat tot?

Uneori e evident: .write.parquet(...) de jos. Alteori nu: cineva a pus un .count() cu șase linii mai sus „pentru logging” și acum pipeline-ul tău rulează de două ori.

Definiția, mai precis

O transformation e o metodă care întoarce un DataFrame nou și nu adaugă lumii nimic în afară de un nod în planul logic. E un apel de funcție pură pe o descriere de DataFrame. Nu se mișcă date. Nu se aprind executors. Driverul primește o referință nouă de DataFrame și mergi mai departe.

O action e o metodă care întoarce altceva decât un DataFrame, sau provoacă un efect secundar (de obicei o scriere pe disc sau un callback executat pe rânduri). Action-ul e momentul în care Spark spune „bine, acum chiar e timpul să fac toată munca aia descrisă.” Driverul compilează planul logic într-un plan fizic, îl sparge în stages, trimite task-uri la executors, colectează rezultatele și le întoarce codului tău.

Regula empirică „e asta o action?” care e corectă în 95% din cazuri:

Întoarce această metodă altceva decât un DataFrame? Dacă da, e o action.

Restul de 5% sunt cazuri stângace de margine (cache, persist, checkpoint, uneori createOrReplaceTempView) pe care le tratăm separat.

Catalog: transformations comune

Fiecare metodă de aici întoarce un DataFrame și nu declanșează nimic. Listate aproximativ după frecvența de utilizare:

  • select(*cols): proiectează coloane. Posibil cea mai folosită transformation.
  • selectExpr(*expr_strings): la fel, dar cu expresii SQL-string în loc de obiecte col().
  • where(cond) / filter(cond): păstrează rândurile unde condiția e adevărată. Același operator, două nume.
  • withColumn(name, expr): adaugă sau înlocuiește o coloană.
  • withColumnRenamed(old, new): redenumește o coloană. Acoperit lecția trecută.
  • drop(*cols): șterge coloane.
  • join(other, on, how): combină două DataFrames pe coloane-cheie.
  • groupBy(*cols).agg(...): grupează rânduri și agregă. Notă: groupBy singur întoarce un GroupedData, nu un DataFrame; .agg(...) e cel care te aduce înapoi în teritoriul DataFrame. Cele două apeluri împreună sunt o singură transformation.
  • orderBy(*cols) / sort(*cols): sortează rânduri. Același operator, două nume.
  • distinct(): elimină rândurile duplicate pe toate coloanele.
  • dropDuplicates(*cols): elimină duplicatele luând în calcul doar coloanele listate (sau toate dacă nu se dă niciuna).
  • union(other) / unionAll(other): adaugă rânduri dintr-un alt DataFrame, după poziție.
  • unionByName(other, allowMissingColumns=False): adaugă rânduri după numele coloanei. Mai sigur decât union când schemele pot diferi.
  • intersect(other): păstrează doar rândurile prezente în ambele DataFrames.
  • exceptAll(other) / subtract(other): elimină rândurile prezente într-un alt DataFrame.
  • repartition(n, *cols): redistribuie datele în n partiții, opțional hash-uite după coloane.
  • coalesce(n): reduce numărul de partiții fără un shuffle complet.
  • sample(fraction, withReplacement=False, seed=None): submulțime aleatorie de rânduri.
  • limit(n): păstrează cel mult n rânduri. Lazy! Nu întoarce rânduri; întoarce un DataFrame al cărui plan se termină cu LIMIT n. Ca să scoți rândurile afară ai tot nevoie de o action.
  • na.fill(...), na.drop(...), na.replace(...): tratarea null-urilor.
  • withWatermark(col, threshold): pentru streaming.

Fiecare dintre acestea întoarce un DataFrame. Niciuna nu rulează ceva de una singură.

Catalog: actions comune

Fiecare dintre acestea întoarce altceva decât un DataFrame, sau scrie pe disc:

  • show(n=20, truncate=True): afișează rânduri la stdout. Întoarce None.
  • count(): numărul de rânduri. Întoarce un int.
  • collect(): trage toate rândurile în driver ca o listă de obiecte Row. Întoarce list[Row]. Periculos: aduce întreg DataFrame-ul în memoria driverului.
  • take(n): întoarce primele n rânduri ca listă. Întoarce list[Row].
  • first(): întoarce primul rând. Întoarce un Row.
  • head(n=1): la fel ca take când n > 1, la fel ca first când n == 1.
  • tail(n): ultimele n rânduri. Întoarce list[Row]. Disponibil în Spark 3+.
  • toPandas(): convertește la un DataFrame Pandas. La fel ca collect(), materializează totul în driver.
  • toLocalIterator(): iterează peste rânduri pe driver, o partiție pe rând. Mai lent decât collect(), dar cu memorie mărginită.
  • foreach(func) / foreachPartition(func): aplică o funcție cu efect secundar pe fiecare rând sau partiție. Folosit pentru scriere către sink-uri externe neacoperite de write.*.
  • write.format(...).save(...): și scurtăturile: write.parquet(...), write.csv(...), write.json(...), write.orc(...), write.saveAsTable(...), write.insertInto(...). Familia DataFrameWriter. Fiecare dintre acestea e o action: în momentul în care chemi .save(), .parquet() etc., Spark rulează pipeline-ul și scrie rezultatele.
  • describe() și summary(): calculează statistici sumare. Ambele întorc DataFrames, dar fiecare apel declanșează intern o trecere completă pentru calcul, deci se comportă ca actions din punct de vedere al costului. (Tehnic, transformări care conțin actions ascunse în construcția lor. Un caz de margine pedant.)

Pattern-ul: dacă îți dă un număr, o listă, un afișaj la stdout sau un fișier pe disc, e o action.

Citirea unui pipeline real

Iată un pipeline. Găsește action-ul.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("OrdersETL")
         .master("local[*]")
         .getOrCreate())

orders = spark.read.option("header", True).csv("./data/orders.csv")

customers = spark.read.option("header", True).csv("./data/customers.csv")

joined = orders.join(customers, on="CustomerId", how="left")

filtered = joined.where(col("Country") == "IT")

withVat = filtered.withColumn(
    "TotalWithVat",
    col("Total").cast("double") * 1.22
)

agg = (withVat
    .groupBy("CustomerId", "Country")
    .agg(F.sum("TotalWithVat").alias("LifetimeValue"),
         F.count("*").alias("Orders")))

ranked = agg.orderBy(F.desc("LifetimeValue"))

(ranked
 .write
 .mode("overwrite")
 .parquet("./out/customer_ltv"))

Fiecare linie de la apelurile read până jos la ranked = ... e o transformation. Tot lanțul e doar construirea planului. Apelurile read nici măcar nu deschid fișierele; doar înregistrează metadata.

write.parquet(...) e action-ul. Aceea e linia unde Spark se duce efectiv pe disc, citește ambele CSV-uri, face join-ul, filtrul, withColumn, groupBy, agregarea și sortarea și scrie output-ul Parquet. Un singur job, declanșat de o singură action.

Dacă deschideai Spark UI imediat după ce rulai scriptul, ai fi văzut exact un singur job listat. Pagina jobului ar arăta câteva stages (una per shuffle boundary; mai multe la lecția următoare) și linia din codul tău care l-a trimis: write.parquet.

Când sunt mai multe actions, numără joburile

df = (spark.read.parquet("./big_table")
      .where(col("Country") == "IT")
      .withColumn("Year", F.year("OrderDate")))

print("Row count:", df.count())                 # job 1
df.show(5)                                       # job 2
df.write.parquet("./out/it_orders")             # job 3

Trei actions, trei joburi. Citirea Parquet se întâmplă de trei ori, dacă nu faci .cache() pe df mai întâi. Fiecare job parcurge planul de la zero. E problema „am chemat .count() și am așteptat trei minute” din lecția trecută, înmulțită cu trei.

Dacă scrii cod de producție, numără actions-urile din pipeline-ul tău. Fiecare e o execuție completă. Două actions pe același DataFrame intermediar înseamnă două pipeline-uri complete dacă nu cache-uiești.

Categoria stângace de mijloc

Câteva metode nu se potrivesc curat. Merită cunoscute fiindcă apar în cod real și încurcă lumea.

cache() și persist()

Acestea sunt tehnic transformations. Întorc un DataFrame și nu rulează nimic de unele singure. Ce fac e să seteze un flag pe DataFrame: „următoarea oară când o action rulează pe acest DataFrame, după calcul, păstrează rezultatul în memorie (sau memorie + disc, pentru persist), ca actions ulterioare să-l poată reutiliza.”

expensive = (spark.read.parquet("./big_table")
             .where(col("Country") == "IT")
             .join(other_big_table, "CustomerId"))

expensive.cache()        # transformation: marks for caching, runs nothing

expensive.count()        # action: runs the pipeline, materializes the cache
expensive.show()         # action: reads from cache, fast
expensive.write...       # action: reads from cache, fast

Apelul .cache() singur nu face muncă. Următoarea action rulează pipeline-ul și stochează rezultatul. Actions ulterioare sar peste recalcul.

Vei vedea uneori cod care face df.cache().count(): count() e o action deliberată „încălzește cache-ul” care forțează materializarea. Idiom comun în pipeline-uri performance-tuned. Vom acoperi playbook-ul complet de caching în lecția 23.

checkpoint()

Taie lineage-ul salvând fizic DataFrame-ul pe disc și pornind un nou lineage de la acel snapshot. Întoarce un DataFrame. Tehnic, o transformation, dar în practică declanșează un job care scrie checkpoint-ul. Tratează-l ca pe o action din punct de vedere al costului.

createOrReplaceTempView(name) și createGlobalTempView(name)

Acestea întorc None, deci după regula empirică arată ca actions. Nu sunt. Înregistrează planul DataFrame-ului sub un nume în catalogul SQL, ca să-l poți referi din spark.sql(...). Nu se execută nimic. View-ul e doar un alt nod în graful query-ului.

printSchema(), columns, dtypes, schema

Toate metadata. Niciuna nu atinge datele. Gratis. (Nu te lăsa păcălit că printSchema() e un apel de metodă cu paranteze: citește din planul logic, nu din date.)

explain()

Afișează planul query-ului fără să-l ruleze. Gratis. Folosește-l liber.

Cum spune Spark UI povestea

Deschide Spark UI (implicit http://localhost:4040 pentru o sesiune locală). Tab-ul Jobs listează fiecare action care a rulat, cu linia de cod care a declanșat-o și stages-urile în care s-a spart. UI-ul e principalul tău instrument de debugging de aici încolo. Două obiceiuri care merită construite:

  1. După rularea oricărui pipeline, aruncă un ochi la tab-ul Jobs. Dacă numărul de joburi e mai mare decât numărul de actions intenționate, ai o action ascunsă pe undeva: caută apeluri .count() sau .show() rătăcite.
  2. Intră în stage-ul lent. Dacă un task e de 10x mai lung decât celelalte, ai skew. Dacă input size e 100 GB, dar te așteptai la 10 MB, filtrul tău nu a fost împins în jos.

Câteva consecințe practice

Plasarea action-ului contează pentru memorie. collect() și toPandas() trag totul în driver. Pe un DataFrame de 50 GB asta va da OOM driverului tău și va crăpa sesiunea. Folosește take(n) ca să arunci o privire, show(n) ca să inspectezi și collect() doar când ai filtrat deja la rezultate mici.

Actions repetate repetă toată munca. Trei actions pe același DataFrame intermediar e trei pipeline-uri complete dacă nu cache-uiești.

limit(n).show() e ieftin. limit e o transformation; show rulează action-ul cu limit-ul în plan. Spark nu va citi tot input-ul: va citi partiții până are n rânduri și se oprește. Bun pentru prototipare pe tabele mari.

count() după un filtru nu e gratis. Tot parcurge întreg fișierul de input. Dacă ai nevoie doar să știi „există rânduri?”, df.limit(1).take(1) e mai ieftin.

Cele două moduri de eșec pe care le-am văzut cel mai des: „logger-ul care costă 300 $ pe zi” (cineva adaugă print(f"rows: {df.count()}") la jumătatea unui pipeline de 2 TB pentru vizibilitate, dublându-i costul) și „testele trec, prod cade” (un .collect() merge bine pe 6 rânduri în memorie la teste și dă OOM driverului pe 80 GB în producție). Ambele sunt probleme de cunoaștere a catalogului. Dacă știi că .count() e o action și că .collect() materializează în driver, nu scrii niciuna.

Rulează asta pe propria ta mașină

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder.appName("Catalog").master("local[*]").getOrCreate())

orders = spark.createDataFrame(
    [(1, "IT", 59.0), (2, "IT", 29.0), (3, "NL", 149.0),
     (4, "IT", 89.5), (5, "DE", 14.0), (6, "IT", 42.42)],
    "OrderId INT, Country STRING, Total DOUBLE",
)

# Pure transformation chain — instant
chain = (orders
    .where(col("Country") == "IT")
    .withColumn("WithVat", col("Total") * 1.22)
    .groupBy("Country")
    .agg(F.sum("WithVat").alias("Total"),
         F.count("*").alias("N")))

# 1. Inspect the plan without running anything
chain.explain()

# 2. Action: show() runs the whole chain
chain.show()

# 3. Multiple actions => repeated work
print("count:", chain.count())     # full re-execution
print("count:", chain.count())     # full re-execution again

# 4. Cache, then multiple actions => one execution + cache reads
chain.cache()
chain.count()      # materializes the cache
chain.count()      # cache hit, fast
chain.show()       # cache hit, fast

# 5. Open http://localhost:4040 and look at the Jobs tab.
# You should see one job per action and only one full pipeline execution
# after the cache warmup.

Deschide UI-ul în paralel cu asta și uită-te cum fiecare action adaugă un job în listă. Apoi experimentează: comentează linia .cache() și rulează din nou; numără joburile. Numărul de joburi e exact numărul de actions, de fiecare dată.

Lecția următoare: DAG-ul. Ce construiește Spark efectiv când o action declanșează, de ce un job are mai multe stages, unde sunt limitele și cum supraviețuiește graful de lineage căderilor de executors.

Caută