PySpark, dalle fondamenta Lezione 45 / 60

Leggere da JDBC: estrarre da Postgres, MySQL, SQL Server

Il connettore source JDBC, il trucco di partitionColumn, e perché una lettura ingenua manda al tappeto il database sorgente.

Le ultime due lezioni le abbiamo passate sui source basati su file. Ma una grossa fetta dei dati del mondo reale vive nei database transazionali (Postgres, MySQL, SQL Server, Oracle) e prima o poi ti toccherà tirarli dentro Spark per analitica, join, dati di training per ML, qualsiasi cosa.

Il connettore JDBC di Spark è il ponte. È ingannevolmente semplice da chiamare: un spark.read.format("jdbc") e hai un DataFrame. Ed è ingannevolmente facile da usare male: una chiamata sbagliata e fondi il database di produzione mentre la query di report del tuo collega va in timeout e il DBA ti chiama alle 11 di sera.

Questa lezione è il manuale per leggere da source JDBC in modo sicuro e veloce. La prossima lezione si occupa della scrittura.

La chiamata di base

df = (spark.read
        .format("jdbc")
        .option("url", "jdbc:postgresql://db.example.com:5432/orders")
        .option("dbtable", "public.orders")
        .option("user", "spark_reader")
        .option("password", "...")
        .option("driver", "org.postgresql.Driver")
        .load())

Un paio di cose da notare subito:

  • L’url è un URL JDBC, che include il nome del database, non solo l’host. Il formato è specifico del driver: jdbc:postgresql://..., jdbc:mysql://..., jdbc:sqlserver://..., jdbc:oracle:thin:@....
  • dbtable può essere il nome di una tabella o una subquery tra parentesi con un alias: "(SELECT id, amount FROM orders WHERE created_at > '2026-01-01') t". Quest’ultima è il modo in cui spingi il filtro al database quando il pushdown di Spark non è abbastanza intelligente.
  • driver indica la classe del driver JDBC. Ogni database ne ha uno suo: org.postgresql.Driver, com.mysql.cj.jdbc.Driver, com.microsoft.sqlserver.jdbc.SQLServerDriver. Il JAR del driver deve trovarsi nel classpath di Spark; ne parliamo tra poco.
  • Le credenziali nel codice sono una cattiva idea. Usa variabili d’ambiente, secret manager, o .option("user", os.environ["DB_USER"]). Non committare una password.

Questa è la versione semplice. Adesso la trappola.

Perché la versione semplice è pericolosa

Lancia quel codice contro una tabella da 200 milioni di righe. Cosa succede?

Spark apre una singola connessione JDBC, manda una sola query (SELECT * FROM public.orders) e legge l’intero result set attraverso un solo cursor. L’intero job gira su un executor su un task. Duecento CPU parallele restano ferme. Il database sorgente, intanto, sta generando duecento milioni di righe per una singola connessione che le tira giù alla velocità massima possibile.

È brutto in tre modi:

  1. Non hai parallelismo lato Spark. Il tuo cluster sfavillante da 50 nodi è inutile.
  2. Tieni una singola query a lunga esecuzione aperta contro il DB sorgente. Postgres prende uno snapshot, MySQL tiene lock (a seconda dell’isolation level), SQL Server può far crescere tempdb. La query transazionale del tuo collega aspetta.
  3. Se la lettura fallisce a metà, riparti da zero. Niente progresso parziale.

Ho visto personalmente un junior engineer mandare giù una replica di Postgres in produzione lanciando esattamente questo codice contro una tabella orders da 500 GB un venerdì pomeriggio. Il DBA è stato molto educato.

Letture parallele: le quattro opzioni magiche

Spark ti offre una via d’uscita. Quattro opzioni che lavorano insieme:

df = (spark.read
        .format("jdbc")
        .option("url", "jdbc:postgresql://db.example.com:5432/orders")
        .option("dbtable", "public.orders")
        .option("user", "spark_reader")
        .option("password", os.environ["DB_PASSWORD"])
        .option("driver", "org.postgresql.Driver")
        .option("partitionColumn", "id")
        .option("lowerBound", 1)
        .option("upperBound", 200_000_000)
        .option("numPartitions", 32)
        .option("fetchsize", 10_000)
        .load())

