PySpark, de la zero Lecția 12 / 60

Local vs cluster: fluxul de dev care nu te minte

Cand modul local e suficient, cand ai nevoie de un cluster real si bug-urile care apar doar cand exista executori reali in peisaj.

Asta e lecția de încheiere a Modulului 2. Tot ce ai scris până acum a rulat pe master("local[*]"). A fost deliberat; modul local e cu adevărat bun pentru a învăța Spark, iar cea mai mare parte din ce ai tastat va rula neschimbată pe un cluster cu 100 de noduri. Dar „cea mai mare parte” face multă muncă în acea propoziție. Există o categorie de bug-uri care există doar când Spark e cu adevărat distribuit, iar descoperirea lor în producție e un moment prost.

Obiectivul de azi: să înțelegi exact ce înseamnă local, despre ce te minte și cum arată un flux sănătos dev → staging → prod în 2026.

Ce este modul local de fapt

Când scrii:

spark = SparkSession.builder.master("local[*]").getOrCreate()

…Spark pornește un singur proces JVM pe laptopul tău. Acel proces e driverul, executorul și cluster managerul, toate odată. Nu există rețea. Nicio serializare între mașini. Niciun JVM separat de executor. Doar un proces care se preface că e un cluster.

[*] din local[*] controlează paralelismul; îi spune lui Spark să folosească toate core-urile CPU disponibile ca thread-uri worker. Alte forme:

.master("local")       # 1 thread. Util pentru teste unde vrei determinism.
.master("local[4]")    # 4 thread-uri.
.master("local[*]")    # Toate core-urile.
.master("local[*, 3]") # Toate core-urile, retry de 3 ori la task-uri eșuate. Testează toleranța la erori.

Fiecare „thread” rulează un task Spark la un moment dat. Cu local[8], poți rula 8 partiții în paralel, pe 8 core-uri, într-un singur JVM. Arată ca un cluster din perspectiva API-ului. Codul DataFrame pe care îl scrii e identic byte cu byte cu codul de cluster. Asta e partea cu adevărat utilă.

Demo rapid: același script, rulat de două ori cu paralelism diferit:

from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id, count

def run(master):
    spark = (SparkSession.builder
             .appName("LocalDemo")
             .master(master)
             .getOrCreate())
    spark.sparkContext.setLogLevel("WARN")

    df = spark.range(0, 1_000_000).repartition(16)
    print(f"\nMaster={master}, default parallelism={spark.sparkContext.defaultParallelism}")
    df.groupBy(spark_partition_id().alias("partition")).agg(count("*").alias("rows")).show()
    spark.stop()

run("local[1]")
run("local[*]")

Ambele rulări produc aceeași agregare finală. Prima folosește 1 thread worker, a doua folosește fiecare core pe care îl ai. Throughput-ul diferă cu ~Nx unde N e numărul tău de core-uri. Output-ul e identic.

Unde modul local e cu adevărat ok

Fii fără scuze pentru local pentru acestea:

  • Iterație de dev. Scrii o transformare, o rulezi, repari eroarea de tip, o rulezi din nou. Bucla de feedback pe un cluster real e de 30 de secunde de overhead de submisie per încercare. Local e instantaneu. Vei itera de zece ori mai rapid.
  • Teste unitare. O suită pytest care construiește un DataFrame minuscul în cod, rulează funcția ta și face assert pe output e exact pentru ce e modul local. Runner-ele CI au 2-4 core-uri și e suficient.
  • ETL mic. Dacă input-ul tău e sub, să zicem, 5GB și laptopul tău are 16GB de RAM, modul local va depăși un cluster cu 4 noduri (fără rețea, fără shuffle peste fir, fără spin-up de executor). Am un pipeline de finanțe personale care rulează nocturn pe local[*] și probabil va rula întotdeauna.
  • Învățare. Nu vei învăța window functions mai rapid pe EMR decât pe laptopul tău.

Un laptop cu local[*] și 16GB de memorie e un mediu Spark real, capabil. Nu te scuza pentru că îl folosești.

