PySpark, de la zero Lecția 21 / 60

Narrow vs wide transformations: cel mai important concept din Spark

De ce unele transformari sunt aproape gratis si altele cer ca tot clusterul sa faca shuffle. Distinctia care explica fiecare intrebare de performanta din Spark.

Dacă reții un singur lucru din tot cursul ăsta, fă să fie conținutul lecției de față.

Fiecare transformation din Spark cade într-una din două categorii. Transformările narrow rulează la viteza hardware-ului prin cluster, fără coordonare. Transformările wide cer ca toți executors să se oprească, să schimbe date prin rețea și să reia, o operație numită shuffle, care costă cu ordine de mărime mai mult decât tot ce e în jurul ei.

Linia dintre narrow și wide e cea mai importantă graniță din Spark. Fiecare întrebare de performanță despre Spark: de ce e asta lent, cum o fac mai rapidă, de ce a căzut clusterul meu, de ce un singur executor face toată munca: se întoarce la această distincție. Restul cursului e, în mare, despre evitarea, minimizarea sau optimizarea transformărilor wide.

Recapitulare rapidă a partițiilor

Înainte ca definiția să aibă sens, modelul partițiilor. Un Spark DataFrame e împărțit în multe partiții, de regulă între 100 și câteva mii, distribuite pe executors-urile clusterului. Fiecare partiție trăiește pe o singură mașină și e procesată de un singur task. Atâta timp cât munca fiecărui task e independentă de celelalte, clusterul e fericit și jobul tău rulează la viteza celui mai lent task. Lucrurile se strică atunci când un task de pe mașina A trebuie să știe ceva de pe mașina B. Atunci datele trebuie să se miște.

Definiția

O transformation narrow e una în care fiecare partiție de output depinde de exact o partiție de input. Fiecare task poate rula independent pe propria mașină, uitându-se doar la propria felie de date, și să producă felia lui de output. Fără coordonare. Fără rețea.

O transformation wide e una în care fiecare partiție de output depinde de date din mai multe partiții de input. Clusterul trebuie să redistribuie rânduri astfel încât rândurile care trebuie să fie în aceeași partiție de output să ajungă pe aceeași mașină. Această redistribuire e shuffle-ul: fiecare executor scrie rândurile pe un fișier local de disc, partiționat după destinație, apoi fiecare alt executor citește înapoi rândurile destinate lui. Mișcare de date all-to-all.

Concret:

  • Un filter(col("Country") == "IT") se uită la un singur rând pe rând. Fiecare task își citește partiția, păstrează rândurile IT, le aruncă pe celelalte, scrie felia lui de output. Celelalte partiții nu contează. Narrow.
  • Un groupBy("Country").agg(F.sum("Total")) are nevoie ca fiecare rând IT să ajungă în același loc ca să fie sumat împreună. Unele rânduri IT sunt pe mașina A, unele pe B, unele pe C. Trebuie să se întâlnească. Shuffle. Wide.

Catalogul

Transformări narrow (fără shuffle, ieftine):

  • select, selectExpr (când nu e DISTINCT sau agregare în expresie)
  • where / filter
  • withColumn, withColumnRenamed, drop
  • union, unionByName: adaugă rânduri; niciun rând nu trebuie să se mute
  • na.fill, na.drop, na.replace
  • sample (fără replacement, în majoritatea modurilor)
  • map, mapPartitions: operațiile RDD-style rând cu rând, incluse pentru completitudine

Transformări wide (shuffle necesar, scumpe):

  • groupBy(*cols).agg(...): gruparea după cheie cere ca rândurile cu aceeași cheie să se colocalizeze
  • distinct(), dropDuplicates(*cols): la fel: deduplicarea cere ca rândurile cu aceeași valoare să fie împreună
  • join(other, on, how): ambele părți trebuie să aibă cheile potrivite colocalizate, mai puțin în cazul broadcast joins (lecția 25)
  • orderBy / sort: sortarea globală cere range-partitioning prin tot clusterul
  • repartition(n), repartition(n, col), repartitionByRange(...): acestea există literalmente ca să facă shuffle
  • intersect, exceptAll, subtract: operații de mulțimi pe tot DataFrame-ul
  • Funcții window cu o clauză partitionBy(...): au nevoie ca rândurile din aceeași fereastră să fie împreună

