PySpark, de la zero Lecția 60 / 60

Un health check de 30 de minute pe un cluster Spark pe care nu l-ai mai vazut

Checklist-ul capstone: ti se da laptopul, ai timp pana la ora 17 sa-ti dai seama ce e stricat.

Ți se dau cheile unui cluster Spark pe care nu l-ai mai văzut. „Spune-ne ce e stricat până la ora 17.”

Lecția asta e checklist-ul de 30 de minute pe care îl rulez. Fiecare pas, în ordine, cu ce să cauți și ce să faci dacă găsești. E lecția finală a cursului fiindcă leagă tot ce am acoperit, partiționare, shuffle-uri, join-uri, memorie, AQE, streaming, tab-ul SQL, tab-ul executors, și împachetează totul într-o singură tură ordonată. O poți printa. O poți lipi în trusa ta. Ziua în care un client nou îți dă un cluster Spark, scoți asta și ești productiv în jumătate de oră.

Mulțumesc că ai rămas prin 60 de lecții. Hai să terminăm cu o tură scriptată prin tot toolkit-ul.

Pasul 1, prezentarea generală a clusterului (2 minute)

Deschide Spark UI (sau UI-ul workspace-ului dacă ești pe Databricks/EMR/Dataproc). Găsește aplicația care rulează. Sus-dreapta îți dă versiunea. Tab-ul Environment îți dă restul.

spark.version                              # 3.5.1, 3.4.2, etc.
spark.sparkContext.master                  # yarn, k8s://..., local[*], spark://...
spark.sparkContext.defaultParallelism      # total cores
sc = spark.sparkContext
print(sc.statusTracker().getExecutorInfos())

Ce cauți:

  • Versiunea Spark, ideal 3.2+, pentru AQE pornit by default. 3.0 / 3.1 înseamnă audit manual de config.
  • Cluster manager, YARN, Kubernetes, Standalone sau vendor-managed. Tooling operațional diferit.
  • Numărul și dimensiunea executorilor, câți, câtă memorie fiecare, câte core-uri fiecare. Compară cu capacitatea anunțată a clusterului.
  • Dimensiunea driverului, driverele subdimensionate crapă la rezultate collect() mari.

Pasul 2, tab-ul Executors (3 minute)

Ăsta e sp_Blitz-ul tău pentru Spark. Apasă pe Executors.

Executor ID | Address       | Status   | Cores | Memory Used | Task Time | GC Time | Failed | Active Tasks
driver      | 10.0.1.4      | Active   | 0     | 0 B / 4 GB  | 0 ms      | 0 ms    | 0      | 0
0           | 10.0.2.10     | Active   | 4     | 8.2 GB / 12 GB | 2.1 h | 18 min  | 0      | 4
1           | 10.0.2.11     | DEAD     | 4     | -           | 1.4 h     | 22 min  | 17     | 0
2           | 10.0.2.12     | Active   | 4     | 11.8 GB / 12 GB | 2.4 h | 1.1 h | 12     | 4

Ce cauți:

  • Executori morți. Dacă vezi unii, uită-te la log-uri (link-ul stderr), OOM, nod pierdut, sau driverul i-a omorât pe idle timeout. Morți recurenți = job-ul tău e instabil.
  • GC time ca fracțiune din task time. Executor 2 de mai sus are GC la ~45% din task time. E catastrofal; lecția 57 (tunarea memoriei) e fix-ul.
  • Numărul de task-uri eșuate. Diferit de zero peste mai mulți executori = problemă reală (probabil skew sau OOM); concentrat pe un singur executor = un nod bolnav.
  • Utilizarea memoriei aproape de limită. Executorii la 11.8 / 12 GB sunt o alocare distanță de spill sau OOM.

Notează cine arată rău și mergi mai departe.

Pasul 3, job-uri care rulează și query-uri blocate (2 minute)

Apasă pe Jobs. Sortează după Duration. Orice rulează > 1 oră merită o întrebare.

# Intr-un notebook:
[(j.jobId, j.name, j.status, j.numTasks, j.numActiveTasks)
 for j in spark.sparkContext.statusTracker().getActiveJobIds()]