Unde modul local te minte

Trei categorii de bug-uri pe care modul local le ascunde. Toate trei te vor găsi în cele din urmă în producție.

1. Skew-ul e invizibil

În Spark real, partițiile sunt distribuite peste executori. Dacă o partiție are 90% din date, să zicem că groupBy("country") a produs o partiție US cu 50GB și alte patruzeci de partiții cu câte 100MB fiecare, acea singură partiție US rulează pe un executor în timp ce ceilalți 40 de executori stau degeaba. Wall-clock time-ul jobului devine „cât durează acel singur task”. Asta e data skew și e cea mai comună cauză a tichetelor „jobul meu Spark e lent”.

Modul local nu îți arată asta. Cu un singur proces, toate partițiile trec oricum prin același JVM, pe aceleași thread-uri, împărțind aceeași memorie. O partiție skew tot durează mai mult decât celelalte, dar poate face spill, swap și altfel poate ocoli problema în-proces. Pe un cluster real, acel singur task primește exact memoria unui executor și moare.

Concret:

from pyspark.sql.functions import lit, col
import random

# Construiește un dataset puternic skew: o cheie cu 1M de rânduri, altele cu 1.
big   = spark.range(0, 1_000_000).withColumn("key", lit("HOT"))
small = spark.range(0, 99).withColumn("key", col("id").cast("string"))
skewed = big.unionByName(small)

# Group by key: un task face aproape toată munca.
skewed.groupBy("key").count().show()

Pe local[*] se termină fericit. Pe un cluster mic cu setări default, task-ul cheii HOT poate da OOM la executor. Lecția 41 e dedicată skew-ului: salting, broadcast joins, gestionarea skew-ului prin AQE, și nu poți cu adevărat să exersezi asta local.

2. Shuffle-urile par gratis

Un shuffle în Spark e când partițiile sunt redistribuite peste rețea, motorul din spatele fiecărui groupBy, join, distinct și orderBy. Shuffle-urile serializează datele, le scriu pe discul local și le trag peste fir de la alți executori. Sunt scumpe. Sunt adesea bottleneck-ul. Toată forma de artă „tune Spark” e în mare parte despre reducerea volumului de shuffle.

Local, un shuffle e un transfer de memorie în-proces. Fără rețea. Adesea fără disc. E aproape gratis. Așa că local un pipeline cu 5 shuffle-uri rulează în 8 secunde; pe un cluster, același pipeline durează 12 minute. Shuffle-urile domină și nu ai văzut-o venind.

Nu vei prinde nici:

  • Timeout-uri de rețea și executori pierduți în timpul citirilor de shuffle.
  • Eșecuri de disk-full din spill-ul de shuffle.
  • Skew în output-ul de shuffle (un reducer de shuffle cu 50GB de date).

Pipeline-urile care arată grozav pe un laptop au o tendință jenantă de a se prăbuși pe un cluster, iar cauza e aproape întotdeauna „shuffle-urile la care ai încetat să te gândești pentru că erau gratis local”.

3. Out-of-memory la scara executorului nu se reproduce

Într-un cluster real, fiecare executor are memorie limitată, să zicem 16GB per executor. Dacă o partiție varsă 20GB într-un singur task, acel task moare. Executorul ar putea muri. Jobul eșuează.

Local, „executorul” tău e tot JVM-ul tău, care poate folosi cea mai mare parte a RAM-ului laptopului tău (adesea 8-32GB). Poți procesa partiții care n-ar încăpea în niciun executor de cluster rezonabil și să nu îți dai seama niciodată. Apoi livrezi pe un cluster și același cod dă OOM la fiecare a doua rulare.

Versiunea de diagnostic a asta: un parse from_json() per înregistrare care alocă 200MB de struct-uri imbricate, rulat pe 100M de înregistrări, local sunt 20GB total de churn de memorie, JVM-ul gestionează; pe un cluster sunt 200MB înmulțit cu orice se înghesuie în modelul de presiune de memorie al unui executor și la revedere.

