PySpark, de la zero Lecția 23 / 60

Caching si persistence: storage levels, cand are sens fiecare

df.cache() si df.persist(), ce fac de fapt, storage levels-urile pe care le ofera Spark si pattern-urile tipice in care caching-ul da roade.

Lecția 22 a acoperit cum împarte Spark job-ul tău în stages și tasks. Un lucru pe care modelul DAG îl implică, dar nu îl face evident: fiecare acțiune recalculează tot graful de la sursă. Dacă apelezi .count() și apoi .write.parquet() pe același DataFrame, Spark citește sursa de două ori, rulează toate filtrele de două ori, rulează toate join-urile de două ori. Spark e lazy by default și uitat by default.

Caching e pârghia pe care o tragi când urmează să folosești un DataFrame de mai multe ori. Îi spune lui Spark „data viitoare când mă calculezi, ține minte rezultatul; mă întorc”. Folosit bine, taie timpul de iterație pe job-uri cu output-uri multiple cu ordine de mărime. Folosit prost, irosește memorie și încetinește lucrurile. Lecția de azi e versiunea folosită bine. Lecția de mâine (24) e versiunea folosită prost.

.cache() vs .persist()

Cele două metode de pe fiecare DataFrame:

df.cache()                                    # shorthand
df.persist()                                  # same as .cache() with default level
df.persist(StorageLevel.MEMORY_AND_DISK)      # explicit, identical to above
df.persist(StorageLevel.DISK_ONLY)            # different storage level

.cache() e exact echivalent cu .persist(StorageLevel.MEMORY_AND_DISK). Atât. Fără magie, fără diferență. Folosește .cache() pentru cazul comun; recurge la .persist(...) când ai nevoie de un storage level diferit de default.

Mină istorică pe care merită s-o cunoști: în Spark 1.x, .cache() avea default MEMORY_ONLY. Dacă DataFrame-ul tău nu încăpea în memorie, părți din el erau pur și simplu aruncate, iar Spark recalcula acele părți la cerere, în liniște lent. Spark 2.0 a schimbat default-ul la MEMORY_AND_DISK, care face spill cu surplusul pe discul local în loc să-l arunce. Mult mai prietenos. Dacă citești postări vechi de blog Spark care avertizează despre „DataFrame-uri cached care dispar”, aceea e lumea pe care o descriu. Nu mai trăim în ea.

Storage levels-urile

StorageLevel trăiește în pyspark.storagelevel. Cele șapte niveluri pe care le vei întâlni:

  • MEMORY_ONLY, stochează obiecte JVM deserializate în heap-ul executor-ului. Acces rapid, cea mai mare amprentă de memorie. Dacă nu încape, partițiile sunt aruncate și recalculate la utilizare. Rar alegerea corectă astăzi.
  • MEMORY_AND_DISK (default pentru .cache()), încearcă întâi memoria; restul îl scrie pe disc local. Alegerea „fără surprize”. Folosește asta pentru aproape toate caching-urile.
  • MEMORY_ONLY_SER, stochează bytes serializați (Kryo sau Java). Mai mic în memorie decât MEMORY_ONLY, dar fiecare citire plătește un cost de deserializare. De nișă.
  • MEMORY_AND_DISK_SER, ca mai sus, dar face spill cu bytes serializați și pe disc. Folosit când presiunea pe memorie e severă și poți plăti taxa de deserializare pentru mai puțin RAM.
  • DISK_ONLY, direct pe disc local, fără copie în memorie. Util pentru DataFrame-uri genuin uriașe care nu încap în memoria clusterului dar costă mult de recalculat (de exemplu, un join greu pe care îl vei reutiliza).
  • OFF_HEAP, stochează în afara heap-ului JVM. Inițial pentru integrare cu Tachyon/Alluxio. Rar în PySpark modern.
  • Variantele _2, fiecare nivel de mai sus are o aromă _2 (MEMORY_AND_DISK_2, DISK_ONLY_2 etc.) care replică fiecare partiție cached pe doi executors. Util dacă ai o sesiune interactivă de lungă durată și moartea unui executor ar fi foarte scumpă de recuperat.

Dacă nu ai un motiv puternic să alegi altceva, MEMORY_AND_DISK e răspunsul. E ce-ți dă .cache(). Mergi mai departe.

Caching-ul e el însuși lazy

Iată detaliul care prinde pe toată lumea prima oară:

df = spark.read.parquet("big_input/")
df.cache()           # marks df for caching... but does NOTHING yet
df.filter(...).count()   # NOW df gets read AND cached

Apelul .cache() nu declanșează computație. Doar înregistrează un steag: „data viitoare când mă calculezi, ține minte rezultatul”. Materializarea efectivă se întâmplă când o acțiune rulează împotriva DataFrame-ului.