Ce cauți:

  • Job-uri long-running care ar trebui să fie rapide (un agregat zilnic care a rulat 6 ore în loc de 30 de minute, candidat pentru kill).
  • Job-uri blocate cu un singur task activ în timp ce restul s-au terminat, coada clasică de skew (lecția 28).
  • Stări blocate pe partea de driver, multe stage-uri terminate dar job-ul nu se finalizează; de obicei un collect() sau toPandas() care trage prea mult.

Ca să omori un job: Spark UI are un link (kill) lângă job-urile care rulează dacă ești admin. Sau spark.sparkContext.cancelJobGroup(...) dacă codul tău a setat un job group.

Pasul 4, statistici cumulative ale job-urilor și cele mai lente 10 (3 minute)

Apasă pe Jobs din nou, derulează la lista Completed Jobs. Sortează după Duration descrescător. Uită-te la primele 10.

Ce au în comun?

  • Toate scriu în același tabel? → layout-ul de storage al acelui tabel ar putea fi gâtul de sticlă (fișiere mici, compactare lipsă, partiționare proastă).
  • Toate fac join cu aceeași dimensiune? → dim-ul ăla ar putea avea nevoie de broadcast (lecția 27) sau o cheie mai bună.
  • Toate citesc din aceeași sursă? → verifică statisticile și partiționarea sursei ăleia.
  • Toate rulează la aceeași oră din zi? → contention pe resurse cu alt job.

Recunoașterea pattern-urilor e mai rapidă decât tunarea fiecăruia.

Pasul 5, utilizarea disk-ului pe workeri (3 minute)

Director-ele locale ale Spark (spark.local.dir, default /tmp) țin fișiere de shuffle, fișiere de spill, fișiere de broadcast și block-uri cache-uite. Se umplu.

# Pe fiecare worker (via SSH sau kubectl exec):
df -h /tmp
du -sh /tmp/spark-* 2>/dev/null | sort -h | tail

Pe platforme manageriate, verifică metrica de disk al workerului în UI-ul platformei. Pe Databricks, ăsta e panoul de storage al clusterului. Pe EMR, metricile de disk Ganglia sau CloudWatch.

Ce cauți:

  • /tmp mai mult de 80% plin → shuffle-ul și spill-ul vor eșua. Adaugă disk sau restartează clusterul.
  • Director-e vechi spark-*-shuffle din job-uri eșuate trecute care nu au fost curățate. Se curăță la shutdown grațios dar persistă după crash-uri. Sigur de șters după ce confirmi că nu rulează nicio aplicație.
  • Fișiere cache care nu au fost curățate (director-e blockmgr-*), aceeași poveste.

Pasul 6, top query-uri costisitoare în tab-ul SQL (3 minute)

Apasă pe SQL / DataFrame. Sortează după Duration. Top 10 query-uri sunt unde se duce timpul clusterului tău.

Pentru fiecare dintre cele mai rele:

  • Apasă în el. Uită-te la plan.
  • Găsește operatorul cel mai rău (timpul raportat cel mai mare).
  • Aplică checklist-ul lecției 58: skew? Strategie de join proastă? Shuffle masiv? Filtru care nu se împinge?

Vinovați frecvenți pe un cluster proaspăt:

  • Un BroadcastNestedLoopJoin undeva, condiție de join lipsă.
  • Un SortMergeJoin între un tabel de 50 GB și un lookup table de 5 MB fiindcă cineva a dezactivat autoBroadcastJoinThreshold cu ani în urmă și a uitat.
  • Un groupBy cu 50 de partiții de 8 GB fiecare fiindcă AQE e oprit.
  • UDF-uri Python (BatchEvalPython) făcând muncă ce ar putea fi o funcție SQL (lecția 40).

Astea sunt query-urile de înregistrat ca follow-up, nu neapărat de reparat astăzi.

Pasul 7, eșecuri recente și clasa de eroare (2 minute)

Pentru clusterele Databricks: tab-ul Event Log și Failed Jobs. Pentru YARN: yarn application -list -appStates FAILED și log-urile aplicației. Pentru Kubernetes: kubectl get pods și pod-urile de executor care au ieșit non-zero.