Există și veri mai subtili. Broadcast joins care „funcționează” pentru că totul e într-un singur proces au nevoie de fapt de broadcast pe un cluster. UDF-uri care funcționează local, dar serializează un closure de 50MB către fiecare executor. Coerciuni de tip PyArrow care fac round-trip ușor diferit la trecerea peste granițe de proces. Nimic din asta nu e vreodată o problemă local.

Fluxul de dev care le prinde

Forma care supraviețuiește contactului cu realitatea:

1. Dezvoltă pe local[*]. Buclă de iterație strânsă. Scrie transformări, rulează teste unitare, schițează pipeline-ul.

2. Rulează pe un cluster mic de staging, pe o felie reprezentativă de date. Doi sau trei executori, rețea reală între ei, o versiune eșantionată sau anonimizată a datelor de producție. Aici apare skew-ul, aici shuffle-urile devin scumpe, aici memoria-per-executor devine o constrângere reală.

Un eșantion de 10% pe un cluster real e enorm mai util decât o rulare de 100% pe local. Ideea nu e volumul, ci distribuirea. Dacă un cluster mic nu se sufocă, unul mare de obicei nu o va face nici el.

3. Promovează la producție. Același cod, --master diferit și date mai mari. Până în acest punct ai depanat 95% din bug-urile cluster-only.

A sări peste pasul 2 e cea mai comună greșeală de flux în domeniu. Oamenii merg direct de la local la prod, lovesc un bug de skew în producție și petrec o seară făcând rollback. Plătește taxa de staging. E ieftină.

Peisajul de cluster managers, ediția 2026

Când livrezi la un cluster real, cluster managerul e lucrul care alocă executori pentru jobul tău. Spark suportă patru:

YARN e cluster managerul original Hadoop. Dacă firma ta rulează o instalare Cloudera/Hortonworks/Hadoop legacy on-prem, ești pe YARN. E matur, testat în luptă și uniform urât. Își pierde și el încet cota de piață; practic nimeni nu alege YARN pentru un deployment greenfield în 2026, dar baza instalată e enormă și nu pleacă undeva repede.

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 50 \
  --executor-cores 4 \
  --executor-memory 16g \
  --driver-memory 8g \
  my_pipeline.py

Kubernetes e default-ul modern. Dacă ridici Spark de la zero în 2026, probabil e pe Kubernetes. Spark Operator (originar de la Google, acum un proiect Apache) îți dă CRD-uri SparkApplication, scheduling nativ de pod-uri și integrare curată cu restul stack-ului tău cloud-native. Aproape fiecare vendor cloud Spark, Databricks, EMR Serverless, Dataproc Serverless, OpenShift, e acum Kubernetes pe dedesubt, chiar dacă îl ascund.

spark-submit \
  --master k8s://https://my-cluster.example.com:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=my-registry/spark:3.5.1 \
  --conf spark.executor.instances=50 \
  --conf spark.kubernetes.namespace=data \
  local:///opt/jobs/my_pipeline.py

Standalone mode e propriul cluster manager bundle-uit al lui Spark. E un singur binar, fără Hadoop sau Kubernetes necesare. Cu adevărat util când ai o echipă mică, dedicată, care rulează Spark pe un set fix de VM-uri și nu vrei să înveți Kubernetes pentru asta. Compromisul: fără izolare de resurse între joburi (e first-come-first-served), fără partajare de încărcătură cu joburi non-Spark și o poveste limitată de HA. Pentru o cutie de analytics cu 5 mașini, ok. Pentru o platformă multi-echipă, nu.

# Pe nodul master
./sbin/start-master.sh
# Pe fiecare worker
./sbin/start-worker.sh spark://master-host:7077

# Submit
spark-submit \
  --master spark://master-host:7077 \
  --total-executor-cores 32 \
  --executor-memory 8g \
  my_pipeline.py

Mesos a existat și ar trebui să știi că a existat. A fost depreciat în Spark 3.2 (2021) și eliminat complet în Spark 4.0 (2024). Dacă cineva încearcă să te convingă să livrezi Mesos în 2026, îndepărtează-l blând.

