PySpark, de la zero Lecția 31 / 60

Ce este o partitie, fizic

Partitii in memorie, partitii pe disc si relatia dintre partitii si task-uri.

Aruncăm cu cuvântul „partiție” de tot cursul. E timpul să încetinim și să ne asigurăm că înseamnă același lucru în mintea ta ca în a mea, fiindcă există două sensuri ale cuvântului, sunt înrudite dar distincte și confundarea lor e sursa a jumătate dintre bug-urile de partiționare din sălbăticie.

Lecția asta e despre ce este o partiție, fizic, în memorie și pe disc, și care e relația ei cu unitatea de execuție pe care Spark chiar o rulează, care e un task. Odată ce imaginea asta e clară, lecția următoare (butonul spark.sql.shuffle.partitions și de ce default-ul e greșit) capătă sens evident.

Partiția runtime

O partiție, în sens runtime, e bucata de date pe care Spark o procesează ca o singură unitate de muncă paralelă. O partiție. Un task. Un core. La un moment dat.

Asta e identitatea de internalizat:

1 partiție -> 1 task -> 1 core, pe durata task-ului.

Dacă DataFrame-ul tău are 1000 de partiții și declanșezi un stage care îl procesează, Spark generează 1000 de task-uri. Dacă cluster-ul tău are 200 de core-uri disponibile, acele 1000 de task-uri rulează în 5 valuri de câte 200 fiecare; la un moment dat, 200 de task-uri rulează în paralel, restul sunt în coadă. Dacă cluster-ul tău are 2000 de core-uri, toate cele 1000 de task-uri rulează simultan și 1000 de core-uri stau degeaba. Nu poți obține mai mult paralelism decât numărul de partiții.

În interiorul unui executor, o partiție e doar un subset al rândurilor DataFrame-ului, ținut în memorie (sau dat la disk dacă memoria e strâmtă). Fiecare task își procesează partiția rând cu rând, aplică filtrele, transformările, agregările, orice spune planul stage-ului, și își scrie output-ul, fie în fișierele de shuffle ale stage-ului următor, fie în orice sink a țintit acțiunea.

Partițiile unui DataFrame nu trăiesc nicăieri central; trăiesc distribuite peste executor-i. Asta e tot scopul procesării distribuite.

Partiția pe disc

Cealaltă utilizare a cuvântului „partiție” e pe disc: un director într-o scriere partiționată.

/data/transactions/
  year=2024/
    month=01/
      part-00000.parquet
      part-00001.parquet
      ...
    month=02/
      ...
  year=2025/
    month=01/
      ...

Când scrii df.write.partitionBy("year", "month").parquet(...), Spark creează un director pentru fiecare combinație distinctă de valori ale coloanelor de partiționare. La citire, Spark folosește numele acelor directoare ca să facă partition pruning: dacă filtrezi WHERE year = 2024, Spark deschide doar fișierele de sub year=2024/ și le sare pe celelalte cu totul.

Aceste partiții pe disc sunt diferite de partițiile runtime. O singură partiție pe disc (un director year=2024/month=03/) poate conține multe fișiere; citirea ei produce multe partiții runtime. Invers, o partiție runtime poate scrie în mai multe partiții pe disc (dacă datele dinăuntrul ei acoperă mai multe valori year/month).

Cele două sensuri sunt înrudite, ambele țin de bucățirea datelor, dar sunt concepte separate care trăiesc pe straturi diferite. Când cineva spune „job-ul ăsta are prea multe partiții,” întreabă care fel. Partițiile runtime afectează paralelismul și costul shuffle-ului. Partițiile pe disc afectează pruning-ul la citire și aglomerarea de directoare. Soluțiile diferă.

Pentru restul lecției, și majoritatea modulului 6, „partiție” înseamnă sensul runtime dacă nu se notează explicit altfel.

Task-uri pe stage = partițiile input-ului

Cea mai simplă și mai utilă identitate din munca de performanță Spark:

Numărul de task-uri dintr-un stage e egal cu numărul de partiții ale input-ului acelui stage.