Grupează eșecurile recente după clasa de eroare:

  • Driver OOM, java.lang.OutOfMemoryError: Java heap space pe driver. De obicei un collect(), toPandas(), sau broadcast uriaș. Lecția 57.
  • Executor OOM, aceeași excepție pe un executor. Skew (lecția 28), cache prost (lecția 23), sau memorie subdimensionată.
  • Lost executor, ExecutorLostFailure urmat de retry de stage. De obicei OOM-killed de OS / container manager. Mărește memoryOverhead.
  • Shuffle fetch failed, FetchFailedException. Un executor a murit în timp ce altul citea de la el. Simptom al oricăruia dintre cele de mai sus.
  • Task failed N times, spark.task.maxFailures (default 4) a fost atins. Bug real sau skew persistent.

Defalcarea îți spune clasa dominantă, care îți spune ce fix să prioritizezi.

Pasul 8, query-uri de streaming și checkpoint-uri (3 minute)

Dacă cineva rulează Structured Streaming pe acest cluster:

for q in spark.streams.active:
    last = q.lastProgress
    print(q.name, q.status, last.get("inputRowsPerSecond"), last.get("processedRowsPerSecond"),
          last.get("batchDuration"), last.get("triggerExecution"))

Ce cauți:

  • Query-uri unde input rate > processed rate susținut → rămâne în urmă, backlog nelimitat.
  • Query-uri cu batchDuration > intervalul de trigger → nu poate ține pasul; cluster mai mare sau workload mai mic.
  • Query-uri unde numInputRows e zero ore întregi → upstream-ul e mort, query-ul e idle.

Apoi verifică storage-ul de checkpoint:

# S3 sau DBFS sau orice e checkpoint root
aws s3 ls s3://my-checkpoints/ --recursive --summarize | tail -3

Ce cauți:

  • Director-e de checkpoint ale query-urilor moarte pe care nu le-a curățat nimeni, irosesc storage.
  • Arbori de checkpoint singulari în GB fiindcă politica de retenție nu prune-uiește state-ul. Query-urile stateful (lecția 53) fără watermark-uri acumulează la nesfârșit.

Pasul 9, job-uri programate și eșecuri recente (2 minute)

Toți cluster manager-ii au view-uri pentru job-urile programate.

  • Databricks: tab-ul Workflows → Jobs → sortează după statusul ultimei rulări.
  • EMR: status Step; rulări Airflow DAG dacă orchestrezi astfel.
  • Dataproc: template-uri Workflow și rulări recente de job-uri.
  • Plain Airflow / Dagster / etc.: instanțe de task-uri eșuate în orchestrator.

Ce cauți:

  • Rulări eșuate în ultimele 7 zile. Pattern-uri: același job eșuând zilnic, aceeași oră a zilei, aceeași eroare?
  • Job-uri care nu au mai rulat de luni. Programare dezactivată? Ar trebui șterse?
  • Rulări care au reușit dar au durat de 4 ori mai mult decât de obicei → regresii silențioase, înregistrează ca follow-up.

Pasul 10, audit de cache (2 minute)

Apasă tab-ul Storage.

RDD Name                | Storage Level    | Cached Partitions | Memory   | Disk
df_users (id 12)        | MEMORY_AND_DISK  | 200 / 200         | 18 GB    | 0
df_huge_facts (id 14)   | MEMORY_ONLY      | 80 / 1200         | 12 GB    | 0
df_unused_2024 (id 5)   | MEMORY_AND_DISK  | 200 / 200         | 6 GB     | 0

Ce cauți:

  • Dataset-uri cache-uite mai mari decât încape în memorie (al doilea rând de mai sus, doar 80 din 1200 de partiții cache-uite). Cache-ul nu face nimic; fie mărește memoria, fie nu mai cache-ui.
  • Dataset-uri cache-uite care n-au mai fost atinse de ore întregi, leak-uite dintr-un notebook care e încă atașat. Unpersist-uiește-le.
  • DataFrame-uri cache-uite multiple foarte similare, cineva a cache-uit aceleași date de trei ori sub nume diferite de variabile.

spark.sparkContext._jsc.getPersistentRDDs() listează tot ce e cache-uit în acest moment, dacă vrei să scriptezi auditul.

