PySpark, from the ground up Lesson 23 / 60

Caching and persistence: storage levels, when each makes sense

df.cache() and df.persist() — what they actually do, the storage levels Spark offers, and the typical patterns where caching pays off.

Lesson 22 covered how Spark splits your job into stages and tasks. One thing the DAG model implies but doesn’t make obvious: every action recomputes the whole graph from the source. If you call .count() and then .write.parquet() on the same DataFrame, Spark reads the source twice, runs all the filters twice, runs all the joins twice. Spark is lazy by default and forgetful by default.

Caching is the lever you pull when you’re going to use a DataFrame more than once. It tells Spark “the next time you compute this, hold on to the result; I’ll be back.” Used well, it can cut run time on multi-output and iterative jobs severalfold, because the shared pipeline runs once instead of once per action. Used badly, it wastes memory and slows things down. Today’s lesson is the well-used version. Tomorrow’s lesson (24) is the badly-used version.

.cache() vs .persist()

The two methods on every DataFrame:

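df.cache()                                        # shorthand
df.persist()                                      # same as .cache() with default level
df.persist(StorageLevel.MEMORY_AND_DISK_DESER)    # explicit form of the default (PySpark 3.0+)
df.persist(StorageLevel.DISK_ONLY)                # different storage level

.cache() is exactly equivalent to .persist() with no arguments. That’s it. No magic, no difference. Both use Spark’s memory-first, spill-to-disk default. One naming wrinkle in PySpark 3.0 and later: the Python constant that matches that default is StorageLevel.MEMORY_AND_DISK_DESER, while PySpark’s StorageLevel.MEMORY_AND_DISK has deserialized=False and shows up as “Serialized” in the Storage tab. Both spill to disk the same way, so the rest of this lesson simply says MEMORY_AND_DISK. Use .cache() for the common case; reach for .persist(...) when you need a non-default storage level.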

Historical landmine worth knowing about: in Spark 1.x, a PySpark DataFrame’s .cache() defaulted to a memory-only level. If your DataFrame didn’t fit in memory, parts of it just got dropped, and Spark recomputed those parts on demand, which was silently slow. Spark 2.0 changed the default to MEMORY_AND_DISK, which spills the overflow to local disk instead of dropping it. Much friendlier. If you ever read old Spark blog posts that warn about “cached DataFrames disappearing,” that’s the world they’re describing. For DataFrames, we don’t live in it anymore. The one place it survives is the RDD API, where .cache() still defaults to MEMORY_ONLY.

The storage levels

StorageLevel lives in pyspark.storagelevel. The seven levels you’ll meet:

  • MEMORY_ONLY — store deserialized JVM objects in executor heap. Fast access, biggest memory footprint. If it doesn’t fit, partitions get dropped and recomputed on use. Rarely the right choice today.
  • MEMORY_AND_DISK (default for .cache()) — try memory first; spill the rest to local disk. The “no surprises” choice. Use this for almost all caching.
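  • MEMORY_ONLY_SER — store serialized bytes (Kryo or Java). Smaller in memory than MEMORY_ONLY, but every read pays a deserialization cost. Niche. This is the Scala/Java name; PySpark 3.x no longer defines it on StorageLevel.
  • MEMORY_AND_DISK_SER — like above, but spills serialized bytes to disk too. Used when memory pressure is severe and you can pay the deserialization tax for less RAM. Also a Scala/Java name; in PySpark 3.x the constant with the same flags is StorageLevel.MEMORY_AND_DISK.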
  • DISK_ONLY — straight to local disk, no memory copy. Useful for genuinely huge DataFrames that don’t fit in cluster memory but cost a lot to recompute (e.g., a heavy join you’ll re-use).
  • OFF_HEAP — store outside the JVM heap. Originally for Tachyon/Alluxio integration. Rare in modern PySpark.
  • The _2 variants — most levels above have a _2 flavor (MEMORY_ONLY_2, MEMORY_AND_DISK_2, DISK_ONLY_2) that replicates each cached partition on two executors. OFF_HEAP has no replicated variant. Useful if you have a long-running interactive session and an executor death would be very expensive to recover from.

If you don’t have a strong reason to pick something else, MEMORY_AND_DISK is the answer. That’s what .cache() gives you. Move on.

Caching is itself lazy

Here’s the detail that catches everyone the first time:

df = spark.read.parquet("big_input/")
df.cache()           # marks df for caching... but does NOTHING yet
df.filter(...).count()   # NOW df gets read AND cached

Calling .cache() doesn’t trigger computation. It just registers a flag: “next time you compute me, hold on to the result.” The actual materialization happens when an action runs against the DataFrame.

That has a subtle consequence. If your “next action” only touches part of the DataFrame, only that part might end up cached. For example:

df = spark.read.parquet("orders/").cache()

# This action only computes the first few rows
df.show()

# This action needs the rest — and might trigger a re-read
df.count()

.show() only materializes a few partitions (whatever it takes to fill the display). The rest of the DataFrame isn’t cached yet. When .count() runs, Spark reads the missing partitions from source.

The canonical “force the cache to populate fully right now” idiom:

df.cache()
df.count()    # an action over the full DataFrame; populates the cache end to end

You’ll see this in production code constantly. It’s not a wasted action; it’s a deliberate “warm the cache before the loop starts” pattern.

When caching pays off

Three typical patterns where caching is the right call.

1. Multiple actions on the same DataFrame

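from pyspark.sql.functions import broadcast, col

# customers: a small lookup DataFrame loaded elsewhere
prepared = (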
    spark.read.parquet("orders/")
        .filter(col("year") == 2026)
        .join(broadcast(customers), "customer_id")
        .withColumn("revenue_eur", col("total") * col("eur_rate"))
)
prepared.cache()

prepared.write.parquet("out/by_country/", partitionBy="country")
prepared.write.parquet("out/by_status/", partitionBy="status")
prepared.groupBy("country").sum("revenue_eur").write.parquet("out/summary/")

Without .cache(), that big read-filter-join-withColumn pipeline runs three times. With it, once. On a real dataset where the shared pipeline dominates the run time, that can be the difference between a 9-minute and a 3-minute job.
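The arithmetic behind that kind of estimate can be sketched with a toy cost model. Everything here is hypothetical: job_minutes, its parameters, and the minute values are made up for illustration, and real jobs overlap work in ways a linear model ignores.

```python
def job_minutes(n_outputs, pipeline_min, write_min,
                cached=False, fill_cache_min=0.0, cached_read_min=0.0):
    """Toy cost model: total minutes for n_outputs actions on one DataFrame."""
    if cached:
        # The pipeline runs once (plus the cost of storing the result),
        # then every output reads the cached data and writes.
        return pipeline_min + fill_cache_min + n_outputs * (cached_read_min + write_min)
    # No cache: every output recomputes the full pipeline.
    return n_outputs * (pipeline_min + write_min)


# Three outputs, a 3-minute shared pipeline, negligible write cost
no_cache = job_minutes(3, pipeline_min=3.0, write_min=0.0)
with_cache = job_minutes(3, pipeline_min=3.0, write_min=0.0, cached=True)

# Same pipeline, a single output, non-zero cache costs: caching loses
single_plain = job_minutes(1, pipeline_min=3.0, write_min=0.5)
single_cached = job_minutes(1, pipeline_min=3.0, write_min=0.5, cached=True,
                            fill_cache_min=0.25, cached_read_min=0.25)

print(f"3 outputs: {no_cache} min uncached vs {with_cache} min cached")
print(f"1 output:  {single_plain} min uncached vs {single_cached} min cached")
```

The saving grows with the number of actions that share the pipeline; with a single action the cache only adds overhead.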

2. Iterative algorithms

ML training, graph traversal, anything that loops over the same data:

training_set = featurize(raw).cache()
training_set.count()    # warm the cache

for epoch in range(20):
    model = model.update(training_set)

Each iteration reads training_set once. Without the cache, every iteration recomputes featurize(raw) from scratch. Twenty epochs = twenty unnecessary recomputations.

3. Interactive notebook exploration

You’re poking at a complex DataFrame, running 15 different queries against it. Cache it once at the top of the notebook, then explore freely. Just remember to .unpersist() it (or restart the kernel) when you move on; otherwise it keeps occupying executor memory and disk until Spark evicts it or the application ends.

Unpersist: the production move

LRU eviction will eventually kick stale cached DataFrames out — but “eventually” can mean “after they’ve been wasting memory for the whole job.” In production code, free the cache explicitly:

prepared = expensive_pipeline().cache()
prepared.count()

# ... use prepared in 3 different writes ...

prepared.unpersist()   # free the memory; we're done with it

Especially important inside long-running Spark applications (notebooks, structured streaming jobs, scheduled batch jobs that share a SparkSession across tasks). Unpersist what you don’t need anymore. Memory is a resource; treat it like one.
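One way to make the release hard to forget is to wrap the cache/unpersist pair in a context manager. This is a sketch, not a Spark API: the cached helper and its storage_level and warm parameters are hypothetical names, and it relies only on the DataFrame methods .cache(), .persist(), .count() and .unpersist().

```python
from contextlib import contextmanager


@contextmanager
def cached(df, storage_level=None, warm=True):
    """Persist df for the duration of a with-block, then release it."""
    if storage_level is None:
        df.cache()                    # default storage level
    else:
        df.persist(storage_level)     # explicit, non-default level
    try:
        if warm:
            df.count()                # action over the full DataFrame: fills the cache
        yield df
    finally:
        df.unpersist()                # runs even if the body raises


# Usage (expensive_pipeline is the placeholder from the snippet above):
#
# with cached(expensive_pipeline()) as prepared:
#     prepared.write.parquet("out/by_country/", partitionBy="country")
#     prepared.write.parquet("out/by_status/", partitionBy="status")
```

The try/finally is the design point: a failed write inside the block still frees the cached data instead of leaving it behind in a long-running session.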

Picking a non-default storage level on purpose

The default — MEMORY_AND_DISK — is right almost always. Here are the few cases where reaching for something else is justified.

DISK_ONLY when:

  • Your DataFrame is much larger than total executor memory and you’ll re-use it heavily. Caching to memory would just thrash; disk-only avoids that and the data gets read sequentially when it’s needed.
  • You want to free up memory for the actual computation (joins, aggregations) and just need a stable on-disk artifact. Often a sign you’d be better off writing to Parquet and reading it back, but DISK_ONLY is a quick alternative for interactive work.

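MEMORY_AND_DISK_SER (spelled StorageLevel.MEMORY_AND_DISK in PySpark 3.x, where that constant is already the serialized variant) when: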

  • Your dataset is wide (many columns) and full deserialized objects use a lot of heap. Serialized form is typically 2-5x smaller. Pay deserialization on every read in exchange for fitting more into memory.
  • You’re seeing high GC time in the Spark UI and the cache is the suspect.

*_2 replication variants when:

  • Cache rebuild is expensive (think: cached the result of a 30-minute join), and the cluster is preemptible / spot instances where executors die. The 2x storage cost buys you resilience against single-executor failure.

The rest (MEMORY_ONLY, MEMORY_ONLY_SER, OFF_HEAP) are rarely the right answer in a modern Spark deployment. If you’re tempted to use MEMORY_ONLY because you read it in a tutorial, resist — the post-Spark-1.x default is correctly MEMORY_AND_DISK and you should let it do its job.

How to verify the cache is working

Two places to look:

1. Spark UI → Storage tab. Lists every cached DataFrame, with size in memory, size on disk, replication count, and the storage level. If your “cached” DataFrame doesn’t appear here, no action has materialized it yet.

2. Explain plan. Look for InMemoryTableScan (or InMemoryRelation) in the physical plan:

prepared.cache()
prepared.count()        # triggers caching
prepared.explain()

The plan will include InMemoryTableScan [...] near the top, indicating Spark is serving from cache instead of re-reading the source. If you don’t see it, the cache isn’t being used (could be a non-deterministic transformation, could be that the query you’re explaining has a plan that doesn’t contain the one you cached; Spark matches cached data by query plan, not by Python object).

A common subtle bug

df = spark.read.parquet("orders/")
df.cache()
df.count()

# Later in the code...
df = df.filter(col("year") == 2026)   # reassigns df

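df.write.parquet("out/")              # still reads the cached data, but the handle to it is gone

Spark matches cached data by query plan, not by Python variable name, so the filtered DataFrame still reads from the cache. The bug is elsewhere: the moment you reassign df, you lose your only reference to the DataFrame you cached, so you can no longer call .unpersist() on it. The cached data is now an orphan that nothing references. It will sit there until LRU eviction kicks it out, the application ends, or you call spark.catalog.clearCache(), which drops every cached DataFrame in the session.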

The fix:

raw = spark.read.parquet("orders/").cache()
raw.count()

filtered = raw.filter(col("year") == 2026)   # different name; raw is still cached
filtered.write.parquet("out/")

Use distinct names for distinct DataFrames. Don’t shadow the variable you cached.

Cache vs checkpoint: a quick aside

Adjacent concept that sometimes gets confused with caching: checkpointing.

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

df = expensive_pipeline()
df = df.checkpoint()    # eager by default

Checkpointing materializes the DataFrame to reliable storage (HDFS, S3) and truncates the lineage — meaning the resulting DataFrame’s plan starts from the checkpoint, not from the original source. Caching keeps the lineage; it just memoizes the result. If a cached partition is lost, Spark recomputes it from lineage. If a checkpointed partition is lost, Spark reads it back from durable storage.

When to use checkpoint instead of cache:

  • Iterative algorithms where the lineage grows so long that even planning a new iteration becomes slow (some graph algorithms).
  • Streaming jobs that need a stable recovery point.

For day-to-day batch ETL, cache is what you want. We’ll revisit checkpoint in the streaming module much later. Mentioning it here so you don’t reach for it accidentally; it solves a different problem.

Run this on your own machine

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand
from pyspark.storagelevel import StorageLevel
import time

spark = (
    SparkSession.builder
        .appName("cache-demo")
        .config("spark.sql.shuffle.partitions", "8")
        .getOrCreate()
)

# Build something deliberately expensive: a join + heavy compute
big = (
    spark.range(0, 5_000_000, numPartitions=32)
        .withColumn("k", (col("id") % 1000).cast("int"))
        .withColumn("v", rand() * 1000)
)
small = spark.range(0, 1000).withColumnRenamed("id", "k")

joined = (
    big.join(small, "k")
       .withColumn("v2", col("v") * 1.21)
       .withColumn("v3", col("v") + col("v2"))
)

# === Without cache ===
t0 = time.time()
joined.count()
joined.groupBy("k").sum("v3").count()
print(f"Without cache: {time.time() - t0:.2f}s")

# === With cache ===
joined.cache()
joined.count()    # warm the cache

t0 = time.time()
joined.count()
joined.groupBy("k").sum("v3").count()
print(f"With cache:    {time.time() - t0:.2f}s")

# Look at the Storage tab while the script is paused
input("Press Enter to release cache and exit... ")
joined.unpersist()
spark.stop()

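You should see the cached run finish in a small fraction of the cold run’s time. Keep in mind that the timed section excludes the warm-up count(), so it measures reads from an already-warm cache, not the cost of filling it. Open the Storage tab during the pause and observe the cached DataFrame’s size and storage level.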

That’s the productive side of caching. The next lesson covers the dark side: when caching makes things worse. Memory pressure, eviction churn, the ten-line script that gets faster when you remove the .cache() line. Read it next; it’s the necessary counterweight to today.