Excepția onorabilă:

  • coalesce(n) reduce numărul de partiții combinându-le pe cele adiacente. Nu face shuffle; e narrow. Compromisul e că poți doar să cobori (nu poți merge de la 4 partiții la 200 cu coalesce), iar partițiile de output pot deveni dezechilibrate. Folosește coalesce ca să cobori de la 200 la 10 înainte de a scrie un fișier mic. Folosește repartition când ai nevoie de echilibru sau să crești.

Imaginea

Diagramă ASCII. Patru partiții de input, sus, procesate de patru task-uri. Două fluxuri: narrow (filter) și wide (groupBy).

Narrow transformation: filter(col("Country") == "IT")

  Input:    [P0]      [P1]      [P2]      [P3]
              |         |         |         |
              v         v         v         v
            task0     task1     task2     task3   (parallel, independent)
              |         |         |         |
              v         v         v         v
  Output:   [P0']     [P1']     [P2']     [P3']

Each task reads exactly one partition, writes one partition. No data
crosses machines. Wall-clock time = time of slowest task.


Wide transformation: groupBy("Country").agg(...)

  Input:    [P0]      [P1]      [P2]      [P3]
              |         |         |         |
              v         v         v         v
            task0     task1     task2     task3   (partial aggregate)
              |         |         |         |
              +----+----+----+----+----+----+
                   |         |         |
                   v         v         v          === SHUFFLE ===
                  IT        NL        DE          (network + disk)
                   |         |         |
                   v         v         v
                task_IT   task_NL   task_DE       (final aggregate)
                   |         |         |
                   v         v         v
  Output:        [Pa]      [Pb]      [Pc]

Uită-te la secțiunea de mijloc. În cazul narrow, patru task-uri își fac fiecare treaba în paralel și am terminat. În cazul wide, patru task-uri fac muncă parțială, apoi fiecare task trebuie să-și trimită rândurile la destinația corectă: rândurile IT într-un loc, NL în altul, DE în al treilea, prin rețea. Apoi o a doua rundă de task-uri face agregarea finală.

Acesta e shuffle-ul. E lucrul lent și scump din distributed computing. Și e lucrul la care fiecare discuție de performanță în Spark se întoarce eventual.

De ce sunt shuffles scumpe

Trei costuri, toate reale. Disk write: fiecare executor, înainte să-și trimită rândurile, le scrie pe discul local partiționate după destinație, ca partea care primește să poată trage la cerere și ca task-urile eșuate să poată fi reluate fără să refacă munca în amonte. Transfer prin rețea: rândurile se mișcă prin rețeaua clusterului; într-un shuffle de 200 de partiții pe un cluster de 10 noduri, fiecare nod trimite la fiecare alt nod simultan, iar lățimea de bandă devine bottleneck-ul. Disk read: executor-ul care primește citește fișierele de shuffle în memorie ca să-și facă treaba.

Un pipeline de transformări pur narrow rulează la viteza CPU plus storage. Un singur shuffle adaugă o cursă disk-network-disk pentru tot setul de date. Cinci shuffles fac asta de cinci ori secvențial, fiindcă fiecare shuffle e o barieră: stage-ul următor nu poate începe decât după ce shuffle-ul precedent s-a terminat. În cifre practice: un stage doar narrow pe un DataFrame de 10 GB ar putea rula în 30 de secunde; adaugă un groupBy și același pipeline durează adesea 2-3 minute.

Cum se vede un shuffle în .explain()

Spark expune shuffle-ul direct în planul fizic. Caută operatorul Exchange:

df = spark.createDataFrame(
    [(i, "IT" if i % 3 == 0 else "NL", float(i)) for i in range(1000)],
    "OrderId INT, Country STRING, Total DOUBLE",
)

# Narrow only — no Exchange
(df.where(col("Country") == "IT")
   .withColumn("WithVat", col("Total") * 1.22)).explain()
# *(1) Project [..., (Total * 1.22) AS WithVat]
# +- *(1) Filter (Country = IT)
#    +- *(1) Scan ExistingRDD[...]

# Add a groupBy — Exchange appears
(df.where(col("Country") == "IT")
   .groupBy("Country")
   .agg(F.sum("Total").alias("TotalIT"))).explain()
# *(2) HashAggregate(keys=[Country], functions=[sum(Total)])
# +- Exchange hashpartitioning(Country, 200), ENSURE_REQUIREMENTS, ...
#    +- *(1) HashAggregate(keys=[Country], functions=[partial_sum(Total)])
#       +- *(1) Filter (Country = IT)
#          +- *(1) Scan ExistingRDD[...]

Acolo e Exchange hashpartitioning(Country, 200). *(1) și *(2) sunt ID-urile de stage; Spark sparge munca la granița shuffle-ului. Stage 1 citește, filtrează și agregă parțial per partiție. Stage 2 ia output-ul shuffle-uit, finalizează agregarea și întoarce rezultatul.

Pattern-ul de internalizat: fiecare Exchange e un shuffle, o graniță de stage, o bucată de cost network/disc.

Stages și DAG-ul

Definiție rapidă: un stage e o bucată de muncă ce poate rula ca un pipeline de transformări narrow fără un shuffle la mijloc. Fiecare Exchange e granița dintre două stages. Un pipeline cu două shuffles are trei stages; un pipeline fără shuffles e un singur stage. Fiecare stage trebuie să se termine înainte ca următorul să înceapă, pentru că input-ul lui e output-ul shuffle-ui anterior. Regulă empirică: numără transformările wide și ai numărat aproximativ stages-urile. Povestea completă a DAG-ului vine în lecția următoare.

Regulile de performanță care ies de aici

Odată ce internalizezi narrow vs wide, fiecare recomandare de performanță Spark capătă sens:

Filtrează și proiectează înainte de pașii wide. Un where și un select sunt narrow și gratis; făcându-i devreme reduci datele care intră în următorul shuffle. Optimizatorul îi împinge adesea în jos pentru tine, dar scriindu-i în locul potrivit faci intenția evidentă.

Evită sortarea dacă nu ai chiar nevoie de output sortat. orderBy e sortare globală prin tot clusterul, una dintre cele mai scumpe lucruri pe care le poți face. Dacă ai nevoie doar de top 10, df.orderBy("col").limit(10) lasă Spark să facă o optimizare TopK, dar tot e mai ieftin să eviți sortarea complet când poți.

Folosește broadcast joins pentru small-on-large. Dacă o parte a unui join încape în memoria driverului (sub câteva sute de MB), Spark o poate broadcast-a la fiecare executor și evita complet shuffle-ul pe partea mare. Lecția 25 acoperă asta; poate transforma un join de 10 minute într-unul de 30 de secunde.

Refolosește rezultatele intermediare. Dacă ai plătit o dată costul unui shuffle, cache-uiește rezultatul înainte să faci mai multe agregări peste el. Lecția 23 acoperă strategia de caching.

Nu repartiționa dacă n-ai un motiv. repartition() e ea însăși o transformation wide. Apelarea ei din motive cosmetice înseamnă să plătești pentru un shuffle fără niciun beneficiu. Motive bune: skew sever, pre-partiționare pentru multe agregări pe o cheie, sau controlul numărului de fișiere de output înainte de o scriere.

Folosește coalesce ca să reduci partițiile înainte de scriere. Dacă DataFrame-ul tău are 200 de partiții și scrii un rezultat mic, coalesce(8) îți dă 8 fișiere de output fără shuffle.

