Am petrecut ultimele două lecții cu surse bazate pe fișiere. Dar o porțiune uriașă din datele din lumea reală trăiește în baze de date tranzacționale: Postgres, MySQL, SQL Server, Oracle, iar la un moment dat va trebui să tragi din ele în Spark pentru analitică, join-uri, date de antrenament ML, orice.
Conectorul JDBC al Spark e puntea. E înșelător de simplu de apelat: un spark.read.format("jdbc") și ai un DataFrame. E înșelător de ușor de folosit greșit: un apel greșit și topești baza de date de producție în timp ce raportul colegului tău dă timeout, iar DBA-ul te sună la 11 noaptea.
Lecția asta e manualul sigur și rapid pentru citirea din surse JDBC. Lecția următoare se ocupă de scriere.
Apelul de bază
df = (spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://db.example.com:5432/orders")
.option("dbtable", "public.orders")
.option("user", "spark_reader")
.option("password", "...")
.option("driver", "org.postgresql.Driver")
.load())
Câteva lucruri de remarcat imediat:
urle un URL JDBC, incluzând numele bazei de date, nu doar un host. Formatul e specific driverului:jdbc:postgresql://...,jdbc:mysql://...,jdbc:sqlserver://...,jdbc:oracle:thin:@....dbtablepoate fi un nume de tabel sau o subinterogare între paranteze cu un alias:"(SELECT id, amount FROM orders WHERE created_at > '2026-01-01') t". Ultima e modul în care împingi filtrarea către baza de date când pushdown-ul Spark nu e suficient de deștept.drivernumește clasa driverului JDBC. Fiecare bază de date are nevoie de propria:org.postgresql.Driver,com.mysql.cj.jdbc.Driver,com.microsoft.sqlserver.jdbc.SQLServerDriver. JAR-ul driverului trebuie să fie pe classpath-ul Spark; vom acoperi asta într-o clipă.- Credențialele în cod sunt rele. Folosește variabile de mediu, secret managers sau
.option("user", os.environ["DB_USER"]). Nu commite o parolă.
Asta e versiunea simplă. Acum capcana.
De ce versiunea simplă e periculoasă
Rulează acel cod pe un tabel cu 200 de milioane de rânduri. Ce se întâmplă?
Spark construiește o singură conexiune JDBC, trimite o singură interogare, SELECT * FROM public.orders, și citește întregul set de rezultate printr-un singur cursor. Întregul job rulează pe un singur executor pe o singură sarcină. Două sute de CPU-uri paralele stau degeaba. Între timp, baza de date sursă generează două sute de milioane de rânduri pentru o singură conexiune care le trage cât de repede poate.
Asta e rău în trei moduri:
- Nu obții niciun paralelism pe partea Spark. Clusterul tău șic de 50 de noduri e inutil.
- Ții o singură interogare de lungă durată deschisă pe DB-ul sursă. Postgres ia un snapshot, MySQL ține lacăte (în funcție de nivelul de izolare), SQL Server poate crește tempdb. Interogarea tranzacțională a colegului tău așteaptă.
- Dacă citirea eșuează la jumătatea drumului, începi de la zero. Niciun progres parțial.
Am urmărit personal cum un inginer junior a doborât un replica Postgres de producție rulând exact acest cod pe un tabel de comenzi de 500 GB într-o vineri după-amiază. DBA-ul a fost foarte politicos în legătură cu asta.
Citiri paralele: cele patru opțiuni magice
Spark îți dă o ieșire. Patru opțiuni care funcționează împreună:
df = (spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://db.example.com:5432/orders")
.option("dbtable", "public.orders")
.option("user", "spark_reader")
.option("password", os.environ["DB_PASSWORD"])
.option("driver", "org.postgresql.Driver")
.option("partitionColumn", "id")
.option("lowerBound", 1)
.option("upperBound", 200_000_000)
.option("numPartitions", 32)
.option("fetchsize", 10_000)
.load())
Ce face asta: Spark ia intervalul [lowerBound, upperBound] pe partitionColumn, îl împarte în numPartitions bucăți egale și rulează fiecare bucată ca propria interogare SQL în propria sarcină pe propriul executor. Cu numPartitions=32, Spark emite 32 de interogări către DB-ul sursă, aproximativ:
SELECT * FROM public.orders WHERE id < 6_250_001
SELECT * FROM public.orders WHERE id >= 6_250_001 AND id < 12_500_001
SELECT * FROM public.orders WHERE id >= 12_500_001 AND id < 18_750_001
...
SELECT * FROM public.orders WHERE id >= 193_750_001
Acum ai paralelism 32-way pe partea Spark. DB-ul sursă vede 32 de conexiuni concurente în loc de 1, fiecare scanând un interval mai mic. Timpul total pe ceasul de perete scade dramatic. Dacă o singură sarcină eșuează, doar acel interval e reîncercat.
Constrângerile ca asta să funcționeze bine:
partitionColumnar trebui să fie numerică sau o dată/timestamp. Spark construiește filtre de interval pe ea; coloanele text arbitrare nu funcționează.- Ar trebui să fie aproximativ uniform distribuită pe
[lowerBound, upperBound]. Cheile primare auto-increment sunt perfecte. UUID-urile nu sunt (decât dacă ai o coloană numerică ascunsă). Timestamp-urile funcționează dacă rândurile sunt răspândite pe interval. - Ar trebui să fie indexată pe DB-ul sursă, altfel fiecare dintre acele interogări paralele face un scan complet de tabel și ai înrăutățit lucrurile, nu le-ai îmbunătățit.
lowerBoundșiupperBoundnu filtrează datele. Rândurile sublowerBoundajung în prima partiție; rândurile pesteupperBoundajung în ultima. Limitele definesc doar cum împarte Spark intervalul. Dacă le setezi sălbatic greșit, vei obține skew teribil: o singură partiție cu 99% din rânduri.numPartitionse paralelismul. Potrivește-l aproximativ cu nucleele executorului tău, dar limitează-l la ce poate tolera DB-ul sursă concurent. 32-64 e un interval rezonabil; 500 te va face să fii blocat de DBA.
Un tipar sigur e să iei min-ul și max-ul real al partitionColumn mai întâi, într-o interogare minusculă, și să le folosești:
bounds = (spark.read.format("jdbc")
.option("url", url)
.option("dbtable", "(SELECT MIN(id) AS lo, MAX(id) AS hi FROM public.orders) b")
.option("user", user).option("password", pw)
.option("driver", driver)
.load()
.first())
df = (spark.read.format("jdbc")
.option("url", url)
.option("dbtable", "public.orders")
.option("user", user).option("password", pw)
.option("driver", driver)
.option("partitionColumn", "id")
.option("lowerBound", bounds["lo"])
.option("upperBound", bounds["hi"])
.option("numPartitions", 32)
.load())
Alternativa predicates
Tiparul cu patru opțiuni funcționează doar pentru coloane numerice sau de date monotone. Dacă datele tale sunt partiționate natural după altceva, un cod de țară, un ID de tenant, un enum de status? Folosește opțiunea predicates, care ia o listă explicită de clauze WHERE, una per sarcină:
predicates = [
"country = 'IT'",
"country = 'FR'",
"country = 'DE'",
"country = 'ES'",
"country IN ('US', 'CA', 'MX')",
"country NOT IN ('IT', 'FR', 'DE', 'ES', 'US', 'CA', 'MX')",
]
df = spark.read.jdbc(
url=url,
table="public.orders",
predicates=predicates,
properties={"user": user, "password": pw, "driver": driver},
)
Fiecare predicat devine o sarcină care rulează o interogare SQL. Opțiunea predicates e mai flexibilă decât partitionColumn pentru că tu controlezi exact ce ia fiecare sarcină, dar e și mai predispusă la erori: predicatele trebuie să acopere colectiv fiecare rând exact o dată. Dacă se suprapun, obții duplicate. Dacă au goluri, obții rânduri lipsă. Nu există validare: Spark are încredere în tine.
Un tipar util: împarte după hash:
predicates = [f"MOD(id, 32) = {i}" for i in range(32)]
Chiar și cu nevoi de partiționare nenumerice, asta îți dă paralelism 32-way fără suprapunere sau goluri. Costul e că fiecare sarcină face un filtru cu apel de funcție, ceea ce înseamnă de obicei un scan complet, decât dacă ai un index bazat pe funcție. Merită pentru tabele de dimensiune moderată, dureros pentru cele uriașe.
JAR-ul driverului
JDBC are nevoie de un JAR de driver pentru fiecare tip de bază de date. Spark nu le livrează. Trebuie să le furnizezi.
Trei moduri de a atașa un driver:
# 1. --jars when you launch
spark-submit --jars /path/to/postgresql-42.7.0.jar my_job.py
# 2. --packages with Maven coordinates (downloads automatically)
spark-submit --packages org.postgresql:postgresql:42.7.0 my_job.py
# 3. In the SparkSession config (works for local development)
spark = (SparkSession.builder
.config("spark.jars.packages", "org.postgresql:postgresql:42.7.0")
.getOrCreate())
Dacă uiți JAR-ul, primești java.lang.ClassNotFoundException: org.postgresql.Driver la prima citire. Diagnostic ușor, rezolvare ușoară.
Coordonate Maven obișnuite:
- Postgres:
org.postgresql:postgresql:42.7.0 - MySQL:
com.mysql:mysql-connector-j:8.3.0 - SQL Server:
com.microsoft.sqlserver:mssql-jdbc:12.4.2.jre11 - Oracle:
com.oracle.database.jdbc:ojdbc11:23.3.0.23.09(cu permisiunea licenței)
Potrivește versiunea cu versiunea majoră a bazei tale de date când ai dubii; nepotrivirile funcționează de obicei, dar produc ocazional bug-uri ciudate de coerciție de tipuri.
Pushdown și limitele lui
Spark încearcă să împingă filtrele și proiecțiile către baza de date. Cazurile simple funcționează:
df = (spark.read.format("jdbc")
.option("url", url).option("dbtable", "public.orders")
...
.load())
(df.filter("country = 'IT'")
.select("order_id", "amount")
.explain())
# == Physical Plan ==
# *(1) Scan JDBCRelation(public.orders) [order_id#0,amount#3]
# PushedFilters: [*EqualTo(country,IT)],
# ReadSchema: struct<order_id:bigint,amount:double>
PushedFilters și ReadSchema funcționează la fel ca la Parquet (lecția 43): Spark trimite o interogare ca SELECT order_id, amount FROM public.orders WHERE country = 'IT' în loc să tragă tot tabelul. Asteriscul dinaintea lui EqualTo indică faptul că filtrul e împins complet în jos: Spark nu îl va re-evalua.
Ce se împinge: egalitate, comparație, IN cu mulțime mică, IS NULL / IS NOT NULL, AND de predicate care se pot împinge. Ce nu: apeluri de funcții (UPPER(country) = 'IT'), aritmetică pe coloana filtrată, OR cu cel puțin o parte care nu se poate împinge, expresii CASE complexe. Când pushdown-ul eșuează, Spark trage mai multe date decât te aștepți și filtrează în cluster. Verifică mereu .explain() înainte să presupui că un filtru s-a împins.
Când pushdown-ul nu e suficient, recurge la trucul cu subinterogarea:
.option("dbtable", """
(SELECT order_id, amount, country
FROM public.orders
WHERE country IN ('IT', 'FR', 'DE')
AND created_at > '2026-01-01') t
""")
Baza de date execută subinterogarea; Spark vede rezultatul. Renunți la o parte din lizibilitate, dar controlezi exact ce se trimite. Combină cu partitionColumn pentru citiri paralele pe rezultatul subinterogării, dar reține că partitionColumn trebuie să refere o coloană care e în output-ul subinterogării.
fetchsize și alte butoane
fetchsize controlează câte rânduri trage driverul JDBC per round-trip de rețea. Default-ul e specific driverului și de obicei mic (Postgres: 0, însemnând „totul deodată”, dezastruos; MySQL: sosește în loturi, dar tot ar trebui să tunezi asta). Setează-l explicit:
.option("fetchsize", 10_000)
fetchsize mai mare înseamnă mai puține round-trips, ceea ce e mai rapid pe conexiuni cu latență mare. Costul: fiecare sarcină tamponează fetchsize rânduri în memoria driverului înainte să le treacă mai departe. 1.000-50.000 e tipic. Tunează în sus dacă citirile sunt lente și executorul are spațiu de memorie; tunează în jos dacă vezi OOM în sarcinile JDBC.
Pentru Postgres specific, setează și option("autocommit", "false") dacă vrei ca fetchsize să aibă efect: driverul JDBC îl cere pentru fetching bazat pe cursor. Alte baze de date au propriile lor ciudățenii; consultă docs-ul driverului când tuningul contează.
Câteva alte opțiuni de știut:
isolationLevel: izolarea de tranzacție pe care o folosește Spark. DefaultREAD_UNCOMMITTEDpentru majoritatea driverelor, ceea ce e ok pentru analitică pe un tabel stabil. Dacă citești un tabel scris intens,REPEATABLE_READîți dă un snapshot consistent cu costul mai multor lacăte.queryTimeout: cât timp poate rula interogarea unei singure sarcini înainte să fie omorâtă. Default nelimitat. Setează asta în producție, ca o interogare care a luat-o razna să nu atârne la nesfârșit.sessionInitStatement: SQL rulat pe fiecare conexiune înainte de interogarea principală. Util pentruSET search_path, statement timeouts sau mod read-only.
Lista cu „Nu fă”
Câteva tipare care arată rezonabil și nu sunt:
Nu rula joburi analitice Spark pe DB-ul tranzacțional de producție. Chiar și cu citiri paralele, concurezi cu traficul aplicației pe care DB-ul există ca să-l servească. Arhitectura corectă e replicarea: snapshot nocturn, stream change-data-capture sau un serviciu gestionat precum replicarea logică Postgres într-un depozit, export S3 sau Debezium → Kafka → Avro lake. Apoi rulează Spark pe replica sau pe lake.
Nu citi un tabel tranzacțional direct când chiar vrei snapshot-ul cel mai recent pentru analitică. Dacă tabelul tău fact se schimbă în fiecare minut, o citire range-partitioned nebucketizată ia ore, iar datele s-au schimbat până termini. Snapshot-ează tabelul o dată în Parquet, apoi iterează pe copia Parquet.
Nu tuna numPartitions în sus fără să verifici cu proprietarul DB-ului. Postgres gestionează de obicei 32 de citiri concurente. SQL Server tolerează mai multe. Oracle e mai pretențios. DBA-ul știe limita de conexiuni, dimensiunea pool-ului de conexiuni și ce momente ale zilei sunt aglomerate. Întreabă.
Nu folosi dbtable cu o subinterogare complexă și partitionColumn împreună fără testare. Interacțiunea e subtilă: filtrul de partiție înconjoară subinterogarea, iar unele baze de date gestionează asta mai prost decât altele. Rulează mereu o singură partiție întâi ca să verifici că SQL-ul executat e rezonabil, apoi scalează în sus.
Încearcă asta
Un exemplu de sine stătător folosind un Postgres local (Docker e cel mai ușor):
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("JdbcDemo")
.master("local[*]")
.config("spark.jars.packages", "org.postgresql:postgresql:42.7.0")
.getOrCreate())
url = "jdbc:postgresql://localhost:5432/demo"
props = {
"user": "spark_reader",
"password": "...",
"driver": "org.postgresql.Driver",
"fetchsize": "10000",
}
# Sequential read — one task, one query
slow = spark.read.jdbc(url=url, table="public.orders", properties=props)
print(slow.rdd.getNumPartitions()) # 1
# Parallel read — 16 tasks, 16 queries
fast = spark.read.jdbc(
url=url,
table="public.orders",
column="id",
lowerBound=1,
upperBound=2_000_000,
numPartitions=16,
properties=props,
)
print(fast.rdd.getNumPartitions()) # 16
# Predicates pattern — useful when there's no monotonic column
preds = [f"MOD(id, 8) = {i}" for i in range(8)]
hashed = spark.read.jdbc(
url=url, table="public.orders",
predicates=preds, properties=props,
)
print(hashed.rdd.getNumPartitions()) # 8
Compară timpii pe ceasul de perete ai celor trei citiri. Cea paralelă ar trebui să fie vizibil mai rapidă pe orice tabel netrivial. Uită-te la pg_stat_activity pe partea Postgres în timp ce o citire paralelă rulează: vei vedea cele 16 interogări concurente, fiecare scanându-și felia.
Lecția următoare, scrierea înapoi în JDBC: povestea analogă a paralelismului pentru insert-uri, de ce dimensiunile loturilor contează și mai mult pe partea de scriere și gimnastica de upsert/merge cerută pentru a actualiza rânduri existente dintr-un DataFrame Spark.
Referințe: documentația sursei de date JDBC din Apache Spark (https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) și ghidul driverului JDBC PostgreSQL (https://jdbc.postgresql.org/documentation/). Consultat 2026-05-01.