Asta are o consecință subtilă. Dacă „acțiunea ta următoare” atinge doar o parte din DataFrame, doar acea parte ar putea ajunge cached. De exemplu:

df = spark.read.parquet("orders/").cache()

# This action only computes the first few rows
df.show()

# This action needs the rest — and might trigger a re-read
df.count()

.show() materializează doar câteva partiții (oricâte sunt necesare ca să umple display-ul). Restul DataFrame-ului nu e cached încă. Când rulează .count(), Spark citește partițiile lipsă din sursă.

Idiomul canonic „forțează cache-ul să se populeze complet acum”:

df.cache()
df.count()    # actions on the full DataFrame; populates cache end-to-end

Vei vedea asta constant în cod de producție. Nu e leneș cu lăcomie, e un pattern deliberat „încălzește cache-ul înainte să înceapă bucla”.

Când dă roade caching-ul

Trei pattern-uri tipice în care caching-ul e mișcarea corectă.

1. Acțiuni multiple pe același DataFrame

prepared = (
    spark.read.parquet("orders/")
        .filter(col("year") == 2026)
        .join(broadcast(customers), "customer_id")
        .withColumn("revenue_eur", col("total") * col("eur_rate"))
)
prepared.cache()

prepared.write.parquet("out/by_country/", partitionBy="country")
prepared.write.parquet("out/by_status/", partitionBy="status")
prepared.groupBy("country").sum("revenue_eur").write.parquet("out/summary/")

Fără .cache(), acel pipeline mare de read-filter-join-withColumn rulează de trei ori. Cu el, o dată. Pe un dataset real, asta e diferența dintre un job de 9 minute și unul de 3 minute.

2. Algoritmi iterativi

Antrenare ML, parcurgerea grafurilor, orice face buclă peste aceleași date:

training_set = featurize(raw).cache()
training_set.count()    # warm the cache

for epoch in range(20):
    model = model.update(training_set)

Fiecare iterație citește training_set o dată. Fără cache, fiecare iterație recalculează featurize(raw) de la zero. Douăzeci de epoci = douăzeci de recalculări inutile.

3. Explorare interactivă în notebook

Cotrobăi printr-un DataFrame complex, rulând 15 query-uri diferite împotriva lui. Cache-uiește-l o dată în partea de sus a notebook-ului, apoi explorează liber. Doar amintește-ți să-i dai .unpersist() (sau să restartezi kernel-ul) când treci mai departe, altfel stă în memoria clusterului pentru totdeauna.

Unpersist: mișcarea de producție

Eviction-ul LRU va elimina eventual DataFrame-urile cached vechi, dar „eventual” poate însemna „după ce au irosit memorie pentru tot job-ul”. În cod de producție, eliberează cache-ul explicit:

prepared = expensive_pipeline().cache()
prepared.count()

# ... use prepared in 3 different writes ...

prepared.unpersist()   # free the memory; we're done with it

Important mai ales în interiorul aplicațiilor Spark de lungă durată (notebooks, job-uri de structured streaming, batch job-uri programate care împart o SparkSession între task-uri). Unpersist ce nu mai ai nevoie. Memoria e o resursă; tratează-o ca pe una.

Alegerea unui storage level diferit de default cu intenție

Default-ul, MEMORY_AND_DISK, e corect aproape întotdeauna. Iată cele câteva cazuri în care e justificat să recurgi la altceva.

DISK_ONLY când:

  • DataFrame-ul tău e mult mai mare decât memoria totală a executors și îl vei reutiliza intens. Caching-ul în memorie ar face doar thrashing; disk-only evită asta și datele sunt citite secvențial când e nevoie.
  • Vrei să eliberezi memorie pentru computația efectivă (joins, agregări) și ai nevoie doar de un artefact stabil pe disc. Adesea un semn că ai face mai bine să scrii în Parquet și să citești înapoi, dar DISK_ONLY e o alternativă rapidă pentru lucrul interactiv.

MEMORY_AND_DISK_SER când:

  • Dataset-ul tău e lat (multe coloane) și obiectele complet deserializate folosesc mult heap. Forma serializată e tipic de 2-5x mai mică. Plătește deserializare la fiecare citire în schimbul a mai mult în memorie.
  • Vezi GC time mare în Spark UI și cache-ul e suspectul.

Variantele cu replicare *_2 când:

  • Reconstruirea cache-ului e scumpă (gândește-te: cached rezultatul unui join de 30 de minute) și clusterul e preemptible / spot instances unde executors mor. Costul de stocare 2x îți cumpără reziliență împotriva eșecului unui singur executor.

Restul (MEMORY_ONLY, MEMORY_ONLY_SER, OFF_HEAP) sunt rar răspunsul corect într-un deployment Spark modern. Dacă ești tentat să folosești MEMORY_ONLY pentru că ai citit despre el într-un tutorial, rezistă, default-ul post-Spark-1.x e corect MEMORY_AND_DISK și ar trebui să-l lași să-și facă treaba.

