Siamo ora nel Modulo 8, la parte del corso in cui smettiamo di parlare di come Spark pensa e iniziamo a parlare di come Spark mangia. Ogni job inizia leggendo qualcosa e finisce scrivendo qualcosa. Il formato che scegli per quel qualcosa ha più impatto sulle performance di quasi ogni manopola di tuning del motore.
Questo modulo copre i formati che incontrerai davvero nel mondo reale: Parquet, ORC, Avro, Delta, JSON, CSV, sorgenti JDBC, object store cloud. Inizieremo con quello che userai l’80% delle volte e quello su cui Spark stesso fa default: Parquet.
Se di questo modulo ricordi una cosa sola, che sia questa: per i carichi di lavoro analitici, Parquet è quasi sempre la risposta giusta. Il resto della lezione spiega perché, e cosa fa il file sotto il cofano quando chiami spark.read.parquet(...).
Cosa significa columnar, in concreto
Immagina una tabella con cinque colonne e un milione di righe. Ci sono due modi di salvarla su disco.
Row-oriented salva tutti i valori della riga 1, poi tutti i valori della riga 2, poi della riga 3: il modo in cui lo fa il CSV, il modo in cui la maggior parte dei database transazionali lo fa su disco. Se vuoi la terza colonna della riga 783, lo strato di storage deve scansionare le prime due colonne di ogni riga precedente, oppure usare un indice per saltare lì.
Columnar salva tutti i valori della colonna 1 in modo contiguo, poi tutti i valori della colonna 2 in modo contiguo, e così via. Se vuoi la colonna 3, leggi un blocco contiguo e ignori interamente le altre quattro.
Ora pensa a una query analitica: SELECT AVG(amount) FROM orders WHERE country = 'IT'. Tocchi due colonne su forse quaranta. Con un formato row-oriented, leggi l’intero file per estrarre il dato di due colonne: la maggior parte dell’I/O è sprecata. Con un formato columnar, leggi quelle due colonne e ignori le altre trentotto.
Questa è l’intera ragione per cui esistono i formati columnar, e la ragione per cui Parquet ha mangiato il mondo analitico. Le letture selettive prendono solo le colonne che chiedi. Un ingenuo SELECT col1, col2 FROM t su una tabella Parquet a 50 colonne è grossomodo 25 volte più economico della stessa query sugli stessi dati in CSV.
C’è un secondo beneficio che si compone col primo: i valori dentro una colonna tendono a somigliarsi tra loro. Una colonna country è per lo più stringhe di due lettere. Una colonna timestamp è una sequenza monotonicamente crescente di interi a 8 byte. Una colonna status ha forse quattro valori distinti su milioni di righe. Quando valori simili stanno vicini, gli algoritmi di compressione li divorano. Rapporti di compressione di 5-10x su dati reali sono normali.
La struttura su disco
Un file Parquet non è solo “tutta la colonna 1, poi tutta la colonna 2”. È organizzato in modo gerarchico:
File
├── Row Group 0 (~128 MB di dati sorgente come target)
│ ├── Column Chunk: id (tutti i valori id per le righe in questo gruppo)
│ │ ├── Page 0 (~1 MB compressa)
│ │ ├── Page 1
│ │ └── ...
│ ├── Column Chunk: country
│ ├── Column Chunk: amount
│ └── ...
├── Row Group 1
├── Row Group 2
└── Footer
├── Schema
├── Metadati dei row group (min, max, conteggio null per column chunk)
└── Metadati key/value
Tre cose meritano attenzione:
- Row group. Un file Parquet è diviso in row group, ciascuno contenente un range contiguo di righe. La dimensione di default sta tra 128 MB e 1 GB a seconda del writer, e il punto dolce pratico è 100-500 MB per row group. I row group sono l’unità di parallelismo: un task Spark tipicamente legge un row group.
- Column chunk. Dentro un row group, ogni colonna ha il suo chunk. Qui avviene la magia columnar a livello di file: puoi posizionarti direttamente sul column chunk che vuoi e leggere solo i suoi byte.
- Il footer. Alla fine del file, Parquet salva un blocco di metadati: schema, offset dei row group e, fondamentale, statistiche per ogni column chunk: valore minimo, valore massimo, conteggio dei null. Spark legge prima il footer, guarda quelle statistiche e le usa per decidere quali row group può saltare senza nemmeno leggerli.
Quest’ultimo punto è la base del predicate pushdown.
Predicate pushdown
Quando scrivi df.filter(F.col("amount") > 1000) contro una sorgente Parquet, Spark non deve leggere il file e poi filtrare. Spinge il predicato giù fino allo strato di scan. In fase di lettura, per ogni row group, controlla le statistiche del footer: se max(amount) <= 1000, l’intero row group può essere saltato. Niente I/O per quel range, niente decompressione, nessuna riga materializzata.
Tutto ciò è invisibile dal tuo codice. Scrivi un normale .filter(). Spark e Parquet collaborano per saltare dati che non ti servivano. Puoi confermare che stia accadendo leggendo .explain():
df = spark.read.parquet("s3://lake/orders/")
df.filter(F.col("amount") > 1000).select("order_id", "amount").explain()
# == Physical Plan ==
# *(1) Project [order_id#3, amount#5]
# +- *(1) Filter (isnotnull(amount#5) AND (amount#5 > 1000))
# +- *(1) ColumnarToRow
# +- FileScan parquet [order_id#3,amount#5]
# PushedFilters: [IsNotNull(amount), GreaterThan(amount,1000)],
# ReadSchema: struct<order_id:bigint,amount:double>
La lista PushedFilters è la prova. I filtri che finiscono in quella lista sono valutati a livello di file/row-group. I filtri che non compaiono lì sono valutati in Spark dopo lo scan: ancora corretti, solo meno efficienti.
Cosa scende bene: uguaglianza, comparazione, IsNull/IsNotNull, In con un set piccolo. Cosa no: pattern LIKE con wildcard iniziali, chiamate di funzione come upper(country) = 'IT' (riscrivi come country = 'IT' se i dati sono già in maiuscolo), aritmetica sulla colonna filtrata (amount + tax > 1000 non scende; amount > 1000 - tax potrebbe). Guarda il piano: se il tuo filtro non è in PushedFilters, riformulalo finché non lo è.
ReadSchema è il fratello della column projection. Spark legge solo order_id e amount perché sono le uniche colonne che la query referenzia, anche se la tabella ne ha altre venti.
Codec di compressione
Parquet comprime ogni column chunk in modo indipendente. Il codec è configurabile. I quattro che incontrerai:
- Snappy — il default di Spark. Compressione veloce, decompressione veloce, rapporto decente (~2-3x). La scelta giusta per pipeline calde dove la CPU in lettura conta più dello spazio su disco.
- Gzip — più lento ma più piccolo (~3-4x). Vecchio, supportato ovunque. Buono per archivi freddi quando la frequenza di lettura è bassa.
- Zstd (zstandard) — il vincitore moderno. Più veloce di gzip, più piccolo di gzip, spesso comparabile a snappy in velocità di lettura. Se scegli oggi, è probabilmente questa la risposta. Spark lo supporta nativamente dalla 3.2.
- Lz4 — molto veloce, rapporto modesto. Di nicchia; raramente la scelta migliore sia per percorsi caldi sia freddi.
Imposti il codec per scrittura singola o globalmente:
# Per scrittura
df.write.option("compression", "zstd").parquet("s3://lake/orders/")
# Oppure globalmente per la session
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
df.write.parquet("s3://lake/orders/")
Una regola pratica: se i tuoi dati sono letti spesso (dashboard, job ricorrenti), usa snappy o zstd: la CPU in lettura domina. Se i dati sono letti una volta a trimestre per compliance, usa gzip o zstd a un livello più alto: il costo del disco domina. Non scegliere gzip per percorsi caldi solo perché è più piccolo: lo sentirai a ogni query.
Il vectorized reader
Abbiamo visto Tungsten nella lezione 42. Parquet è uno dei posti in cui Tungsten paga visibilmente. Il reader Parquet di Spark è vectorized: invece di materializzare un oggetto Row alla volta, decodifica i column chunk in batch di valori salvati in array off-heap piatti, e gli operatori a valle (filtri, proiezioni, aggregazioni) lavorano direttamente sui batch. Il nodo ColumnarToRow nel piano qui sopra è il confine in cui Spark riconverte finalmente i batch in righe per gli operatori non vettorializzati.
Non lo configuri tu; è attivo di default per i tipi primitivi. L’impostazione rilevante se ti serve disabilitarlo per debugging è spark.sql.parquet.enableVectorizedReader, di default true. Lascialo stare in produzione.
Lettura e scrittura in pratica
La lettura è il caso semplice:
# Tabella intera
df = spark.read.parquet("s3://lake/orders/")
# Column projection: legge solo le due colonne da ogni row group
df = spark.read.parquet("s3://lake/orders/").select("order_id", "amount")
# Con filtro: predicate pushdown più skip dei row group
df = (spark.read.parquet("s3://lake/orders/")
.filter(F.col("country") == "IT")
.filter(F.col("amount") > 1000))
La scrittura ha più manopole:
(df.write
.mode("overwrite")
.option("compression", "zstd")
.parquet("s3://lake/orders/"))
Best practice per il lato scrittura:
- Partiziona su una colonna a bassa cardinalità che corrisponda ai tuoi pattern di query (lezione 34).
partitionBy("year", "month")è l’esempio canonico. Il partitioning non è la stessa cosa dei row group: è la struttura a directory sopra il file. Una tabella partizionata può comunque avere predicate pushdown dentro un file tramite le statistiche dei row group. - Punta a 100-500 MB per file. Più piccolo e paghi overhead di metadati e costo di listing. Più grande e perdi parallelismo in lettura perché un task gestisce un (grande) row group. Il passo
coalesce/repartitionprima della scrittura controlla questo. - Ordina dentro le partition se hai una colonna di filtro prevedibile. Dati ordinati significano range min/max più stretti per row group, il che significa skip più aggressivi.
df.sortWithinPartitions("event_time").write...è un’assicurazione a basso costo. - Non scrivere file minuscoli. Un antipattern comune è scrivere 1.000 directory di partition con 200 file ognuna, tutti da 2 MB. Spark passa più tempo a listare i file che a leggerli. Coalesce o compatta.
Schema-on-read, ma con i tipi
Il CSV è schema-on-read in senso pigro: il file non ha idea di che tipi siano le sue colonne, e tu (o l’inferenza di Spark) tiri a indovinare in lettura. Parquet è schema-on-read in senso migliore: il file dichiara il proprio schema nel footer, i tipi sono enforced in scrittura, e le letture producono sempre gli stessi tipi a prescindere da chi legge. Hai la flessibilità di non dover definire una tabella in anticipo e la sicurezza di colonne tipizzate.
Anche il sistema di tipi è genuinamente ricco. Parquet ha struct annidati, campi ripetuti (array), map, decimali con precisione/scala, timestamp con metadati logici che distinguono istante-nel-tempo da datetime locale. Puoi salvare uno struct di array di struct e Spark lo riproduce pulito. Questo conta quando i tuoi dati hanno forma a evento: oggetti annidati simili a JSON si comprimono e si proiettano splendidamente in Parquet.
Quando Parquet non è la risposta giusta
Parquet è il default, ma non è universale. Saltalo quando:
- Stai facendo streaming di eventi uno alla volta. Il layout columnar di Parquet implica che le scritture devono bufferizzare un row group prima di flushare. Per ingest append-only a bassa latenza (Kafka verso storage durabile), Avro row-oriented è una scelta migliore. Lo incontreremo nella prossima lezione.
- Ti servono update e delete atomici. I file Parquet sono immutabili. Un
UPDATEsu una tabella Parquet significa riscrivere file. Delta Lake (anch’esso prossima lezione) sta sopra Parquet per aggiungere semantica transazionale. - I dati sono già piccoli e strutturati per la lettura umana. Un file di config da 50 righe non ha bisogno di storage columnar. Usa JSON o YAML.
Fuori da questi casi, default a Parquet. Quasi ogni team con cui ho lavorato che ha provato a essere furbo (“usiamo CSV per ora, è più semplice”) ha sprecato settimane di compute sei mesi dopo, quando hanno migrato.
Prova questo
Scrivi lo stesso DataFrame in tre formati e confronta:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
spark = (SparkSession.builder
.appName("ParquetDemo")
.master("local[*]")
.getOrCreate())
df = spark.range(0, 1_000_000).select(
F.col("id").alias("order_id"),
(F.col("id") % 100).alias("country_id"),
(F.rand() * 1000).alias("amount"),
F.current_timestamp().alias("created_at"),
)
df.write.mode("overwrite").csv("/tmp/demo/orders_csv")
df.write.mode("overwrite").parquet("/tmp/demo/orders_parquet")
df.write.mode("overwrite").option("compression", "zstd").parquet("/tmp/demo/orders_zstd")
def size_mb(path):
total = 0
for root, _, files in os.walk(path):
for f in files:
total += os.path.getsize(os.path.join(root, f))
return total / (1024 * 1024)
print(f"CSV: {size_mb('/tmp/demo/orders_csv'):.1f} MB")
print(f"Parquet snappy: {size_mb('/tmp/demo/orders_parquet'):.1f} MB")
print(f"Parquet zstd: {size_mb('/tmp/demo/orders_zstd'):.1f} MB")
# Predicate pushdown: controlla il piano
(spark.read.parquet("/tmp/demo/orders_parquet")
.filter(F.col("amount") > 500)
.select("order_id", "amount")
.explain())
Vedrai il CSV intorno ai 30-40 MB, Parquet snappy intorno ai 6-8 MB, Parquet zstd un filo più piccolo. Il piano per la lettura filtrata mostrerà PushedFilters: [..., GreaterThan(amount, 500)] e un ReadSchema con solo le due colonne proiettate. È il file format che si guadagna lo stipendio.
Nella prossima lezione guardiamo le alternative: ORC, Avro e la famiglia Delta/Iceberg/Hudi che sta sopra Parquet per renderlo transazionale.
Riferimenti: documentazione di Apache Parquet (https://parquet.apache.org/docs/) e guida alle data source di Apache Spark SQL (https://spark.apache.org/docs/latest/sql-data-sources-parquet.html). Consultati il 2026-05-01.