Un stage începe la o graniță de shuffle (sau la o citire) și se termină la următorul shuffle (sau la acțiune). În interiorul unui stage, fiecare task procesează o partiție din input-ul stage-ului prin planul stage-ului, cap-coadă. Deci:

  • O citire a 800 de fișiere input (cu o partiție per fișier) creează un stage cu 800 de task-uri.
  • Un output de shuffle dintr-un groupBy cu 200 de partiții creează un stage downstream cu 200 de task-uri.
  • Un repartition(50) pe un DataFrame face ca stage-ul următor să aibă 50 de task-uri.

De aia oamenii devin obsedați de numărul de partiții. Numărul de partiții este paralelismul. Prea puține partiții și cluster-ul e subutilizat; prea multe și pierzi timp programând task-uri minuscule.

Inspectarea partițiilor

Nu trebuie să ghicești. Două moduri ieftine de a te uita la ce sunt de fapt partițiile unui DataFrame:

Câte partiții:

df.rdd.getNumPartitions()
# 200

Ăsta e numărul de partiții runtime, cel care determină numărul de task-uri. Funcționează pe orice DataFrame. Ieftin.

Cât de mare e fiecare partiție:

from pyspark.sql import functions as F

(df.withColumn("pid", F.spark_partition_id())
   .groupBy("pid")
   .count()
   .orderBy("pid")
   .show(50))

# +---+------+
# |pid| count|
# +---+------+
# |  0| 50012|
# |  1| 49873|
# |  2| 49991|
# ...
# |199| 50104|
# +---+------+

spark_partition_id() întoarce ID-ul întreg al partiției în care trăiește acum fiecare rând. Faci group by după el, numeri, te uiți la distribuție. Dacă fiecare partiție are aproximativ același număr de rânduri, datele sunt echilibrate. Dacă o partiție are 10x mediana, ai skew (lecția 28) și shuffle-ul următor o să doară.

Ăsta e același diagnostic din lecțiile de skew, dar acum înțelegi exact ce-ți spune: numărul de rânduri per unitate de muncă paralelă. O partiție cu 10 milioane de rânduri e un task care procesează 10 milioane de rânduri. O partiție cu 100 de rânduri e un task care procesează 100 de rânduri. Cluster-ului nu-i pasă câte partiții sunt în total; îi pasă de cea mai lentă.

Un follow-up util: mărimea în bytes, nu doar numărul de rânduri. Numărul de rânduri poate fi înșelător; rândurile late cu array-uri și struct-uri pot fi de 100x mai grele decât rânduri înguste de întregi. Un mod aproximativ de a verifica, când suspectezi că dezechilibrul de partiții e mai mult decât doar numere de rânduri:

def partition_size_bytes(df):
    """Per-partition byte size estimate via the RDD interface."""
    return (df.rdd
              .mapPartitionsWithIndex(
                  lambda idx, it: [(idx, sum(len(str(r)) for r in it))])
              .collect())

for pid, nbytes in sorted(partition_size_bytes(df))[:5]:
    print(f"partition {pid}: {nbytes:,} bytes (string-len approximation)")

Nu e perfect; str(row) supranumără față de mărimea serializată, dar raporturile dintre partiții îți spun ce trebuie să știi. Dacă partiția 0 e de 50x mărimea partiției 1, ai skew pe care numărul de rânduri singur poate să nu-l fi arătat.

Numerele de partiții implicite

De unde vin numerele de partiții la început? Trei surse principale:

La citire. Când citești o sursă bazată pe fișiere, Parquet, CSV, JSON, ORC, Spark creează o partiție per fișier ca default, cu câteva nuanțe:

  • Pentru HDFS și store-uri similare cu HDFS, fișierele foarte mari sunt împărțite după mărimea blocului HDFS (de obicei 128 MB sau 256 MB). O partiție per bloc.
  • Pentru S3 și object store-uri similare, Spark folosește spark.sql.files.maxPartitionBytes (default 128 MB) ca să spargă fișierele mari.
  • Fișierele minuscule sunt uneori coalesced via spark.sql.files.openCostInBytes ca să se evite overhead-ul de a deschide multe fișiere mici degeaba.

