Job-ul tău Spark durează 3 ore. Dublezi dimensiunea clusterului. Acum durează 2 ore și 50 de minute. Ce s-a întâmplat? Aproape sigur, partiționarea.
Ce sunt de fapt partițiile
Un DataFrame Spark nu e un singur tabel mare. E tăiat în partiții — bucăți de date distribuite pe executoare. Fiecare partiție e procesată independent, în paralel. Dacă ai 200 de partiții și 20 de executoare, fiecare executor gestionează ~10 partiții, una câte una.
Toată promisiunea Spark-ului — „procesează date în paralel” — se reduce la cât de bine sunt distribuite datele tale pe partiții. Dacă o partiție are 10 milioane de rânduri iar celelalte au câte 1.000, felicitări: un executor face toată munca în timp ce 19 stau degeaba. Asta se numește data skew (dezechilibru de date), și e cea mai frecventă problemă de performanță în Spark.
Valoarea implicită e aproape mereu greșită
Numărul implicit de partiții în Spark vine din una din acestea:
spark.sql.shuffle.partitions— implicit 200 pentru orice operație care implică shuffle (join-uri, group by etc.)- Numărul de fișiere sau blocuri din sursa de date (la citire)
200 de partiții era o valoare rezonabilă în 2014 când clusterele aveau 50 de core-uri. Azi, cu mii de core-uri și terabytes de date, 200 e aproape mereu greșit — de obicei prea puține, ducând la partiții prea mari care creează presiune pe memorie, sau ocazional prea multe, ducând la partiții minuscule cu overhead mare de planificare.
Cum verifici partițiile
df.rdd.getNumPartitions()
Mai util — verifică dimensiunile partițiilor:
from pyspark.sql.functions import spark_partition_id, count
df.groupBy(spark_partition_id().alias("partition_id")) \
.agg(count("*").alias("row_count")) \
.orderBy("row_count", ascending=False) \
.show(10)
Dacă partiția maximă e de 100× mai mare decât cea minimă, ai o problemă de skew. Dacă fiecare partiție are 50 de rânduri, ai prea multe partiții.
repartition() vs coalesce()
Acestea sunt cele două instrumente pentru a corecta numărul de partiții:
repartition(n) — shuffle complet. Fiecare rând e reatribuit la una din cele n partiții noi. Scump (mută date prin rețea) dar îți dă partiții de dimensiuni egale.
df = df.repartition(500) # 500 de partiții aproximativ egale
coalesce(n) — unește partiții fără un shuffle complet. Poate doar reduce numărul, niciodată nu-l crește. Ieftin dar poate crea partiții inegale.
df = df.coalesce(50) # reduce de la 200 la 50
Regula de bază
- Mergi în sus cu numărul de partiții? →
repartition() - Mergi în jos (de ex. înainte de a scrie fișiere de output)? →
coalesce() - Date dezechilibrate după un join? →
repartition()pe coloana dezechilibrată, sau folosește salting
Ghid de dimensionare a partițiilor
Nu există un răspuns universal corect, dar regulile astea funcționează în majoritatea job-urilor reale:
- Țintește 128–256 MB per partiție. Asta echilibrează paralelismul cu overhead-ul.
- Țintește 2–4 partiții per core. Cu 100 de core-uri, țintește 200–400 de partiții.
- După o transformare largă (join, groupBy), verifică partițiile. Cele 200 implicite s-ar putea să nu mai fie corecte.
- Înainte de a scrie pe disc, fă coalesce. Scrierea a 200 de partiții în Parquet îți dă 200 de fișiere minuscule. Cei care citesc downstream te vor blestema. Fă coalesce la un număr care produce fișiere de 128–512 MB.
Partiționare pe coloană (pentru scriere)
df.write.partitionBy("year", "month").parquet("s3://bucket/data/")
Asta creează o structură de directoare de tipul year=2025/month=03/part-00001.parquet. Query-urile downstream care filtrează pe an și lună pot sări complet peste folderele irelevante — partition pruning. Îmbunătățirea e adesea de 10–100× pe seturi mari de date.
Alege coloane de partiție care:
- Sunt folosite frecvent în clauze WHERE
- Au cardinalitate mică spre medie (an, țară, status — nu user_id)
- Nu creează milioane de directoare minuscule
Soluția de urgență pentru skew: salting
Când o cheie de join domină (să zicem, 80% din comenzi vin de la un singur client), datele acelei chei ajung într-o singură partiție și totul se blochează. Soluția: adaugă o coloană de „salt” aleatorie, fă join pe salt + cheie, și combină rezultatele.
from pyspark.sql.functions import lit, rand, floor, col, concat
salt_buckets = 10
# Adaugă salt la tabelul dezechilibrat (cel mare)
large = large.withColumn("salt", floor(rand() * salt_buckets).cast("int"))
# Explodează tabelul mic ca să corespundă tuturor valorilor de salt
from pyspark.sql.functions import explode, array
small = small.withColumn("salt", explode(array([lit(i) for i in range(salt_buckets)])))
# Join pe cheia originală + salt
result = large.join(small, ["join_key", "salt"]).drop("salt")
Urât? Da. Eficient? Dramatic. Dacă dai regulat de skew, merită să memorezi pattern-ul ăsta.
Versiunea într-o propoziție
Majoritatea problemelor de performanță în Spark sunt probleme de partiționare. Înainte să adaugi mai mult hardware, verifică numărul și dimensiunile partițiilor — soluția e de obicei o singură linie de cod.