Cosa fa: Spark prende il range [lowerBound, upperBound] su partitionColumn, lo divide in numPartitions blocchi uguali, e lancia ogni blocco come una sua query SQL nel suo task sul suo executor. Con numPartitions=32, Spark emette 32 query verso il DB sorgente, all’incirca:

SELECT * FROM public.orders WHERE id <  6_250_001
SELECT * FROM public.orders WHERE id >= 6_250_001 AND id < 12_500_001
SELECT * FROM public.orders WHERE id >= 12_500_001 AND id < 18_750_001
...
SELECT * FROM public.orders WHERE id >= 193_750_001

Ora hai parallelismo a 32 vie sul lato Spark. Il DB sorgente vede 32 connessioni concorrenti invece di 1, ognuna che scansiona un range più piccolo. Il tempo totale wall-clock crolla in modo drammatico. Se un singolo task fallisce, viene ritentato solo quel range.

I vincoli per far funzionare bene la cosa:

  • partitionColumn dovrebbe essere numerica o di tipo data/timestamp. Spark costruisce filtri di range su di essa; colonne testuali arbitrarie non funzionano.
  • Dovrebbe essere distribuita in modo all’incirca uniforme su [lowerBound, upperBound]. Le primary key auto-incrementali sono perfette. Gli UUID no (a meno che tu non abbia una colonna numerica ombra). I timestamp funzionano se le righe sono distribuite sul range.
  • Dovrebbe essere indicizzata sul DB sorgente, altrimenti ognuna di quelle query parallele fa un full table scan, e hai peggiorato le cose, non migliorate.
  • lowerBound e upperBound non filtrano i dati. Le righe sotto lowerBound finiscono nella prima partition; le righe sopra upperBound finiscono nell’ultima. I bound definiscono solo come Spark divide il range. Se li imposti molto a casaccio, ti ritrovi con uno skew tremendo: una partition con il 99% delle righe.
  • numPartitions è il parallelismo. Allinealo all’incirca ai core dei tuoi executor, ma mettigli un cap su quanto il DB sorgente tollera in concorrenza. 32-64 è un range ragionevole; 500 ti fa bloccare dal DBA.

Un pattern sicuro è recuperare prima il min e il max effettivi di partitionColumn con una query minuscola, e usare quelli:

bounds = (spark.read.format("jdbc")
          .option("url", url)
          .option("dbtable", "(SELECT MIN(id) AS lo, MAX(id) AS hi FROM public.orders) b")
          .option("user", user).option("password", pw)
          .option("driver", driver)
          .load()
          .first())

df = (spark.read.format("jdbc")
        .option("url", url)
        .option("dbtable", "public.orders")
        .option("user", user).option("password", pw)
        .option("driver", driver)
        .option("partitionColumn", "id")
        .option("lowerBound", bounds["lo"])
        .option("upperBound", bounds["hi"])
        .option("numPartitions", 32)
        .load())

L’alternativa predicates

Il pattern a quattro opzioni funziona solo per colonne numeriche o di tipo data monotone. E se i tuoi dati sono partizionati naturalmente per qualcos’altro (un codice paese, un tenant ID, una status enum)? Usa l’opzione predicates, che prende una lista esplicita di clausole WHERE, una per task:

predicates = [
    "country = 'IT'",
    "country = 'FR'",
    "country = 'DE'",
    "country = 'ES'",
    "country IN ('US', 'CA', 'MX')",
    "country NOT IN ('IT', 'FR', 'DE', 'ES', 'US', 'CA', 'MX')",
]

df = spark.read.jdbc(
    url=url,
    table="public.orders",
    predicates=predicates,
    properties={"user": user, "password": pw, "driver": driver},
)

Ogni predicato diventa un task che lancia una query SQL. L’opzione predicates è più flessibile di partitionColumn perché controlli esattamente cosa pesca ogni task, ma è anche più soggetta a errori: i predicati devono complessivamente coprire ogni riga esattamente una volta. Se si sovrappongono, ottieni duplicati. Se hanno buchi, ti mancano righe. Non c’è validazione: Spark si fida di te.

Un pattern utile: split per hash:

predicates = [f"MOD(id, 32) = {i}" for i in range(32)]