Wrapper-ele gestionate, Databricks, AWS EMR, Google Dataproc, Azure Synapse, sunt toate construite peste unul dintre acestea. Databricks rulează pe un substrat gestionat în stil Kubernetes. EMR poate rula pe YARN (EMR clasic) sau Kubernetes (EMR on EKS) sau serverless. Dataproc are aceeași împărțire. Scriptul de submit pe care îl scrii abia se schimbă; ce se schimbă e cine gestionează nodurile dedesubt, cine le patch-uiește, cine plătește pentru ele.

Dacă ești la început în cariera Spark, răspunsul practic pentru 2026 e: învață local[*] pentru dezvoltare, învață suficient Kubernetes cât să poți citi un YAML SparkApplication și înțelege că serviciile gestionate precum Databricks sunt grozave pentru a nu fi nevoit să te gândești la rest. Pune mâna pe YARN dacă și când un job o cere.

Un spark-submit minim pe care chiar îl poți folosi

Iată un starter portabil. Același script funcționează pe local, standalone, YARN și Kubernetes; doar --master și câteva linii --conf se schimbă:

spark-submit \
  --master "local[*]" \
  --name "my_pipeline" \
  --conf "spark.sql.shuffle.partitions=200" \
  --conf "spark.sql.adaptive.enabled=true" \
  --py-files dependencies.zip \
  --files config.yaml \
  pipelines/my_pipeline.py \
  --input  s3a://my-bucket/raw/2026-05-01 \
  --output s3a://my-bucket/curated/2026-05-01

Bucățile:

  • --master: cluster managerul. local[*], yarn, k8s://... sau spark://....
  • --name: ce apare în UI-ul Spark. Numește-ți joburile. Tu-cel-din-viitor care depanează la 3 dimineața îți va mulțumi.
  • --conf: override-uri de configurare Spark. Cele două de mai sus (shuffle.partitions și AQE) sunt cele mai comune.
  • --py-files: module Python suplimentare zip-uite și livrate executorilor. Orice dincolo de un script de un singur fișier are nevoie de asta.
  • --files: fișiere non-cod (configurări, date de lookup) livrate către directorul de lucru al fiecărui executor.
  • Calea scriptului și orice argumente vrei să le forwardezi către argparse-ul tău.

Pentru un punct de intrare Python, blocul if __name__ == "__main__": al scriptului parsează argumentele, construiește o SparkSession, rulează pipeline-ul, apelează spark.stop() și iese. Același fișier pe care l-ai rula local cu python my_pipeline.py, cu aceleași argumente argparse. Portabilitatea vine din faptul că scriptului nu îi pasă cum a fost lansat.

Un exercițiu de practică care plătește: ia oricare dintre snippet-urile PySpark multi-pas de mai devreme din acest modul și transformă-le într-un singur pipelines/orders_etl.py submitabil. Citește căile din argparse. spark = SparkSession.builder.appName("orders_etl").getOrCreate(), fără master în cod, așa că preia orice --master cu care faci submit. Rulează-l cu spark-submit --master "local[*]" pipelines/orders_etl.py --input ./data/orders.csv --output ./data/orders.parquet. Odată ce asta funcționează, același script va rula pe un cluster cu un singur flag schimbat.

Ce a acoperit acest modul

Atât pentru Modulul 2. Poți instala Spark, construi o SparkSession, citi CSV / JSON / Parquet, inspecta cu show / count / collect (cu grijă), scrie cu save modes adecvate și partiționare și submite un job la un cluster manager real. Tot ce urmează e despre a face lucruri mai interesante cu DataFrame-urile: selectare, filtrare, join, agregare, windowing și în cele din urmă chestiile de debugging-de-producție din Modulul 10.

Modulul următor: lucrul efectiv cu DataFrame-uri. API-ul Column, select versus selectExpr, withColumn și ciudățeniile lui și obiceiurile mici care separă codul Spark care scalează de codul Spark care nu.

Caută