PySpark, from the ground up Lesson 57 / 60

Memory tuning: executor memory, overhead, OOM diagnostics

The four configs that actually matter, what spill means, how to read an OOM stack trace, and the rule for sizing executors.

There are roughly fifty Spark configuration properties related to memory, and you will see all of them in some Stack Overflow answer somewhere. You need to know about four. The other forty-six are either auto-derived from the four, or relevant once a year for a very specific failure mode, or simply legacy.

This is the lesson where we cut through that. What memory regions Spark actually has, what the four configs control, what the four flavors of OOM look like, what spill means and when it’s fine, and the sizing rule that turns “I have no idea how big to make my executors” into a starting point you can iterate from.

What lives inside an executor

An executor is a JVM process. Each one runs spark.executor.cores tasks in parallel as threads inside that JVM. Memory inside the JVM is divided into regions:

Reserved memory: 300 MB, hard-coded, for Spark’s own internal objects. You don’t tune this.

User memory: 40% of (executor heap − reserved) by default, that is, 1 − spark.memory.fraction. This is for JVM objects your code creates outside Spark’s managed structures: Scala or Java UDF closures, user-defined data structures, anything you stick in a regular Scala collection inside a task. Python objects do not live here. A Python UDF runs in a separate Python worker process, so a giant local dict it builds counts against the off-heap overhead budget described below, not the JVM heap.

Spark unified memory pool: the remaining 60% (the spark.memory.fraction default). Inside this pool, two things compete:

  • Execution memory: sort buffers, hash tables for aggregation and joins, shuffle buffers. Anything Spark needs while running operators.
  • Storage memory: cached DataFrames (the .cache() or .persist() you put on a DataFrame) and broadcast variables.

The “unified” part of “unified memory” is that execution and storage share the pool dynamically: either side can borrow space the other isn’t using. Only storage has a soft floor. By default, cached blocks within 50% of the pool (spark.memory.storageFraction) are protected from eviction, and execution can evict storage above that floor when it needs space. Eviction means cached partitions get dropped (or written to disk, for storage levels that allow it). Execution memory is never evicted: if execution needs more space and storage has nothing left to give up, Spark spills to disk.
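To make the percentages concrete, here is a small Python sketch that splits a heap into these regions. It assumes the Spark 2.0+ defaults quoted later in this lesson (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5) and ignores the dynamic borrowing between execution and storage. memory_regions is a hypothetical helper for back-of-the-envelope math, not a Spark API.

```python
RESERVED_MB = 300  # fixed reserved region for Spark internals

def memory_regions(heap_mb, fraction=0.6, storage_fraction=0.5):
    """Approximate split of an executor heap (spark.executor.memory, in MB)."""
    usable = heap_mb - RESERVED_MB
    if usable <= 0:
        raise ValueError("heap must be larger than the 300 MB reserved region")
    unified = usable * fraction  # shared by execution and storage
    return {
        "reserved": RESERVED_MB,
        "user": usable - unified,  # objects your code allocates on the heap
        "unified": unified,
        # cached blocks inside this slice are protected from eviction by execution
        "storage_floor": unified * storage_fraction,
    }

for name, mb in memory_regions(12 * 1024).items():
    print(f"{name:>13}: {mb:7.0f} MB")
```

For a 12 GB heap this works out to roughly 7 GB of unified pool and about 4.7 GB of user memory, which is why a "12 GB executor" has well under 12 GB available for sorts and joins.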

That covers the JVM heap. Outside the heap there’s another region:

Off-heap memory (“overhead”): JVM internals such as thread stacks, native allocations (Netty shuffle network buffers, Arrow buffers), the Python worker processes where Pandas and PyArrow actually run, and general container overhead. This isn’t part of spark.executor.memory; it’s a separate budget controlled by spark.executor.memoryOverhead.

Total container memory = spark.executor.memory + spark.executor.memoryOverhead, plus spark.memory.offHeap.size and spark.executor.pyspark.memory if you set them. That’s the number Kubernetes or YARN actually reserves for your executor.

The four configs

These are the ones you’ll touch. Everything else, ignore until you have a specific reason.

spark.executor.memory — the on-heap allocation per executor. The big number. The JVM gets -Xmx set to this value.

spark.executor.memoryOverhead — the off-heap allocation per executor. Default is max(384 MB, 0.10 × spark.executor.memory). Default works for most JVM-only workloads. Bump it when you use PySpark heavily (Python workers live here), heavy Pandas UDFs (Arrow buffers live here), or when you see “container killed by YARN/K8s” errors — those are almost always overhead exhaustion, not heap exhaustion.

spark.driver.memory: heap for the driver process. The driver coordinates the cluster and holds the metadata for your jobs, but it also holds anything you .collect(), the tables it gathers for broadcast joins, and the query plans Catalyst builds. Default is 1 GB, which is fine for most jobs. Bump it if you broadcast large tables or collect significant results. In client mode, set it with --driver-memory or in your defaults file rather than in SparkConf inside your application, because the driver JVM has already started by then.

spark.driver.maxResultSize — the cap on bytes the driver will accept from .collect() and similar actions. Default 1 GB. If your job dies with Total size of serialized results of N tasks (X GB) is bigger than spark.driver.maxResultSize, you tried to collect more than the cap. Either bump the cap or, more often, don’t collect that data — write it instead.

That’s it. There are also spark.memory.fraction (the unified pool fraction) and spark.memory.storageFraction (the storage soft-floor fraction) but the defaults — 0.6 and 0.5 — are well-tuned and you should leave them alone unless you have a specific diagnosed problem.

A typical production starting point:

spark.executor.memory=12g
spark.executor.memoryOverhead=2g
spark.executor.cores=4
spark.driver.memory=4g
spark.driver.maxResultSize=2g

That’s a 14 GB container per executor with 4 cores. Iterate from there based on what the UI tells you.
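The container arithmetic is easy to get wrong once defaults kick in, so here is a sketch that reads the lines above and computes the container size, falling back to Spark’s default overhead rule, max(384 MB, 10% of heap), when memoryOverhead is not set. parse_size_mb and container_mb are hypothetical helpers: they accept only whole numbers with k/m/g/t suffixes, parse only the two executor settings they need, and ignore spark.memory.offHeap.size and spark.executor.pyspark.memory.

```python
import re

CONFIG = """
spark.executor.memory=12g
spark.executor.memoryOverhead=2g
spark.executor.cores=4
spark.driver.memory=4g
spark.driver.maxResultSize=2g
"""

# MB per unit. A bare number is read as MB, which is how Spark reads
# spark.executor.memory and spark.executor.memoryOverhead.
UNITS_MB = {"": 1, "k": 1 / 1024, "kb": 1 / 1024, "m": 1, "mb": 1,
            "g": 1024, "gb": 1024, "t": 1024**2, "tb": 1024**2}

def parse_size_mb(value):
    match = re.fullmatch(r"(\d+)([a-z]*)", value.strip().lower())
    if not match or match.group(2) not in UNITS_MB:
        raise ValueError(f"unrecognized size: {value!r}")
    return int(match.group(1)) * UNITS_MB[match.group(2)]

def container_mb(conf):
    heap = parse_size_mb(conf["spark.executor.memory"])
    if "spark.executor.memoryOverhead" in conf:
        overhead = parse_size_mb(conf["spark.executor.memoryOverhead"])
    else:
        overhead = max(384, int(0.10 * heap))  # default overhead rule
    return heap + overhead

conf = dict(line.split("=", 1) for line in CONFIG.strip().splitlines())
print(container_mb(conf) / 1024, "GB per executor container")
```

Dropping the memoryOverhead line gives 12 GB + 1228 MB instead: the default is 10% of heap here, not the 384 MB floor.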

Spill: the silent slowdown

When execution memory runs out — say, a sort or aggregation produces more intermediate data than fits — Spark doesn’t OOM. It spills. The current in-memory state is serialized and written to local disk, the in-memory buffer is freed, and the operator continues. When the operator finishes, the spilled chunks are merged back from disk.
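A toy model makes spill concrete. The Python sketch below is not Spark’s code; it is the generic external-sort pattern this paragraph describes, under the assumption that a fixed item count (buffer_limit) stands in for execution memory. It buffers records up to the limit, sorts and writes each full buffer to a temp file, frees the buffer, and merges all sorted runs at the end. external_sort and its helpers are hypothetical names.

```python
import heapq
import tempfile

def _spill(sorted_run):
    """Write one sorted run to a temp file on local disk and rewind it."""
    run_file = tempfile.TemporaryFile(mode="w+")
    run_file.writelines(f"{value}\n" for value in sorted_run)
    run_file.seek(0)
    return run_file

def _read_run(run_file):
    """Stream a spilled run back from disk, closing the file when done."""
    with run_file:
        for line in run_file:
            yield int(line)

def external_sort(values, buffer_limit):
    """Sort integers while holding at most buffer_limit of them in memory."""
    spilled = []
    buffer = []
    for value in values:
        buffer.append(value)
        if len(buffer) >= buffer_limit:
            spilled.append(_spill(sorted(buffer)))  # memory full: spill
            buffer = []                             # in-memory state freed
    buffer.sort()
    # Merge the spilled runs and the in-memory tail, one record at a time.
    runs = [_read_run(f) for f in spilled] + [iter(buffer)]
    return list(heapq.merge(*runs)), len(spilled)

result, spill_count = external_sort([5, 3, 9, 1, 7, 2, 8, 6, 4, 0], buffer_limit=3)
print(result, "spills:", spill_count)
```

With ten values and room for three, the sketch spills three times and still returns a fully sorted list. The correctness is free; the cost is the extra disk writes and reads.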

On a stage’s detail page in the Stages tab, the task table has two spill columns:

  • Spill (Memory): the deserialized, in-memory size of the data that was pushed out.
  • Spill (Disk): the serialized (and usually compressed) bytes actually written to local disk.

Spill is not an error. It’s Spark gracefully degrading. But it’s slow — disk I/O is orders of magnitude slower than RAM, and a heavily spilling job runs at a fraction of its in-memory speed. Some spill under pressure is fine. Spill on every task, gigabytes per task, every stage — that’s a job begging for more memory.

Common causes of spill:

  • Partition too big. A 10 GB partition will spill on any reasonable executor. Repartition to make partitions smaller (target: ~128 MB to 1 GB per partition for most workloads).
  • Sort or aggregation on too many keys. A groupBy over a high-cardinality column builds a huge hash table.
  • Too many cores per executor for the heap. Each task needs working memory, and the N tasks running at once on an executor share its execution memory: each gets at most roughly 1/N of it. Too many cores = each task starves.
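Here is the third cause in numbers. Spark’s execution pool tries to let each of N concurrently running tasks grow to 1/N of the pool and to ensure it can get at least about 1/(2N) before it has to spill. The sketch assumes the default spark.memory.fraction of 0.6 and that no cached data is holding storage memory, so execution can use the whole unified pool; per_task_execution_mb is a hypothetical helper.

```python
def per_task_execution_mb(heap_mb, concurrent_tasks, fraction=0.6):
    """(floor, cap) of execution memory per task, in MB, on one executor."""
    pool = (heap_mb - 300) * fraction  # unified pool, all usable by execution here
    return pool / (2 * concurrent_tasks), pool / concurrent_tasks

for cores in (2, 4, 8, 16):
    low, high = per_task_execution_mb(12 * 1024, cores)
    print(f"{cores:>2} cores: {low:6.0f} to {high:6.0f} MB per task")
```

Doubling cores on the same 12 GB heap halves what each task can hold before it starts spilling.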

Fix order: first fix partition size (cheap, code-only), then bump executor memory, then drop cores per executor.
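The first fix is mostly arithmetic: pick a target partition size and divide. This sketch assumes a 256 MB target, one value inside the 128 MB to 1 GB range above, and an even spread of data across partitions. target_partitions is a hypothetical helper whose result is what you would pass to df.repartition(n). Measure the size that actually flows into the spilling stage (shuffle read in the Stages tab) rather than compressed files on disk, since data usually grows once it is in memory.

```python
import math

MB = 1024**2
GB = 1024**3

def target_partitions(stage_bytes, target_partition_bytes=256 * MB):
    """Smallest partition count that keeps the average partition at or below the target."""
    return max(1, math.ceil(stage_bytes / target_partition_bytes))

# e.g. a stage shuffling 500 GB
n = target_partitions(500 * GB)
print(n)  # 2000; then: df = df.repartition(n)
```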

The four flavors of OOM

When memory runs out and Spark can’t spill its way out of it, you get an OOM. There are four distinct failure modes; the fix depends on which one you have.

1. Executor JVM OOM during execution

The executor’s heap fills up beyond what spilling can recover. The JVM dies with:

java.lang.OutOfMemoryError: Java heap space

or

java.lang.OutOfMemoryError: GC overhead limit exceeded

The executor process dies, the cluster manager starts a replacement, and the failed tasks are retried, possibly on another executor. If the data is genuinely too big, the retries fail too, and the job is aborted once any single task has failed spark.task.maxFailures (default 4) times.

Fixes, in order:

  • Smaller partitions (.repartition(N) with bigger N).
  • Bigger executor heap (spark.executor.memory).
  • Fewer cores per executor (less competition for the heap).
  • If it’s a join, check whether you should be broadcasting the small side (huge hash tables eat heap fast).
  • If it’s a collect() or toPandas() causing it on the driver, stop doing that.

2. Container killed by YARN/K8s

The executor exceeded its total memory limit (heap + overhead). The cluster manager kills the container. The error in the driver log:

Container killed by YARN for exceeding memory limits.
14.5 GB of 14 GB physical memory used.
Consider boosting spark.executor.memoryOverhead.

Or on Kubernetes:

Container exited with exit code 137 (OOMKilled)

This is not a heap OOM. The heap was probably fine. The off-heap overhead — Python workers, Arrow buffers, native libraries, network buffers — overflowed. The fix is to bump spark.executor.memoryOverhead, not spark.executor.memory. People miss this all the time and bump heap for hours wondering why nothing changes.

If you’re running PySpark with heavy UDFs or Pandas UDFs, expect to need 3-4 GB of overhead, well above the default of max(384 MB, 10% of heap), which is about 1.2 GB for a 12 GB executor.

3. Driver OOM

The driver runs out of heap. Stack trace points to driver-side code, often:

java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.sql.Dataset.collect(...)

Causes:

  • .collect() on a too-big DataFrame. Even a .toPandas() on a 1B-row table will kill the driver.
  • Broadcasting a table that’s bigger than you thought. Driver collects it before broadcasting.
  • A huge query plan (thousands of operators, e.g. dynamically-generated SQL) — Catalyst itself uses memory.
  • Large local data. A DataFrame built with createDataFrame from a big Python list (it can appear in the plan as a LocalRelation) has to pass through driver memory in full before any executor sees it.

Fix: don’t collect. Write to a sink instead. If you genuinely need a small result locally, .limit(N) first. If you must broadcast a big table, you probably shouldn’t — it’s not a small table any more.

4. Excessive spill (not technically an OOM)

The job doesn’t fail. It just takes 3 hours instead of 20 minutes. The Stages tab shows gigabytes of Spill (Disk) per task on multiple stages. GC time is 30%+ of task time.

This isn’t an OOM but it’s the OOM-adjacent failure mode that catches people: the pipeline “works” so nobody notices, until cost reviews show the cluster is twice as expensive as it should be. Treat it like an OOM. Same fixes — smaller partitions, more memory, fewer cores per executor.

How to actually diagnose an OOM

When an executor dies, the executor log on the cluster has the JVM’s last words. In the Spark UI, open the Executors tab, find the dead executor, click its stderr log, and search for OutOfMemoryError or Container killed. A heap OOM leaves a stack trace: the OutOfMemoryError line, with frames underneath that name the operator that was running (Sort, HashAggregate, ShuffleExternalSorter). An overhead kill usually leaves no Java stack at all: the log just stops, and the kill message (the YARN text above, or exit code 137 on Kubernetes) turns up in the driver log instead.
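If you are grepping many logs, the triage in this paragraph can be sketched as substring matching on the messages quoted in this lesson. The pattern list is illustrative, not exhaustive, and exact wording varies across Spark versions and cluster managers. A heap OOM match alone does not say driver or executor; that depends on which log you fed in. classify_failure is a hypothetical helper.

```python
# Checked in order; the first substring found wins. Wording varies by Spark
# version and cluster manager, so treat these patterns as a starting point.
FAILURE_PATTERNS = [
    ("spark.driver.maxresultsize", "result-size cap (driver)"),
    ("container killed", "container kill (overhead)"),
    ("exit code 137", "container kill (overhead)"),
    ("oomkilled", "container kill (overhead)"),
    ("gc overhead limit exceeded", "heap OOM (GC thrash)"),
    ("java heap space", "heap OOM"),
]

def classify_failure(log_text):
    """Return a rough OOM flavor for a chunk of driver or executor log."""
    text = log_text.lower()
    for needle, flavor in FAILURE_PATTERNS:
        if needle in text:
            return flavor
    return "unknown: read the full log"

print(classify_failure("Container exited with exit code 137 (OOMKilled)"))
```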

The pattern of failures matters too:

  • One task per stage fails repeatedly — skew. One key has too much data. Salting, AQE skew handling, or a different partitioning strategy.
  • All tasks in a stage fail — the workload is too big for current executors. Resize.
  • Random tasks fail across stages — usually overhead pressure. Bump overhead.
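For the first pattern, you can check the per-task input sizes from the stage’s task table. This sketch flags tasks whose input is both several times the stage median and above an absolute size. The defaults (5× the median, 256 MB) are borrowed from the defaults of AQE’s skew-join settings, spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, but skewed_tasks itself is a hypothetical helper, not Spark’s implementation.

```python
from statistics import median

MB = 1024**2

def skewed_tasks(task_input_bytes, factor=5.0, min_bytes=256 * MB):
    """Indices of tasks that are both factor x the median and larger than min_bytes."""
    mid = median(task_input_bytes)
    return [i for i, size in enumerate(task_input_bytes)
            if size > factor * mid and size > min_bytes]

stage = [100 * MB] * 199 + [6 * 1024 * MB]  # 199 ordinary tasks, one 6 GB straggler
print(skewed_tasks(stage))
```

The absolute floor matters: a task that is 100× the median but still tiny is not worth salting over.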

The sizing rule

Here’s a starting point that works for most batch workloads. It’s not optimal for any specific job — it’s the place to start before you tune.

executor heap = (per-task working memory) × (cores per executor) × 1.3 (headroom)
overhead     = max(384 MB, 0.10 × heap, 2 GB if PySpark with UDFs)
container    = heap + overhead

For a job with ~1.5 GB of working memory per task (a moderate join + aggregation) and 4 cores per executor:

heap      = 1.5 × 4 × 1.3 ≈ 8 GB
overhead  = max(800 MB, 2 GB if PySpark) = 2 GB
container = 10 GB

Then size number of executors based on total cluster cores you want and cores per executor. If you want 64 cores of parallelism with 4-core executors, that’s 16 executors.
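The rule and the worked example translate directly into a few lines of Python, handy for trying other per-task estimates or core counts. size_executor and executor_count are hypothetical helpers. The heap is rounded up to a whole GB the way the example rounds 7.8 to 8, and the 2 GB PySpark floor is this lesson’s rule of thumb, not a Spark default.

```python
import math

def size_executor(per_task_gb, cores, pyspark_udfs=False, headroom=1.3):
    """Return (heap_gb, overhead_gb, container_gb) from the sizing rule."""
    heap = math.ceil(per_task_gb * cores * headroom)  # round up to a whole GB
    overhead = max(0.384, 0.10 * heap, 2.0 if pyspark_udfs else 0.0)
    return heap, overhead, heap + overhead

def executor_count(total_cores, cores_per_executor):
    """Executors needed to reach a target number of concurrent task slots."""
    return math.ceil(total_cores / cores_per_executor)

heap, overhead, container = size_executor(1.5, cores=4, pyspark_udfs=True)
print(f"heap={heap} GB, overhead={overhead} GB, container={container} GB")
print(executor_count(64, 4), "executors")
```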

Few big executors vs many small ones

This is the eternal Spark sizing question. Two extremes:

Big executors (16 cores, 64 GB). Pros: cached data is local across many tasks (no re-fetch), shuffle has fewer connections (fewer × fewer = much fewer), broadcast is cheaper per task. Cons: a single OOM kills 16 tasks worth of work, GC pauses are longer, JVM heap management gets harder above ~32 GB (compressed oops boundary).

Small executors (2 cores, 8 GB). Pros: smaller blast radius, easier GC, finer-grained packing onto nodes (multiple containers per node). Cons: more shuffle connections (every executor connects to every other for shuffle, quadratic in executor count), more replication of broadcast data, worse cache locality.

The middle ground that works for most production batch jobs: 4-8 cores per executor, 8-16 GB heap. Above 8 cores you start losing parallelism efficiency to JVM contention; above ~32 GB heap you’re in territory where GC tuning starts to matter. G1 GC is the usual choice there. It is already the default collector on Java 9 and later; on Java 8 you enable it with -XX:+UseG1GC in spark.executor.extraJavaOptions.

For PySpark specifically, lean toward the smaller side — Python workers add memory pressure and very wide executors mean a lot of Python processes per JVM, which fragments overhead memory.
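The tradeoffs in this section reduce to a few thresholds you can check before submitting a job. shape_warnings is a hypothetical helper that encodes only the numbers given here (4 to 8 cores, 8 to 16 GB heap, trouble above 8 cores or around 32 GB of heap); it is a pre-flight sanity check, not a substitute for watching the UI.

```python
def shape_warnings(cores, heap_gb):
    """Flag executor shapes outside this lesson's middle ground."""
    warnings = []
    if not (4 <= cores <= 8 and 8 <= heap_gb <= 16):
        warnings.append("outside the 4-8 core / 8-16 GB middle ground")
    if cores > 8:
        warnings.append("more than 8 cores: expect diminishing returns from JVM contention")
    if heap_gb >= 32:
        warnings.append("heap at or above ~32 GB: compressed oops are lost and GC tuning matters")
    return warnings

for shape in [(4, 12), (2, 8), (16, 64)]:
    print(shape, shape_warnings(*shape) or "ok")
```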

A debugging checklist

When an OOM lands in your inbox:

  1. Which OOM was it? Read the executor log (or driver log if it was a driver OOM). Heap, container kill, or driver?
  2. Which stage? UI → Stages → look for failed tasks. The stage tells you what operation was running.
  3. Is one task much bigger than the others? Skew. Fix at the data layer.
  4. How big are the partitions? UI → Stages → input size / task count. If partitions are >2 GB, repartition.
  5. Is GC time >10% of task time? Memory pressure. Bump heap or drop cores.
  6. Is overhead high? PySpark or Pandas UDFs? Bump memoryOverhead.
  7. Did someone add a .collect()? Check recent code changes.

That checklist resolves the vast majority of memory incidents. The rest are weird ones — native library leaks, off-heap allocator bugs, that sort of thing — and at that point you’re in a different kind of conversation, usually with the platform team.
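Steps 3 through 6 of the checklist are mechanical once you have read the numbers off the UI, and step 7 is a yes/no you answer from the diff, so the whole tail of the list can be written down as a function. The field names are hypothetical and filled in by hand, and the 5× skew cutoff is an assumption; the 2 GB partition and 10% GC thresholds come straight from the checklist.

```python
from dataclasses import dataclass

@dataclass
class StageNumbers:
    # Hypothetical fields, read off the Spark UI for the failing stage.
    max_task_input_mb: float
    median_task_input_mb: float
    avg_partition_mb: float
    gc_time_s: float
    task_time_s: float
    uses_python_udfs: bool = False
    new_collect_in_diff: bool = False

def triage(s):
    """Return the checklist actions that apply, in checklist order."""
    actions = []
    if s.median_task_input_mb > 0 and s.max_task_input_mb > 5 * s.median_task_input_mb:
        actions.append("skew: fix at the data layer")
    if s.avg_partition_mb > 2048:
        actions.append("partitions over 2 GB: repartition")
    if s.task_time_s > 0 and s.gc_time_s / s.task_time_s > 0.10:
        actions.append("GC over 10% of task time: bump heap or drop cores")
    if s.uses_python_udfs:
        actions.append("PySpark or Pandas UDFs: bump spark.executor.memoryOverhead")
    if s.new_collect_in_diff:
        actions.append("new .collect(): write to a sink instead")
    return actions

print(triage(StageNumbers(3000, 200, 2500, 40, 300, uses_python_udfs=True)))
```

An empty list does not mean the job is healthy; it means you are in the "weird ones" territory this paragraph describes.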

We’ve done the UI, plans, and memory. Next lesson opens the second half of Module 10 with skew, broadcast tuning, and the partitioning configs that turn a slow job into a fast one.