Anche con esigenze di partitioning non numerico, questo ti dà parallelismo a 32 vie senza sovrapposizioni o buchi. Il costo è che ogni task fa un filtro con chiamata a funzione, il che di solito significa un full scan a meno che tu non abbia un indice basato su funzione. Vale la pena su tabelle di dimensione moderata, doloroso su quelle enormi.

Il JAR del driver

JDBC ha bisogno di un JAR di driver per ogni tipo di database. Spark non li include. Devi fornirli tu.

Tre modi per attaccare un driver:

# 1. --jars al lancio
spark-submit --jars /path/to/postgresql-42.7.0.jar my_job.py

# 2. --packages con coordinate Maven (scarica in automatico)
spark-submit --packages org.postgresql:postgresql:42.7.0 my_job.py
# 3. Nella config della SparkSession (funziona per sviluppo locale)
spark = (SparkSession.builder
         .config("spark.jars.packages", "org.postgresql:postgresql:42.7.0")
         .getOrCreate())

Se ti dimentichi il JAR, ottieni java.lang.ClassNotFoundException: org.postgresql.Driver alla prima lettura. Diagnostica facile, fix facile.

Coordinate Maven comuni:

  • Postgres: org.postgresql:postgresql:42.7.0
  • MySQL: com.mysql:mysql-connector-j:8.3.0
  • SQL Server: com.microsoft.sqlserver:mssql-jdbc:12.4.2.jre11
  • Oracle: com.oracle.database.jdbc:ojdbc11:23.3.0.23.09 (compatibilmente con la licenza)

Nel dubbio, allinea la versione alla major version del database; mismatch di solito funzionano ma occasionalmente producono strani bug di type coercion.

Pushdown e i suoi limiti

Spark prova a spingere filtri e proiezioni giù al database. I casi semplici funzionano:

df = (spark.read.format("jdbc")
        .option("url", url).option("dbtable", "public.orders")
        ...
        .load())

(df.filter("country = 'IT'")
   .select("order_id", "amount")
   .explain())

# == Physical Plan ==
# *(1) Scan JDBCRelation(public.orders) [order_id#0,amount#3]
#   PushedFilters: [*EqualTo(country,IT)],
#   ReadSchema: struct<order_id:bigint,amount:double>

I PushedFilters e ReadSchema funzionano allo stesso modo del Parquet (lezione 43): Spark manda una query come SELECT order_id, amount FROM public.orders WHERE country = 'IT' invece di tirare giù tutta la tabella. L’asterisco prima di EqualTo indica che il filtro è completamente pushed down: Spark non lo rivaluta.

Cosa va in pushdown: uguaglianza, confronto, IN con un set piccolo, IS NULL / IS NOT NULL, AND di predicati pushable. Cosa no: chiamate a funzioni (UPPER(country) = 'IT'), aritmetica sulla colonna filtrata, OR con almeno un lato non pushable, espressioni CASE complesse. Quando il pushdown fallisce, Spark tira più dati di quanto ti aspetti e filtra in cluster. Controlla sempre .explain() prima di dare per scontato che un filtro sia stato pushed down.

Quando il pushdown non basta, ripieghi sul trucco della subquery:

.option("dbtable", """
    (SELECT order_id, amount, country
     FROM public.orders
     WHERE country IN ('IT', 'FR', 'DE')
       AND created_at > '2026-01-01') t
""")

Il database esegue la subquery; Spark vede il risultato. Rinunci a un po’ di leggibilità ma controlli esattamente cosa viene mandato. Combinala con partitionColumn per letture parallele sul risultato della subquery, ma nota che partitionColumn deve riferirsi a una colonna che è nell’output della subquery.

fetchsize e altre manopole

fetchsize controlla quante righe il driver JDBC pesca per round-trip di rete. Il default è specifico del driver e di solito piccolo (Postgres: 0, che significa “tutto in una volta”, disastroso; MySQL: arriva in batch ma dovresti comunque fare il tuning). Settalo esplicitamente:

.option("fetchsize", 10_000)

fetchsize più grande significa meno round-trip, il che è più veloce su connessioni ad alta latenza. Costo: ogni task bufferizza fetchsize righe in memoria del driver prima di passarle oltre. 1.000-50.000 è tipico. Alza se le letture sono lente e l’executor ha headroom di memoria; abbassa se vedi OOM nei task JDBC.