Concluzia practică: dacă citești un director cu 800 de fișiere Parquet, așteaptă-te la aproximativ 800 de partiții. Dacă acele fișiere sunt uriașe, așteaptă-te la mai multe. Dacă sunt minuscule, așteaptă-te la mai puține.

După un shuffle. Fiecare output de shuffle e repartiționat conform spark.sql.shuffle.partitions (default 200). Group-by, join, distinct, window, orice necesită un shuffle, produce 200 de partiții pe partea cealaltă, indiferent de mărimea input-ului. Default-ul ăsta e aproape mereu greșit, exact despre asta e lecția 32.

Repartition explicit. df.repartition(N) și df.coalesce(N) setează numărul de partiții explicit. Le acoperim în detaliu în lecțiile 33 și 34, dar deocamdată: repartition reshuffluiește la exact N partiții, coalesce îmbină partițiile existente fără un shuffle complet.

Mai există și o sursă indirectă: orice operație care păstrează partiționarea (un filter, un select, un withColumn care nu shuffluiește) păstrează numărul de partiții pe care l-a moștenit. Deci dacă citești 800 de fișiere, filtrezi 95% din rânduri și apoi faci group-by, pasul de filter are tot 800 de partiții, majoritatea minuscule, până când shuffle-ul de group-by schimbă lucrurile. Sună irosit și uneori e, de aia modulul 6 are o lecție întreagă despre coalesce exact pentru cazul ăsta.

O imagine mentală

Dacă vrei o singură imagine de ținut în cap, gândește-te la partiții ca la felii de pizza.

  • Pizza e DataFrame-ul tău.
  • Feliile sunt partițiile.
  • Oamenii așezați la masă sunt executor-ii.
  • Fiecare felie e mâncată de o singură persoană la un moment dat, o îmbucătură pe rând, asta e task-ul.
  • O pizza cu 200 de felii și 8 oameni: fiecare persoană mănâncă 25 de felii, una câte una. Masa e gata când termină toți.
  • O pizza cu o felie uriașă și 199 minuscule: 199 de oameni își termină feliile minuscule în secunde; o persoană încă mestecă felia uriașă 30 de minute mai târziu. Ăla e skew.
  • O pizza tăiată în 4 felii pentru 200 de oameni: 196 dintre ei n-au ce face. Asta e underpartitioning.
  • O pizza tăiată în 10000 de felii microscopice: toți petrec mai mult timp ridicând felii decât mâncând. Asta e overpartitioning, iar overhead-ul e real, fiecare task are cost de programare, fiecare task scrie propriul fișier de shuffle, fiecare task e o mică povară de bookkeeping pe driver.

Tot jocul e: tai pizza în felii aproximativ egale, de o mărime care ține fiecare mâncător ocupat fără ca nimeni să se înece și fără prea mult overhead per felie. Mărimea „corectă” a feliei depinde de workload-ul tău, cluster-ul tău și datele tale; lecția următoare e despre cum o alegi.

De ce contează asta acum

Modulul 6 va intra în butoanele și tiparele de partiționare: default-ul partițiilor de shuffle, repartition vs. coalesce, partiționare la scriere, bucketing. Nimic din astea nu are sens fără imaginea din lecția asta. Înainte să tunăm ceva:

  • O partiție e bucata de date pe care Spark o procesează ca o unitate.
  • O partiție = un task = un core, la un moment dat.
  • Task-uri pe stage = partițiile input-ului acelui stage.
  • Partiții implicite la citire ~ numărul de fișiere input (sau bucăți de mărime de bloc pentru fișiere mari).
  • Partiții implicite după shuffle = spark.sql.shuffle.partitions (200, aproape mereu greșit).
  • Partițiile pe disc sunt un concept separat, directoare într-o scriere partiționată, deși cuvântul e același.

Internalizează asta și restul modulului 6 sunt doar consecințe. Lecția 32 intră direct în cel mai consecvent default din Spark și arată cum să-l setezi corect pentru workload-ul tău.


Referințe: documentația Apache Spark despre partiționarea RDD-urilor și modelul de execuție SQL; ghidul de tuning Databricks despre partiții și paralelism. Consultat 2026-05-01.

Caută