Pasul 11, mentenanță Delta / Iceberg pe tabele (2 minute)

Dacă clusterul citește/scrie tabele Delta sau Iceberg, verifică dacă rulează mentenanță:

DESCRIBE HISTORY my.table LIMIT 10;     -- ultimele operatii
DESCRIBE DETAIL my.table;                -- numFiles, sizeInBytes

Ce cauți:

  • numFiles > 10.000 pe tabele sub câțiva TB → problemă de small files, are nevoie de OPTIMIZE.
  • Niciun OPTIMIZE în istoric de luni de zile → programează compactare săptămânală.
  • Niciun VACUUM nici el → fișiere versionate vechi acumulându-se în object storage; costă bani.
  • Echivalente Iceberg: expire_snapshots și rewrite_data_files, tot pe schedule.

Fără astea, tabelele tale se degradează în liniște.

Pasul 12, sanitate la configurarea memoriei (2 minute)

sc.getConf().getAll()

Sau pur și simplu tab-ul Environment în UI, derulează la Spark Properties.

Ce cauți:

  • spark.executor.memory și spark.executor.memoryOverhead, overhead-ul ar trebui să fie aproximativ 10% din memoria executorului sau 1 GB, oricare e mai mare. Mai strâns de atât și YARN/K8s va omorî container-ele (lecția 57).
  • spark.driver.memory, dacă cineva apelează collect() pe acest cluster, asta contează. Default-ul de 1 GB e prea mic pentru orice serios.
  • spark.executor.cores, de obicei 4-5. Mai mult înseamnă mai multă contention pentru JVM, mai puțin înseamnă mai multe JVM-uri (overhead).
  • spark.sql.shuffle.partitions, dacă e 200 (default-ul) și procesezi terabyți, ai o problemă. AQE ajută dar nu repară totul.

Dacă container-ele sunt omorâte (verifică lista de executori la pasul 2 plus evenimente YARN/K8s), memoryOverhead e primul lucru de mărit.

Pasul 13, drift de configurare (2 minute)

Același tab Environment. Filtrează după spark.sql.* și spark.shuffle.*. Scanează după non-default-uri.

Ce cauți:

  • spark.sql.adaptive.enabled = false → pornește-l înapoi (lecția 59) decât dacă cineva are un motiv documentat.
  • spark.sql.autoBroadcastJoinThreshold = -1 → broadcast dezactivat. Aproape niciodată o idee bună pe un workload real.
  • spark.sql.shuffle.partitions = 2000 → setat sus o dată pentru un job ad-hoc, niciodată resetat. Acum afectează fiecare job pe cluster.
  • Serializeri custom, codec-uri, evict policies, fiecare non-default are nevoie de un comentariu în docs-ul echipei tale care să explice de ce. Dacă nu știe nimeni de ce, suspect.

Documentează fiecare non-default cu justificarea sa. Orice nu poți justifica, revert.

Pasul 14, scrie raportul (3 minute)

Ia notițele și transformă-le într-un raport scurt. Oglindește template-ul SQL Server, ține-l scurt, prioritizat, acționabil.

Customer X Spark Cluster Health Check, 2026-06-18

  • Overall: Galben. Două findings P1.
  • Critical:
    • Query-ul de streaming clickstream-aggs rămâne în urmă cu 12 ore; input rate de 4 ori mai mare decât processed rate. Fie cluster mai mare, fie scade workload-ul. Decide astăzi.
    • Cluster-ul rulează Spark 3.0 → AQE oprit by default, mai multe job-uri ar beneficia imediat. Planifică un upgrade sau backport-ează configurile AQE.
  • High:
    • OOM-uri de executor zilnic pe job-ul daily-rollup. Semnal: skew pe country_code. Salt sau AQE.
    • 4 din 12 executori arată GC time > 30% din task time. Mărește memoria executorului sau micșorează dataset-urile cache-uite.
    • Niciodată rulat OPTIMIZE pe events_delta; 47.000 de fișiere mici pe un tabel de 800 GB. Programează OPTIMIZE săptămânal.
  • Medium:
    • BroadcastNestedLoopJoin în top-ul query-urilor costisitoare (weekly_finance). Condiție de join lipsă; query-ul durează 40 de minute per rulare.
    • spark.sql.shuffle.partitions = 4000 setat pentru un job ad-hoc în martie, niciodată reverted; afectează negativ job-urile mici.
    • df_unused_2024 cache-uit (6 GB, MEMORY_AND_DISK) atașat la un notebook idle. Unpersist.
  • Low:
    • 12 log-uri de executori morți acumulate în /tmp pe workeri. Adaugă un cron de cleanup.
    • Job-ul etl-archive n-a mai rulat cu succes din ianuarie. Dezactivează sau repară.
  • Follow-up:
    • Audit la toate configurile Spark față de default-uri; documentează cele păstrate.
    • Migrează clusterul la Spark 3.5 LTS.
    • Setează schedule pentru Delta OPTIMIZE + VACUUM peste toate tabelele de producție.
    • Adaugă dashboard-uri Prometheus / DataDog la nivel de cluster pentru GC executor, volum shuffle, rata de eșec a task-urilor.