O avertizare despre skew: modelul narrow-vs-wide presupune că partițiile de output ale shuffle-ului sunt aproximativ echilibrate. În viața reală, adesea nu sunt. Dacă 80% dintre comenzile tale sunt din Italia și faci groupBy("Country"), task-ul de reduce IT primește 80% din date, rulează de 50x mai lent decât celelalte și dominează timpul real. Coloanele categoriale cu o valoare dominantă sunt suspecții obișnuiți. Diagnosticul și soluțiile vin mai târziu în curs.

Punând totul cap la cap

Un pipeline realist. Marchează fiecare linie ca narrow (N), wide (W) sau action (A):

df = (spark.read.parquet("./orders")           # ___
      .where(col("OrderDate") >= "2026-01-01")  # ___
      .withColumn("Total",                       # ___
                  col("Total").cast("double"))
      .join(customers, "CustomerId", "left")    # ___
      .groupBy("Country", "CustomerId")          # ___
      .agg(F.sum("Total").alias("LTV"))
      .orderBy(F.desc("LTV"))                   # ___
      .limit(100)                                # ___
      .write.parquet("./out/top100"))            # ___

Read, filter (N), withColumn (N), join (W), groupBy.agg (W), orderBy (W), limit (N), write (A). Trei transformări wide, patru stages, o action. Costul e dominat de cele trei shuffles: join-ul, grup-ul, sortarea: nu de pașii narrow.

Dacă pipeline-ul ăsta ar fi lent, soluția nu e în liniile narrow. E în: ar putea fi customers broadcast-at? Am putea grupa pe o cardinalitate mai mică mai întâi? Pârghiile sunt mereu aceleași: fă shuffles-urile mai mici, mai puține sau evitabile.

Rulează asta pe propria ta mașină

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder.appName("NarrowWide").master("local[*]").getOrCreate())

df = spark.createDataFrame(
    [(i, "IT" if i % 3 == 0 else "NL", float(i)) for i in range(1000)],
    "OrderId INT, Country STRING, Total DOUBLE",
)

# 1. Pure narrow chain — no Exchange in the plan
narrow = (df
    .where(col("Country") == "IT")
    .withColumn("WithVat", col("Total") * 1.22)
    .withColumn("Region", F.lit("EU")))
narrow.explain()

# 2. Single wide step — one Exchange, two stages
single_wide = narrow.groupBy("Country").agg(F.sum("WithVat").alias("Total"))
single_wide.explain()

# 3. Multiple wide steps — multiple Exchanges, more stages
multi_wide = (narrow
    .join(df.select("OrderId", "Total").alias("d2"), "OrderId")
    .groupBy("Country").agg(F.sum("WithVat").alias("Total"))
    .orderBy(F.desc("Total")))
multi_wide.explain()

# 4. coalesce vs repartition: count Exchange operators
coalesced = narrow.coalesce(2)
coalesced.explain()                    # no Exchange — just partition combining

repartitioned = narrow.repartition(8)
repartitioned.explain()                # Exchange present — full shuffle

# 5. Run the wide pipeline and check the Spark UI for stage count
multi_wide.write.mode("overwrite").parquet("./tmp/wide_demo")
# Open http://localhost:4040 — you should see one job with three stages
# (one per Exchange plus the final write).

Privește acele planuri până când operatorul Exchange devine lucrul la care îți fug ochii primul. Acel mușchi trebuie construit. De aici încolo, „o să fie pipeline-ul ăsta rapid?” e cam aceeași întrebare cu „câte Exchanges are planul lui și cât de mari sunt datele care trec prin fiecare?”.

Lecția următoare: DAG-ul. Cum transformă Spark un plan logic în stages, ce îți cumpără graful de lineage (toleranță la erori gratis) și cum citești vizualizarea DAG din Spark UI pentru orice job. Lecția 23 e despre caching: playbook-ul pentru când, ce și cum să cache-uiești, ca munca ta iterativă să nu plătească taxa de shuffle de două ori.

Caută