Lecția trecută am tras date din Postgres în Spark. Acum mergem invers: un DataFrame plin cu rezultate calculate și o bază de date relațională care așteaptă să le primească. Echipa de dashboard-uri vrea agregatele în Postgres-ul lor de raportare. Echipa de aplicație vrea îmbogățirile înapoi în MySQL ca să le servească API-ul. Indiferent de motiv, ai nevoie de df.write.format("jdbc").
Vestea bună: API-ul oglindește API-ul de citire. Vestea proastă: modurile de eșec sunt mai grave. O citire proastă pierde timp. O scriere proastă corupe date, iar coruperea datelor e genul de greșeală care îți rămâne în evaluarea de performanță multă vreme.
Lecția asta e manualul „sigur și rapid” pentru scriere către surse JDBC.
Apelul de bază
(df.write
.format("jdbc")
.option("url", "jdbc:postgresql://db.example.com:5432/reports")
.option("dbtable", "public.daily_summary")
.option("user", "spark_writer")
.option("password", os.environ["DB_PASSWORD"])
.option("driver", "org.postgresql.Driver")
.mode("append")
.save())
Același URL, același dbtable, același driver, aceeași poveste de autentificare pe care am acoperit-o în lecția 45. Piesa nouă este .mode(...), modul de salvare, plus o mică constelație de opțiuni specifice scrierii care schimbă totul în privința modului în care rulează asta.
Paralelismul este implicit (și asta e capcana)
O citire fără partitionColumn rulează ca un singur task. O scriere rulează cu câte partiții are DataFrame-ul. Dacă df.rdd.getNumPartitions() returnează 1000, Spark deschide 1000 de conexiuni la baza ta de date, fiecare rulând INSERT-uri în paralel.
Sună grozav până îți amintești că Postgres-ul tău are max_connections = 100 și că aplicația vrea și ea să-l folosească. Scrierea ta nu eșuează curat, eșuează la jumătate, ținând deschise 100 de conexiuni, cu celelalte 900 de partiții blocate să aștepte într-un pool, în timp ce aplicația începe să arunce erori de conexiune.
Așa că primul buton, de fiecare dată, înainte să te gândești măcar la moduri de salvare:
df_to_write = df.coalesce(16) # sau .repartition(16) daca ai nevoie de un shuffle
O valoare implicită rezonabilă pentru o țintă de scriere este 16 până la 32 de partiții, plafonată de cât poate tolera baza de date. DBA-ul îți va spune bugetul de conexiuni; nu îl depăși. Spark nu face mișcarea politicoasă aici. Va încerca bucuros 1000 de conexiuni simultane și va privi lumea arzând.
Notă: coalesce e ieftin (fără shuffle) dar reduce doar partițiile. Dacă DataFrame-ul tău are prea puține partiții și sunt înclinate (skewed), poate vrei repartition(N, key) ca să redistribui sarcina, acceptând costul shuffle-ului.
batchsize: câte rânduri pe round-trip
Implicit, driverul JDBC grupează insert-urile în batch-uri de 1000 de rânduri și le trimite ca un singur apel executeBatch. Opțiunea:
.option("batchsize", 5000)
Batch-uri mai mari înseamnă mai puține round-trip-uri în rețea și mai puține commit-uri de tranzacție pe rând, ceea ce într-un job greu pe scriere e diferența între minute și ore. Compromisurile:
- Memorie: fiecare batch buferizează
batchsizerânduri pe executor înainte de flush. 5000 de rânduri de date subțiri nu înseamnă nimic; 5000 de rânduri de coloane JSON late pot duce un task la OOM. - Durata blocării: un singur batch rulează de obicei într-o singură tranzacție. Batch mai mare = tranzacție mai lungă = blocări ținute mai mult pe tabela destinație.
- Granularitatea eșecului: dacă un batch eșuează la rândul 4500, întregul batch face rollback. Cu
batchsize=10000, înseamnă 10000 de rânduri pe care trebuie să le refaci. Cubatchsize=100, doar 100.
Un număr util pentru sarcini tipice de scriere: 5000-10000. Crește dacă scrierile sunt lente și rețeaua e bottleneck-ul. Scade dacă vezi OOM-uri sau batch-uri care depășesc timpul.
isolationLevel
Implicit este READ_COMMITTED pentru majoritatea driverelor. Fiecare batch rulează în propria tranzacție. Dacă un batch eșuează, acel batch face rollback; batch-urile commit-uite anterior rămân commit-uite.
Alte niveluri, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, NONE, sunt relevante mai ales când tabela destinație este citită concurent și îți pasă ce văd acei cititori. Pentru încărcări batch în background pe o tabelă pe care nu o citește nimeni altcineva, valoarea implicită e bună. Setarea isolationLevel=NONE (autocommit per rând) sună mai rapid dar este aproape întotdeauna mai lent și rupe orice noțiune de granițe de recovery.
Moduri de salvare: alege cu grijă
Patru moduri, iar alegerea are consecințe reale:
append: face INSERT pentru rânduri noi în tabela existentă. Tabela trebuie să existe cu o schemă compatibilă, sau Spark o creează la prima scriere folosind tipuri inferate. Aditiv, ceea ce este cel mai sigur mod pentru producție. Riscul sunt duplicatele dacă re-rulezi.
overwrite: și aici se rănesc oamenii. Implicit, overwrite face DROP TABLE urmat de CREATE TABLE și inserturi. Tabela originală a dispărut, inclusiv orice index, constrângeri, chei străine, grant-uri și triggere. Tabela recreată are doar coloanele și tipurile pe care Spark le-a inferat, fără constrângeri, fără indexuri, fără nimic. Asta va rupe în liniște aplicația trei zile mai târziu când cineva încearcă să interogheze după un index care lipsește.
Reparația:
.option("truncate", "true")
.mode("overwrite")
Cu truncate=true, Spark emite TRUNCATE TABLE în loc de DROP, apoi inserează. Schema, indexurile, constrângerile, toate păstrate. Folosește asta. Întotdeauna. Unele drivere (Postgres mai vechi) ignoră truncate=true și revin la drop-and-recreate; verifică documentația pentru versiunea ta de driver. Dacă a ta face asta, folosește pattern-ul de tabelă de staging (mai jos) în schimb.
Există și option("cascadeTruncate", "true") pentru Postgres dacă ai chei străine care indică spre tabela ta, ceea ce probabil nu ar trebui să ai pe o țintă analitică de scriere.
error (implicit): eșuează dacă tabela există. Bun pentru „vreau să fiu sigur că nu calc pe ceva.”
ignore: nu face nimic în liniște dacă tabela există. Util în scripturi de setup idempotente; periculos în scrieri reale (s-ar putea să sari peste scriere în liniște).
Problema idempotenței
Acum cea reală. Imaginează-ți secvența asta:
- Spark începe o scriere cu 16 partiții.
- Partițiile 1-12 se termină cu succes, 12 batch-uri commit-uite în baza de date.
- Partiția 13 eșuează: hopa de rețea, OOM, executor pierdut, ce-o fi.
- Spark reîncearcă partiția 13 pe un alt executor.
- Reîncercarea partiției 13 reușește.
Pare în regulă, nu? Cu excepția faptului că partiția 13 ar fi putut commit-ui jumătate din batch-urile ei înainte să eșueze. Reîncercarea apoi commit-uiește toate batch-urile ei. Ai rânduri duplicate pentru ce a scris parțial task-ul original.
Asta nu e teoretic. Spark garantează execuție at-least-once per partiție la retry. Cu modul append și fără alte măsuri de protecție, asta înseamnă at-least-once și la nivel de rând, ceea ce înseamnă duplicate la eșec.
Există două pattern-uri de calitate de producție pentru a repara asta.
Pattern 1: tabelă de staging cu swap atomic
Scrii într-o tabelă temporară care nu contează, apoi într-o singură tranzacție copiezi sau faci swap în tabela reală:
# Pasul 1: scrie in staging, duplicatele sunt ok aici
(df.coalesce(16).write
.format("jdbc")
.option("url", url)
.option("dbtable", "public.daily_summary_staging")
.option("user", user).option("password", pw)
.option("driver", driver)
.option("batchsize", 5000)
.mode("overwrite")
.option("truncate", "true")
.save())
# Pasul 2: swap atomic, executat direct pe baza de date
import psycopg2
with psycopg2.connect(host="...", dbname="reports", user="...", password="...") as conn:
with conn.cursor() as cur:
cur.execute("""
BEGIN;
DELETE FROM public.daily_summary
WHERE report_date = %s;
INSERT INTO public.daily_summary
SELECT DISTINCT * FROM public.daily_summary_staging;
COMMIT;
""", (report_date,))
Job-ul Spark are voie să fie neglijent pentru că nimeni din downstream nu citește tabela de staging. Swap-ul atomic în tabela reală este o singură tranzacție la nivelul bazei de date, deci nu există stare-jumătate vizibilă cititorilor. Dacă swap-ul eșuează, tabela de staging încă are datele și poți reîncerca. Dacă job-ul Spark eșuează, îl re-rulezi; tabela de staging este suprascrisă. Idempotent la fiecare pas.
Acesta este cel mai fiabil pattern. Te costă 2x storage-ul pentru scurt timp și un hop suplimentar și merită.
Pattern 2: upsert-uri idempotente prin cheie primară
Dacă tabela ta destinație are o cheie primară sau o constrângere de unicitate și te poți baza pe ea, folosește upsert-ul nativ al bazei de date:
-- Postgres
INSERT INTO daily_summary (report_date, country, total)
VALUES (...)
ON CONFLICT (report_date, country) DO UPDATE
SET total = EXCLUDED.total;
-- MySQL
INSERT INTO daily_summary (report_date, country, total)
VALUES (...)
ON DUPLICATE KEY UPDATE total = VALUES(total);
Writer-ul JDBC al Spark nu suportă upsert-uri nativ. Ai două opțiuni:
Opțiunea A: scrie într-o tabelă de staging (ca mai sus), apoi rulează INSERT ... ON CONFLICT pe partea bazei de date ca o singură instrucțiune SQL care acoperă toate rândurile.
Opțiunea B: coboară în df.foreachPartition() și scrie-ți propriile upsert-uri JDBC. Forma:
def upsert_partition(rows):
import psycopg2
conn = psycopg2.connect(...)
cur = conn.cursor()
sql = """
INSERT INTO daily_summary (report_date, country, total)
VALUES (%s, %s, %s)
ON CONFLICT (report_date, country) DO UPDATE
SET total = EXCLUDED.total
"""
batch = []
for row in rows:
batch.append((row.report_date, row.country, row.total))
if len(batch) >= 5000:
cur.executemany(sql, batch)
conn.commit()
batch.clear()
if batch:
cur.executemany(sql, batch)
conn.commit()
cur.close()
conn.close()
df.coalesce(16).foreachPartition(upsert_partition)
Fiecare rând este acum idempotent la nivelul bazei de date, un retry al aceluiași rând produce același rezultat. Faci mai multă muncă pe rând decât executeBatch-ul vectorizat al Spark, deci asta e mai lent decât un append direct, dar obții semantică reală at-most-once. Un pattern de păstrat în buzunar.
Cifre, pentru că ajută
Benchmark-uri concrete dintr-o sarcină reală: 100 de milioane de rânduri subțiri (timestamp, cheie, trei numerice) în Postgres pe același VPC:
| Configurație | Timp pe ceas |
|---|---|
Implicit append, 1000 partiții, batchsize 1000 | ~30 minute |
append, coalesce(16), batchsize 5000 | ~3 minute |
| Tabelă de staging + swap atomic, 16 partiții, batchsize 10000 | ~3.5 minute |
Upsert-uri foreachPartition, 16 partiții, batchsize 5000 | ~7 minute |
Primul rând e ce primești dacă uiți de partiționare. Al doilea e sweet spot-ul paralel-și-batched. Al treilea costă aproape nimic în plus și îți oferă idempotență. Al patrulea e mai lent dar îți oferă upsert-uri idempotente per rând. Alege-l pe cel care se potrivește nevoilor tale de fiabilitate.
Câteva butoane în plus
createTableOptions: clauze suplimentare adăugate la CREATE TABLE când Spark creează ținta. Util pentru lucruri precum WITH (autovacuum_enabled=false) pe o tabelă Postgres doar pentru încărcare.
createTableColumnTypes: suprascrie tipurile de coloană inferate de Spark. Folosește asta când vrei VARCHAR(255) în loc de TEXT, sau NUMERIC(18,2) în loc de DOUBLE PRECISION.
queryTimeout: omoară orice interogare singulară care rulează prea mult. Setează asta în producție. Implicit este nelimitat, ceea ce e exact ce nu vrei când un deadlock a înghețat unul dintre batch-urile tale.
sessionInitStatement: SQL rulat pe fiecare conexiune înainte de orice scriere. Util pentru SET statement_timeout = '5min', SET synchronous_commit = off (hack de performanță Postgres pe o destinație necritică), sau SET search_path = ....
Lista NU FACE, ediția scriere
Nu scrie un DataFrame de 1000 de partiții direct într-o bază de date tranzacțională. Coalesce sau repartiționează la un număr sănătos mai întâi. Întotdeauna.
Nu folosi mode("overwrite") fără option("truncate", "true") decât dacă chiar vrei tabela ștearsă. Și chiar și atunci, asigură-te că nicio aplicație nu depinde de indexurile sau grant-urile pe care urmează să le vaporizezi.
Nu presupune că append este idempotent. Nu este, la retry. Fie te angajezi la pattern-ul de tabelă de staging, fie la pattern-ul de upsert, fie acceptă că există duplicate ocazionale și deduplichează în downstream.
Nu sări peste strategia de indexare a destinației. Insert-urile bulk pe o tabelă puternic indexată sunt lente pentru că fiecare rând actualizează fiecare index. Dacă controlezi schema și scrierile sunt în rafale, un pattern comun este: dă drop la indexuri, bulk insert, recreează indexurile. Dar asta e periculos pe o tabelă live și e mult mai ușor cu pattern-ul staging-and-swap, unde poți construi indexurile pe staging înainte de swap.
Nu scrie în DB-ul tranzacțional de prod din Spark pentru orice e sensibil la timp. Aceeași avertizare ca lecția trecută, în sens invers: job-urile de scriere Spark țin conexiuni, fac insert-uri bulk, rulează tranzacții. Aplicația va observa. Aterizează datele într-un warehouse sau DB analitic în schimb și lasă aplicația să le tragă de acolo.
Încearcă asta
from pyspark.sql import SparkSession
import os
spark = (SparkSession.builder
.appName("JdbcWriteDemo")
.master("local[*]")
.config("spark.jars.packages", "org.postgresql:postgresql:42.7.0")
.getOrCreate())
# Construieste un DataFrame sintetic cu 5M randuri, 200 partitii
df = (spark.range(5_000_000)
.withColumnRenamed("id", "user_id")
.selectExpr(
"user_id",
"user_id % 100 as country_id",
"rand() * 1000 as amount",
)
.repartition(200))
url = "jdbc:postgresql://localhost:5432/demo"
props = {
"user": "spark_writer",
"password": os.environ["DB_PASSWORD"],
"driver": "org.postgresql.Driver",
"batchsize": "5000",
}
# Nu face asta, prea multe conexiuni
# df.write.jdbc(url=url, table="public.results_bad",
# mode="overwrite", properties=props)
# Fa asta, coalesce intai, truncate nu drop
(df.coalesce(16).write
.option("truncate", "true")
.jdbc(url=url, table="public.results",
mode="overwrite", properties=props))
Urmărește pg_stat_activity în timp ce rulează. Cu versiunea proastă (comentată), ai vedea 200 de conexiuni luptându-se pentru sloturile pool-ului de conexiuni și job-ul gâfâind. Cu versiunea bună, vezi 16 conexiuni constante, fiecare mestecând prin batch-uri până la final.
Lecția următoare, cloud object storage: S3, GCS, Azure Blob, noul „filesystem implicit” pentru Spark în 2026. Povestea consistenței este în sfârșit simplă, dar problema renumirii încă mușcă, și de aceea există direct-write committers.
Referințe: documentația sursei de date JDBC din Apache Spark (https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html). Consultat 2026-05-01.