Există în jur de cincizeci de proprietăți de configurare Spark legate de memorie și le vei vedea pe toate într-un răspuns Stack Overflow undeva. Trebuie să știi de patru. Celelalte patruzeci și șase fie sunt derivate automat din cele patru, fie sunt relevante o dată pe an pentru un mod de eșec foarte specific, fie sunt pur și simplu legacy.
E lecția în care tăiem prin asta. Ce regiuni de memorie are Spark de fapt, ce controlează cele patru config-uri, cum arată cele patru arome de OOM, ce înseamnă spill și când e ok, și regula de sizing care transformă „n-am idee cât de mari să fac executorii” într-un punct de plecare de la care poți itera.
Ce trăiește în interiorul unui executor
Un executor e un proces JVM. Fiecare rulează spark.executor.cores task-uri în paralel ca thread-uri în interiorul acelui JVM. Memoria din interiorul JVM-ului e împărțită în regiuni:
Reserved memory: 300 MB hard-set, pentru obiectele interne ale Spark. Asta nu o tunezi.
User memory: 25% din (executor heap − reserved) implicit. E pentru obiectele pe care codul tău le creează în afara structurilor managed ale Spark: closure-uri de UDF, broadcast vars pe care le acumulezi manual, orice pui într-o colecție Python sau Scala obișnuită. Dacă un UDF Python construiește un dict local gigantic, dictul ăla trăiește aici.
Spark unified memory pool: cei 75% rămași. În interiorul acestui pool, două lucruri concurează:
- Execution memory: buffere de sort, tabele hash pentru agregare, tabele hash pentru join, buffere de shuffle. Orice are nevoie Spark în timp ce rulează operatori.
- Storage memory: DataFrame-uri cached (
.cache()-ul pe care l-ai pus pe acel DataFrame).
Partea „unified” din „unified memory” e că execution și storage împart pool-ul dinamic. Storage poate împrumuta de la execution și viceversa. Există un soft floor pentru fiecare, implicit storage are garantat 50% din pool și execution poate evicta storage peste acel floor când are nevoie de spațiu. Eviction înseamnă că partițiile cached sunt aruncate. Execution memory nu e evictabil, dacă execution are nevoie de mai mult spațiu și nu e nimic pe care storage să-l poată ceda, Spark face spill pe disc.
Asta acoperă heap-ul JVM. În afara heap-ului mai e o regiune:
Off-heap memory („overhead”): stack-uri JVM, biblioteci native (gândește-te la Pandas, Arrow, bufferele off-heap ale Tungsten), buffere de rețea pentru shuffle, procese Python worker, container overhead. Asta nu face parte din spark.executor.memory; e un buget separat controlat de spark.executor.memoryOverhead.
Memoria totală a containerului = spark.executor.memory + spark.executor.memoryOverhead. Ăla e numărul pe care Kubernetes sau YARN îl rezervă efectiv pentru executor-ul tău.
Cele patru config-uri
Astea sunt cele pe care le vei atinge. Tot restul, ignoră până ai un motiv specific.
spark.executor.memory: alocarea on-heap per executor. Numărul mare. JVM-ul primește -Xmx setat la această valoare.
spark.executor.memoryOverhead: alocarea off-heap per executor. Default e max(384 MB, 0.10 × spark.executor.memory). Default-ul funcționează pentru majoritatea workload-urilor JVM-only. Crește-l când folosești PySpark intens (Python workers trăiesc aici), Pandas UDF grele (bufferele Arrow trăiesc aici), sau când vezi erori „container killed by YARN/K8s”, alea sunt aproape mereu epuizare de overhead, nu epuizare de heap.
spark.driver.memory: heap pentru procesul de driver. Driverul coordonează clusterul și ține metadatele pentru job-urile tale, dar ține și orice faci .collect(), tabelele de broadcast și cache-ul pentru orice computație cached pe partea de driver. Default-ul e 1 GB, ok pentru majoritatea job-urilor. Crește-l dacă faci broadcast la tabele mari sau colectezi rezultate semnificative.
spark.driver.maxResultSize: plafonul pe bytes pe care driverul îl va accepta de la .collect() și acțiuni similare. Default 1 GB. Dacă jobul tău moare cu Total size of serialized results of N tasks (X GB) is bigger than spark.driver.maxResultSize, ai încercat să colectezi mai mult decât plafonul. Fie crește plafonul, fie, mai des, nu colecta datele alea, scrie-le în loc.
Atât. Mai există și spark.memory.fraction (fracțiunea pool-ului unificat) și spark.memory.storageFraction (fracțiunea soft-floor pentru storage), dar default-urile, 0.6 și 0.5, sunt bine tunate și ar trebui să le lași în pace dacă nu ai o problemă specifică diagnosticată.
Un punct de plecare tipic pentru producție:
spark.executor.memory=12g
spark.executor.memoryOverhead=2g
spark.executor.cores=4
spark.driver.memory=4g
spark.driver.maxResultSize=2g
Ăla e un container de 14 GB per executor cu 4 cores. Iterează de acolo pe baza a ceea ce îți spune UI-ul.
Spill: încetinirea tăcută
Când execution memory se termină, să zicem că un sort sau o agregare produce mai multe date intermediare decât încap, Spark nu face OOM. Face spill. Starea curentă in-memory e serializată și scrisă pe discul local, bufferul in-memory e eliberat și operatorul continuă. Când operatorul se termină, chunks-urile spilled sunt merged înapoi de pe disc.
În tabela de Task din Stages tab vezi două coloane:
- Spill (Memory): mărimea necomprimată a ceea ce a fost împins afară.
- Spill (Disk): bytes comprimați scriși efectiv pe discul local.
Spill nu e o eroare. E Spark care degradează cu grație. Dar e lent, I/O-ul de disc e cu ordine de mărime mai lent decât RAM, iar un job care face spill puternic rulează la o fracțiune din viteza in-memory. Niște spill sub presiune e ok. Spill pe fiecare task, gigabytes per task, fiecare stage, ăla e un job care cerșește mai multă memorie.
Cauze comune ale spill-ului:
- Partiție prea mare. O partiție de 10 GB va face spill pe orice executor rezonabil. Repartiționează ca să faci partițiile mai mici (țintă: ~128 MB până la 1 GB per partiție pentru majoritatea workload-urilor).
- Sort sau agregare pe prea multe chei. Un
groupBypeste o coloană de cardinalitate mare construiește o tabelă hash uriașă. - Prea multe cores per executor pentru heap. Fiecare task are nevoie de memorie de lucru; cu N cores și X heap, fiecare task primește aproximativ X/N. Prea multe cores = fiecare task înfometează.
Ordinea de fix: mai întâi repară mărimea partiției (ieftin, doar cod), apoi crește memoria executor-ului, apoi scade cores per executor.
Cele patru arome de OOM
Când memoria se termină și Spark nu poate face spill ca să iasă din asta, primești un OOM. Sunt patru moduri distincte de eșec; fix-ul depinde de care îl ai.
1. OOM JVM la executor în timpul execuției
Heap-ul executor-ului se umple peste ce poate recupera spilling-ul. JVM-ul moare cu:
java.lang.OutOfMemoryError: Java heap space
sau
java.lang.OutOfMemoryError: GC overhead limit exceeded
Procesul executor-ului moare, cluster manager-ul îl repornește, task-urile eșuate sunt reîncercate pe alt executor. Dacă datele sunt cu adevărat prea mari, retry-ul eșuează și el, iar stage-ul eșuează după spark.task.maxFailures (default 4) încercări.
Fix-uri, în ordine:
- Partiții mai mici (
.repartition(N)cu N mai mare). - Heap mai mare pentru executor (
spark.executor.memory). - Mai puține cores per executor (mai puțină competiție pentru heap).
- Dacă e un join, verifică dacă ar trebui să faci broadcast la partea mică (tabelele hash uriașe mănâncă heap rapid).
- Dacă e un
collect()sautoPandas()care îl cauzează pe driver, oprește-te din asta.
2. Container omorât de YARN/K8s
Executor-ul a depășit limita lui totală de memorie (heap + overhead). Cluster manager-ul omoară containerul. Eroarea în log-ul de driver:
Container killed by YARN for exceeding memory limits.
14.5 GB of 14 GB physical memory used.
Consider boosting spark.executor.memoryOverhead.
Sau pe Kubernetes:
Container exited with exit code 137 (OOMKilled)
Asta nu e un OOM de heap. Heap-ul a fost probabil ok. Off-heap overhead-ul, Python workers, buffere Arrow, biblioteci native, buffere de rețea, a depășit. Fix-ul e să crești spark.executor.memoryOverhead, nu spark.executor.memory. Lumea ratează asta tot timpul și crește heap-ul ore în șir întrebându-se de ce nu se schimbă nimic.
Dacă rulezi PySpark cu UDF-uri grele sau Pandas UDF-uri, așteaptă-te să ai nevoie de 3-4 GB de overhead, nu de default-ul de 384 MB.
3. OOM de driver
Driverul rămâne fără heap. Stack trace-ul indică spre cod pe partea de driver, deseori:
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.Dataset.collect(...)
Cauze:
.collect()pe un DataFrame prea mare. Chiar și un.toPandas()pe o tabelă de 1 miliard de rânduri va omorî driverul.- Broadcast la o tabelă mai mare decât credeai. Driverul o colectează înainte de broadcast.
- Un plan de query uriaș (mii de operatori, de ex. SQL generat dinamic), Catalyst însuși folosește memorie.
- Caching cu date
LocalRelation(DataFrame-uri mici construite cucreateDataFramedintr-o listă Python mare).
Fix: nu colecta. Scrie într-un sink. Dacă ai cu adevărat nevoie de un rezultat mic local, .limit(N) mai întâi. Dacă trebuie să faci broadcast la o tabelă mare, probabil n-ar trebui, nu mai e o tabelă mică.
4. Spill excesiv (nu e tehnic un OOM)
Jobul nu eșuează. Doar durează 3 ore în loc de 20 de minute. Tab-ul Stages arată gigabytes de Spill (Disk) per task pe mai multe stages. GC time e 30%+ din task time.
Asta nu e un OOM, dar e modul de eșec OOM-adiacent care prinde lumea: pipeline-ul „funcționează” așa că nimeni nu observă, până când review-urile de costuri arată că clusterul e de două ori mai scump decât ar trebui. Tratează-l ca pe un OOM. Aceleași fix-uri, partiții mai mici, mai multă memorie, mai puține cores per executor.
Cum să diagnostichezi efectiv un OOM
Când un executor moare, log-ul executor-ului de pe cluster are ultimele cuvinte ale JVM-ului. În Spark UI, click pe executor-ul mort în tab-ul Executors → „stderr” → caută OutOfMemoryError sau Container killed. Prima linie a stack trace-ului îți spune dacă a fost un OOM de heap (nume de operatori vizibile, Sort, HashAggregate, ShuffleExternalSorter) sau un kill de overhead (mesaj OOM de kernel, fără stack).
Tiparul eșecurilor contează și el:
- Un task per stage eșuează repetat: skew. O cheie are prea multe date. Salting, AQE skew handling sau o strategie diferită de partiționare.
- Toate task-urile dintr-un stage eșuează: workload-ul e prea mare pentru executorii actuali. Resize.
- Task-uri aleatorii eșuează prin stage-uri: de obicei presiune de overhead. Crește overhead-ul.
Regula de sizing
Iată un punct de plecare care funcționează pentru majoritatea workload-urilor batch. Nu e optim pentru niciun job specific, e locul de start înainte să tunezi.
executor heap = (per-task working memory) × (cores per executor) × 1.3 (headroom)
overhead = max(384 MB, 0.10 × heap, 2 GB if PySpark with UDFs)
container = heap + overhead
Pentru un job cu ~1.5 GB de memorie de lucru per task (un join + agregare moderate) și 4 cores per executor:
heap = 1.5 × 4 × 1.3 ≈ 8 GB
overhead = max(800 MB, 2 GB if PySpark) = 2 GB
container = 10 GB
Apoi sizează numărul de executori pe baza totalului de cores de cluster pe care îl vrei și a cores per executor. Dacă vrei 64 cores de paralelism cu executori de 4 cores, ăla înseamnă 16 executori.
Puțini executori mari vs mulți mici
E veșnica întrebare de sizing din Spark. Două extreme:
Executori mari (16 cores, 64 GB). Pro-uri: datele cached sunt locale între multe task-uri (fără re-fetch), shuffle-ul are mai puține conexiuni (mai puține × mai puține = mult mai puține), broadcast-ul e mai ieftin per task. Con-tra: un singur OOM omoară 16 task-uri de muncă, pauzele GC sunt mai lungi, managementul heap-ului JVM devine mai greu peste ~32 GB (granița de compressed oops).
Executori mici (2 cores, 8 GB). Pro-uri: blast radius mai mic, GC mai ușor, mai mult paralelism per nod (mai multe containere). Con-tra: mai multe conexiuni de shuffle (fiecare executor se conectează la fiecare alt pentru shuffle, cuadratic în numărul de executori), mai multă replicare a datelor de broadcast, locality de cache mai proastă.
Calea de mijloc care funcționează pentru majoritatea job-urilor batch de producție: 4-8 cores per executor, 8-16 GB heap. Peste 8 cores începi să pierzi eficiența de paralelism din cauza contenției JVM; peste ~32 GB heap ești pe teritoriu unde GC tuning începe să conteze și ar trebui să consideri G1 GC (-XX:+UseG1GC).
Pentru PySpark în particular, înclină spre partea mai mică, Python workers adaugă presiune de memorie iar executorii foarte largi înseamnă o grămadă de procese Python per JVM, ceea ce fragmentează memoria de overhead.
O listă de verificare pentru debugging
Când un OOM aterizează în inbox-ul tău:
- Care OOM a fost? Citește log-ul executor-ului (sau log-ul driverului dacă a fost OOM de driver). Heap, container kill, sau driver?
- Care stage? UI → Stages → caută task-uri eșuate. Stage-ul îți spune ce operație rula.
- E un task mult mai mare decât celelalte? Skew. Repară la nivelul datelor.
- Cât de mari sunt partițiile? UI → Stages → input size / task count. Dacă partițiile sunt >2 GB, repartiționează.
- E GC time >10% din task time? Presiune de memorie. Crește heap-ul sau scade cores.
- E overhead-ul mare? PySpark sau Pandas UDF? Crește
memoryOverhead. - A adăugat cineva un
.collect()? Verifică schimbările recente de cod.
Lista aia rezolvă marea majoritate a incidentelor de memorie. Restul sunt cele ciudate, leak-uri în biblioteci native, bug-uri în alocatorul off-heap, genul ăla, și în acel punct ești într-un alt fel de conversație, de obicei cu echipa de platformă.
Am făcut UI-ul, planurile și memoria. Lecția următoare deschide a doua jumătate a Modulului 10 cu skew, broadcast tuning și config-urile de partiționare care transformă un job lent într-unul rapid.