Asta e livrabilul tău. Scurt, acționabil, prioritizat. Trimite-l prin email cui ți-a dat cheile clusterului.

Pasul 15, ce să faci după acest curs

Acum cunoști echivalentul a câțiva ani de experiență Spark on-the-job, comprimată în 60 de lecții. Ce să citești și să urmărești în continuare:

  1. Documentația Spark, Performance Tuning și secțiunea AQE. Sursa de adevăr pentru fiecare config pe care o vei atinge.
  2. Gitbook-ul lui Jacek Laskowski, The Internals of Apache Spark și The Internals of Spark SQL. Gratuit, profund tehnic, locul unde mergi când trebuie să știi ce face efectiv un operator.
  3. Cărțile și prezentările lui Holden Karau, High Performance Spark și Learning Spark. Orientate spre producție, scrise de cineva care face debugging la clustere Spark reale de ani de zile.
  4. Blogul Databricks, părtinitor spre platforma lor dar plin de deep dives despre Catalyst, AQE, Delta, Photon. Citește cu filtrul de bias pornit.
  5. Apache Spark JIRA, când lovești un bug ciudat, caută-l. E o șansă de 50/50 ca fix-ul să fie în următoarea versiune minor.
  6. Practică pe un workload real, un sandbox cu TPC-DS sau datele tale. Rulează health check-ul, repară findings, repetă lunar. Așa se formează memoria musculară.

Felicitări

Ai trecut prin 60 de lecții de PySpark. Acum cunoști:

  • Fundamentele, ce e Spark, arhitectura, ierarhia RDD/DataFrame/Dataset (Modulele 1-2).
  • API-ul DataFrame, citire, scriere, transformare, agregare, window, pivot, UDF (Modulele 3-4).
  • Mecanica execuției, lazy evaluation, narrow vs wide, DAG-ul, caching, shuffles, joins, broadcast, skew, salting (Modulele 5-6).
  • Partitioning, bucketing, file layout, on-disk vs in-memory (Modulul 7).
  • Optimizatorul, Catalyst, Tungsten, tab-ul SQL, formate de fișiere și JDBC și cloud storage (Modulul 8).
  • Streaming, surse, watermark-uri, operații stateful, output modes, sinks (Modulul 9).
  • Producție, debugging la job-uri lente, AQE, health check-ul de cluster (Modulul 10).

Dacă acest curs și-a făcut treaba, ești gata să deschizi un cluster Spark pe care nu l-ai mai văzut, să-i diagnostichezi problemele în 30 de minute, să repari problemele de top și să nu intri în panică când pager-ul de on-call sună la 3 dimineața. Ăsta e standardul. Asta înseamnă „să cunoști PySpark” la acest nivel.

Vei tot învăța lucruri noi în fiecare săptămână, asta e meseria. Dar acum ai cadrul. Următorul blog post sau release note se potrivește într-o structură care are sens, în loc să fie un zid de termeni nefamiliari.

Du-te și ai grijă de pipeline-urile tale. Ele depind de tine.

Mulțumesc pentru lectură. Dacă ai observat erori, ai sugestii sau vrei să te cerți despre dacă repartition() sau coalesce() e alegerea corectă, salută.

Caută