Per Postgres in particolare, setta anche option("autocommit", "false") se vuoi che fetchsize abbia davvero effetto: il driver JDBC lo richiede per il fetching basato su cursor. Altri database hanno le loro stranezze; consulta i doc del driver quando il tuning conta.

Un paio di altre opzioni che vale la pena conoscere:

  • isolationLevel: il livello di isolamento di transazione che Spark usa. Default READ_UNCOMMITTED per la maggior parte dei driver, va bene per analitica su una tabella stabile. Se stai leggendo una tabella sotto pesante scrittura, REPEATABLE_READ ti dà uno snapshot consistente al costo di più locking.
  • queryTimeout: per quanto tempo la query di un singolo task può girare prima di essere uccisa. Default illimitato. Settalo in produzione così una query impazzita non resta appesa per sempre.
  • sessionInitStatement: SQL eseguito su ogni connessione prima della query principale. Utile per SET search_path, statement timeout, o modalità read-only.

La lista del NO

Un paio di pattern che sembrano ragionevoli e non lo sono:

Non lanciare job analitici Spark contro il tuo DB transazionale di prod. Anche con letture parallele, stai competendo con il traffico applicativo a cui quel DB serve. L’architettura giusta è la replica: snapshot notturno, stream di change-data-capture, o un servizio gestito tipo Postgres logical replication verso un warehouse, export su S3, o Debezium verso Kafka verso Avro lake. Poi lanci Spark contro la replica o contro il lake.

Non leggere una tabella transazionale direttamente quando in realtà vuoi lo snapshot più recente per analitica. Se la tua fact table cambia ogni minuto, una lettura range-partizionata senza bucket impiega ore, e i dati sono cambiati nel frattempo. Snapshotta la tabella una volta in Parquet, poi itera contro la copia Parquet.

Non alzare numPartitions senza chiedere al proprietario del DB. Postgres di solito regge 32 letture concorrenti. SQL Server ne tollera di più. Oracle è più permaloso. Il DBA conosce il limite di connessioni, la dimensione del connection pool, e quali orari della giornata sono carichi. Chiedi.

Non usare dbtable con una subquery complessa e partitionColumn insieme senza testare. L’interazione è sottile: il filtro di partition avvolge la subquery, e alcuni database lo gestiscono peggio di altri. Lancia sempre prima una singola partition per controllare che lo SQL eseguito sia ragionevole, poi scala.

Provalo

Un esempio autocontenuto che usa un Postgres locale (Docker è la via più semplice):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("JdbcDemo")
         .master("local[*]")
         .config("spark.jars.packages", "org.postgresql:postgresql:42.7.0")
         .getOrCreate())

url = "jdbc:postgresql://localhost:5432/demo"
props = {
    "user": "spark_reader",
    "password": "...",
    "driver": "org.postgresql.Driver",
    "fetchsize": "10000",
}

# Lettura sequenziale: un task, una query
slow = spark.read.jdbc(url=url, table="public.orders", properties=props)
print(slow.rdd.getNumPartitions())   # 1

# Lettura parallela: 16 task, 16 query
fast = spark.read.jdbc(
    url=url,
    table="public.orders",
    column="id",
    lowerBound=1,
    upperBound=2_000_000,
    numPartitions=16,
    properties=props,
)
print(fast.rdd.getNumPartitions())   # 16

# Pattern predicates: utile quando non c'è una colonna monotona
preds = [f"MOD(id, 8) = {i}" for i in range(8)]
hashed = spark.read.jdbc(
    url=url, table="public.orders",
    predicates=preds, properties=props,
)
print(hashed.rdd.getNumPartitions())  # 8

Confronta i tempi wall-clock delle tre letture. Quella parallela dovrebbe essere visibilmente più veloce su qualsiasi tabella non banale. Guarda pg_stat_activity sul lato Postgres mentre gira una lettura parallela: vedrai le 16 query concorrenti, ognuna che scansiona la sua fetta.

Prossima lezione, scrivere su JDBC: la storia analoga sul parallelismo per le insert, perché la dimensione dei batch conta ancora di più sul lato write, e la ginnastica di upsert/merge richiesta per aggiornare righe esistenti da un DataFrame di Spark.


Riferimenti: documentazione del data source JDBC di Apache Spark (https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) e guida del driver JDBC PostgreSQL (https://jdbc.postgresql.org/documentation/). Recuperato il 2026-05-01.

Cerca