Cum verifici că funcționează cache-ul

Două locuri unde să te uiți:

1. Spark UI → tab-ul Storage. Listează fiecare DataFrame cached, cu dimensiunea în memorie, dimensiunea pe disc, count-ul de replicare și storage level-ul. Dacă DataFrame-ul tău „cached” nu apare aici, nicio acțiune nu l-a materializat încă.

2. Explain plan. Caută InMemoryTableScan (sau InMemoryRelation) în planul fizic:

prepared.cache()
prepared.count()        # triggers caching
prepared.explain()

Planul va include InMemoryTableScan [...] lângă vârf, indicând că Spark servește din cache în loc să recitească sursa. Dacă nu îl vezi, cache-ul nu e folosit (ar putea fi o transformare nedeterministă, ar putea fi că operezi pe un obiect DataFrame diferit de cel pe care l-ai cache-uit, da, se întâmplă).

Un bug subtil comun

df = spark.read.parquet("orders/")
df.cache()
df.count()

# Later in the code...
df = df.filter(col("year") == 2026)   # reassigns df

df.write.parquet("out/")              # NOT served from cache

Caching-ul e legat de un obiect DataFrame, nu de un nume. În momentul în care reasignezi df, noul DataFrame e uncached. Originalul (încă în memorie) e acum un orfan pe care nimic nu îl referențiază. Va sta acolo până când îl scoate eviction-ul LRU.

Soluția:

raw = spark.read.parquet("orders/").cache()
raw.count()

filtered = raw.filter(col("year") == 2026)   # different name; raw is still cached
filtered.write.parquet("out/")

Folosește nume distincte pentru DataFrame-uri distincte. Nu umbri variabila pe care ai cache-uit-o.

Cache vs checkpoint: o paranteză rapidă

Concept adiacent care uneori se confundă cu caching-ul: checkpointing.

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

df = expensive_pipeline()
df = df.checkpoint()    # eager by default

Checkpointing-ul materializează DataFrame-ul într-un storage de încredere (HDFS, S3) și trunchiază lineage-ul, adică planul DataFrame-ului rezultat începe de la checkpoint, nu de la sursa originală. Caching-ul păstrează lineage-ul; doar memoizează rezultatul. Dacă o partiție cached e pierdută, Spark o recalculează din lineage. Dacă o partiție checkpointed e pierdută, Spark o citește înapoi din storage durabil.

Când să folosești checkpoint în loc de cache:

  • Algoritmi iterativi unde lineage-ul crește atât de lung încât chiar și planificarea unei iterații noi devine lentă (unii algoritmi de grafuri).
  • Job-uri de streaming care au nevoie de un punct stabil de recovery.

Pentru ETL batch zi de zi, cache e ce vrei. Vom revedea checkpoint în modulul de streaming mult mai târziu. Îl menționez aici ca să nu recurgi la el din greșeală; rezolvă o problemă diferită.

Rulează asta pe propria mașină

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand
from pyspark.storagelevel import StorageLevel
import time

spark = (
    SparkSession.builder
        .appName("cache-demo")
        .config("spark.sql.shuffle.partitions", "8")
        .getOrCreate()
)

# Build something deliberately expensive: a join + heavy compute
big = (
    spark.range(0, 5_000_000, numPartitions=32)
        .withColumn("k", (col("id") % 1000).cast("int"))
        .withColumn("v", rand() * 1000)
)
small = spark.range(0, 1000).withColumnRenamed("id", "k")

joined = (
    big.join(small, "k")
       .withColumn("v2", col("v") * 1.21)
       .withColumn("v3", col("v") + col("v2"))
)

# === Without cache ===
t0 = time.time()
joined.count()
joined.groupBy("k").sum("v3").count()
print(f"Without cache: {time.time() - t0:.2f}s")

# === With cache ===
joined.cache()
joined.count()    # warm the cache

t0 = time.time()
joined.count()
joined.groupBy("k").sum("v3").count()
print(f"With cache:    {time.time() - t0:.2f}s")

# Look at the Storage tab while the script is paused
input("Press Enter to release cache and exit... ")
joined.unpersist()
spark.stop()

Ar trebui să vezi rularea cu cache terminându-se într-o mică fracțiune din timpul rulării reci. Deschide tab-ul Storage în timpul pauzei și observă dimensiunea și storage level-ul DataFrame-ului cached.

Aceasta e partea productivă a caching-ului. Lecția următoare, postarea existentă pe care o păstrăm în acest slot, doar reprelucrată, acoperă partea întunecată: când caching-ul face lucrurile mai rele. Presiune pe memorie, churn de eviction, scriptul de zece linii care devine mai rapid când scoți linia .cache(). Citește-o următoarea; e contraponderea necesară pentru cea de azi.

Caută