PySpark, dalle fondamenta Lezione 9 / 60

Leggere dati: CSV, JSON, Parquet, e il tradeoff dello schema-on-read

Tre formati di file, tre comportamenti di default, e perché fare bene le letture all'inizio ti risparmia cento problemi dopo.

Un job Spark che non legge dati è un job Spark che non fa lavoro. Oggi mettiamo dati dentro Spark dai tre formati che incontrerai nel 95% dei job reali: CSV, JSON e Parquet. Ognuno ha default diversi, gotcha diverse, e un costo per riga molto diverso. Scegliere il formato giusto e le opzioni di lettura giuste è la differenza tra un job da 10 secondi e uno da 10 minuti.

Toccheremo anche il tradeoff dello schema-on-read: lasci che sia Spark a indovinare i tipi delle colonne, oppure glieli dici tu? La scelta sbagliata su un dataset da 200GB ti brucerà un’ora della tua mattinata prima che te ne accorga.

Setup

Stessa SparkSession dell’ultima lezione, assumiamo che spark sia già viva:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("ReadingData")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")
         .getOrCreate())

spark.sparkContext.setLogLevel("WARN")

Ci servono dei dati di esempio. Salva questo snippet ed eseguilo una volta per generare tre piccoli file:

import json
from pathlib import Path

data_dir = Path("./data")
data_dir.mkdir(exist_ok=True)

# CSV
(data_dir / "orders.csv").write_text(
    "OrderId,CustomerId,Total,Country,OrderDate\n"
    "1001,1,59.00,NL,2026-03-05\n"
    "1002,1,29.00,NL,2026-03-18\n"
    "1003,2,149.00,IT,2026-02-15\n"
    "1004,2,89.50,IT,2026-03-22\n"
    "1005,3,199.00,DE,2026-03-10\n"
    "1006,4,42.42,RO,2026-03-28\n"
)

# JSON Lines (un record per riga, il formato che Spark preferisce)
(data_dir / "customers.jsonl").write_text(
    '{"CustomerId": 1, "Name": "Anna",  "Country": "NL"}\n'
    '{"CustomerId": 2, "Name": "Marco", "Country": "IT"}\n'
    '{"CustomerId": 3, "Name": "Dieter","Country": "DE"}\n'
    '{"CustomerId": 4, "Name": "Ioana", "Country": "RO"}\n'
)

# JSON multi-line (un grande array, span di più righe per "record")
(data_dir / "customers_pretty.json").write_text(json.dumps([
    {"CustomerId": 1, "Name": "Anna",   "Country": "NL"},
    {"CustomerId": 2, "Name": "Marco",  "Country": "IT"},
    {"CustomerId": 3, "Name": "Dieter", "Country": "DE"},
    {"CustomerId": 4, "Name": "Ioana",  "Country": "RO"},
], indent=2))

Adesso abbiamo qualcosa da leggere.

Due modi di scrivere la stessa lettura

PySpark ti dà due forme di builder. Fanno esattamente la stessa cosa.

# Forma corta
df = spark.read.csv("./data/orders.csv", header=True)

# Builder
df = (spark.read
      .format("csv")
      .option("header", "true")
      .load("./data/orders.csv"))

Usa la forma corta per letture una tantum. Usa il builder quando hai molte opzioni o quando il formato è parametrizzato (ad esempio leggi da una config che dice csv oggi e parquet domani).

CSV: il formato che mente sull’essere semplice

df = spark.read.csv("./data/orders.csv")
df.show()
df.printSchema()

Eseguilo e guarda:

+----+----------+------+-------+----------+
| _c0|       _c1|   _c2|    _c3|       _c4|
+----+----------+------+-------+----------+
|  Id|CustomerId| Total|Country| OrderDate|
|1001|         1| 59.00|     NL|2026-03-05|
|1002|         1| 29.00|     NL|2026-03-18|
...

Due problemi già. La riga di intestazione è stata trattata come dato, e ogni colonna è string. I default CSV di Spark sono pessimisti: senza che tu dica diversamente, assume:

  • Nessuna riga di intestazione.
  • Ogni colonna è una stringa.
  • La virgola è il delimitatore.
  • Il doppio apice è il carattere di quoting.

Quasi sempre devi fare override almeno dei primi due:

df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("./data/orders.csv"))

df.show()
df.printSchema()
+-------+----------+-----+-------+----------+
|OrderId|CustomerId|Total|Country| OrderDate|
+-------+----------+-----+-------+----------+
|   1001|         1| 59.0|     NL|2026-03-05|
...

root
 |-- OrderId: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Total: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- OrderDate: date (nullable = true)

Meglio. Ma nota cosa fa davvero inferSchema=True: Spark legge l’intero file una volta per capire i tipi, poi lo legge di nuovo per caricare i dati. Su un CSV da 6 righe è gratis. Su un CSV da 200GB è una scansione da 200GB extra che non avevi bisogno di fare. Per qualcosa di più grande di “esplorazione ad-hoc”, è sbagliato.

La risposta giusta in produzione è dichiarare lo schema esplicitamente:

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType

orders_schema = StructType([
    StructField("OrderId",    IntegerType(), nullable=False),
    StructField("CustomerId", IntegerType(), nullable=False),
    StructField("Total",      DoubleType(),  nullable=False),
    StructField("Country",    StringType(),  nullable=False),
    StructField("OrderDate",  DateType(),    nullable=False),
])

df = (spark.read
      .option("header", "true")
      .schema(orders_schema)
      .csv("./data/orders.csv"))

df.show()
df.printSchema()

Un solo passaggio sul file. Tipi stretti. Gli errori di tipo falliscono in fretta (o vanno a null, a seconda del mode).

Opzioni CSV che userai davvero

Ci sono circa 30 opzioni CSV. La manciata che conta:

df = (spark.read
      .option("header", "true")            # La prima riga sono i nomi delle colonne
      .option("inferSchema", "false")      # Non indovinare, fornisci lo schema esplicitamente
      .option("delimiter", ",")            # O ";", "\t", "|"...
      .option("quote", '"')                # Carattere di quoting per i campi
      .option("escape", '"')               # Carattere di escape dentro campi quotati
      .option("nullValue", "")             # Quale stringa significa NULL
      .option("dateFormat", "yyyy-MM-dd")  # ISO-8601 di default
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
      .option("mode", "PERMISSIVE")        # PERMISSIVE | DROPMALFORMED | FAILFAST
      .schema(orders_schema)
      .csv("./data/orders.csv"))

Il mode è quello di cui nessuno parla e tutti hanno bisogno:

  • PERMISSIVE (default): le righe cattive vengono riempite di null. La riga malformata finisce in una colonna magica chiamata _corrupt_record se il tuo schema la include. Altrimenti silenziosamente persa.
  • DROPMALFORMED: le righe cattive vengono droppate silenziosamente. Non usarlo. Non saprai mai quante righe hai perso.
  • FAILFAST: le righe cattive sollevano un’eccezione immediatamente. Usa questo in produzione. I fallimenti rumorosi sono meglio della corruzione silenziosa.

JSON: due formati con la stessa estensione

Il reader JSON di Spark si aspetta di default JSON Lines: un oggetto JSON per riga, niente array che avvolge, niente virgole tra i record:

df = spark.read.json("./data/customers.jsonl")
df.show()
df.printSchema()
+-------+----------+------+
|Country|CustomerId|  Name|
+-------+----------+------+
|     NL|         1|  Anna|
|     IT|         2| Marco|
|     DE|         3|Dieter|
|     RO|         4| Ioana|
+-------+----------+------+

root
 |-- Country:    string (nullable = true)
 |-- CustomerId: long   (nullable = true)
 |-- Name:       string (nullable = true)

Nota che JSON ti dà i tipi gratis. Il formato codifica interi, float, booleani e stringhe in modo diverso, quindi Spark non deve indovinare. (Scansiona comunque il file una volta per scoprire lo schema, cioè quali campi esistono, ma i tipi non sono ambigui.)

L’altra forma di JSON che vedrai è un singolo grande array che si estende su più righe:

[
  {"CustomerId": 1, "Name": "Anna", ...},
  {"CustomerId": 2, "Name": "Marco", ...},
  ...
]

È quello che la maggior parte delle API restituisce. Il reader JSON di default di Spark lo manda in vacca: ogni riga viene parsata indipendentemente, e le righe con bracket / virgola falliscono il parsing. Per leggere questo formato, imposta multiLine=True:

df = (spark.read
      .option("multiLine", "true")
      .json("./data/customers_pretty.json"))

df.show()

Nota sulle performance: multiLine=True significa che Spark non può splittare il file tra executor: il parser JSON deve vedere l’intero file come uno stream. Su un JSON pretty-printed da 50GB, ottieni un task e un core CPU. Converti in JSON Lines (jq -c '.[]' input.json > output.jsonl) nel momento in cui il file diventa grande.

JSON gestisce anche i campi annidati naturalmente:

nested = spark.read.json(spark.sparkContext.parallelize([
    '{"order": 1001, "customer": {"id": 1, "name": "Anna"}}',
    '{"order": 1002, "customer": {"id": 2, "name": "Marco"}}',
]))

nested.printSchema()
nested.select("order", "customer.name").show()
root
 |-- customer: struct (nullable = true)
 |    |-- id:   long   (nullable = true)
 |    |-- name: string (nullable = true)
 |-- order:    long    (nullable = true)

+-----+-----+
|order| name|
+-----+-----+
| 1001| Anna|
| 1002|Marco|
+-----+-----+

La colonna customer arriva come struct. Affronteremo per bene la gestione di dati annidati nel Modulo 4.

Parquet: quello che a Spark piace davvero

Parquet è il formato per cui Spark è costruito. È un formato binario colonnare con schema integrato, compressione per colonna, e statistiche per row-group che permettono a Spark di saltare interi pezzi di un file durante il filtraggio.

Scriviamo il DataFrame degli ordini in Parquet, poi rileggiamolo:

df = (spark.read
      .option("header", "true")
      .schema(orders_schema)
      .csv("./data/orders.csv"))

df.write.mode("overwrite").parquet("./data/orders.parquet")

Questo scrive una cartella, non un singolo file. Dentro ./data/orders.parquet/ vedrai qualcosa tipo:

_SUCCESS
part-00000-abc123.snappy.parquet
part-00001-abc123.snappy.parquet
...

Un file per partizione, ognuno compresso individualmente (Snappy di default). Il marker _SUCCESS dice a Spark e ai tool Hadoop “questa scrittura è completata correttamente”. Se non vedi _SUCCESS, il writer è crashato a metà scrittura e dovresti trattare l’output come cattivo.

Rileggilo:

df2 = spark.read.parquet("./data/orders.parquet")
df2.show()
df2.printSchema()
+-------+----------+-----+-------+----------+
|OrderId|CustomerId|Total|Country| OrderDate|
+-------+----------+-----+-------+----------+
|   1001|         1| 59.0|     NL|2026-03-05|
...

Nessuna opzione necessaria. Nessuna inferenza dello schema. Stessi tipi del sorgente. Più veloce. Più piccolo. Più stretto. Per questo Parquet vince.

Perché Parquet è veloce: il column pruning

Un row-store come CSV memorizza 1001,1,59.00,NL,2026-03-05 come un singolo blocco. Per leggere solo la colonna Total, devi scansionare ogni byte di ogni riga.

Parquet memorizza tutti i valori OrderId insieme, poi tutti i CustomerId, poi tutti i Total. Per leggere solo Total, Spark legge solo quella colonna dal disco. Su una tabella larga (50+ colonne), è spesso 10-50 volte più veloce di CSV.

Esegui questo per sentire la differenza:

# Leggi Parquet, proietta una colonna
spark.read.parquet("./data/orders.parquet").select("Total").show()

# Leggi CSV, proietta una colonna: Spark deve comunque parsare ogni riga
(spark.read
 .option("header", "true")
 .schema(orders_schema)
 .csv("./data/orders.csv")
 .select("Total")
 .show())

Su 6 righe non vedi la differenza. Su 6 milioni di righe, la lettura Parquet finisce prima che la lettura CSV si scaldi.

Modalità di scrittura Parquet

df.write.mode("overwrite").parquet(path)   # Cancella e riscrive
df.write.mode("append").parquet(path)      # Aggiungi nuovi file accanto a quelli vecchi
df.write.mode("ignore").parquet(path)      # Se il path esiste, non fare nulla
df.write.mode("error").parquet(path)       # Default: solleva eccezione se il path esiste

error è il default ed è il default giusto: ti impedisce di sovrascrivere accidentalmente i dati. Negli script di produzione di solito uso overwrite solo con path partitioning per data, in modo che ogni giorno scriva nella propria cartella e “overwrite” tocchi solo lo slot di oggi.

Partitioning in scrittura

df.write.mode("overwrite").partitionBy("Country").parquet("./data/orders_part.parquet")

Questo scrive una sottodirectory per ogni valore di Country:

orders_part.parquet/
  Country=NL/
    part-00000-...snappy.parquet
  Country=IT/
    part-00000-...snappy.parquet
  Country=DE/
    part-00000-...snappy.parquet
  Country=RO/
    part-00000-...snappy.parquet

Adesso spark.read.parquet(...).filter("Country = 'IT'") legge solo la cartella Country=IT. Saltare i dati è la vincita di performance più economica che esista.

Ma, e questo conta, partiziona solo su colonne a bassa cardinalità. Partizionare per OrderId creerebbe un file per ordine. Partizionare per Country (4-50 valori) è ottimo. Partizionare per OrderDate (una cartella per giorno) è il pattern standard per dati time-series. Torneremo su questo nella lezione 35.

Il tradeoff dello schema-on-read

Tre posizioni che puoi prendere quando leggi dati:

1. Schema inferito (inferSchema=True per CSV; default per JSON).

  • Pro: zero cerimonia, Spark lo capisce.
  • Contro: passaggio extra sul file completo per CSV. Fragile: se il file di venerdì ha una stringa in una colonna numerica, il job di lunedì si rompe. Nessuna garanzia di tipo nel codice.
  • Quando usarlo: esplorazione ad-hoc. Notebook a singolo sviluppatore. File abbastanza piccoli che il passaggio extra è gratis.

2. Schema esplicito (.schema(StructType(...))).

  • Pro: un solo passaggio. Tipi stretti. Errori di tipo presi presto. Auto-documentante.
  • Contro: più codice. Va tenuto in sincrono con la sorgente se si aggiungono colonne.
  • Quando usarlo: produzione. Qualunque cosa giri su schedule. Qualunque cosa dove tipi sbagliati causerebbero problemi di data quality a valle.

3. Schema nel file (Parquet, ORC, Avro, Delta).

  • Pro: nessuno schema da mantenere. Tipi stretti. Lo schema evolve con i dati.
  • Contro: richiede di controllare anche il lato della scrittura: non puoi fare questo con CSV che arrivano da un partner.
  • Quando usarlo: qualunque dato che la tua pipeline produce. Passa da CSV a Parquet al primo hop interno, poi non toccare mai più CSV.

In un tipico data lake il pattern è: CSV (con schema esplicito) all’edge di ingestion, Parquet (o Delta) per tutto ciò che è interno. Spenderemo la lezione 13 sulla definizione dello schema in sé: ogni tipo PySpark, quando usare cosa, come gestire la nullability, lo schema merging in append.

Uno script completo di lettura e visualizzazione

Mettiamo tutto insieme:

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField,
                               IntegerType, DoubleType, StringType, DateType)

spark = (SparkSession.builder
         .appName("ReadingData")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

orders_schema = StructType([
    StructField("OrderId",    IntegerType(), False),
    StructField("CustomerId", IntegerType(), False),
    StructField("Total",      DoubleType(),  False),
    StructField("Country",    StringType(),  False),
    StructField("OrderDate",  DateType(),    False),
])

# CSV con schema esplicito, pattern di produzione
orders = (spark.read
          .option("header", "true")
          .option("mode", "FAILFAST")
          .schema(orders_schema)
          .csv("./data/orders.csv"))

# JSON Lines, i tipi arrivano gratis
customers = spark.read.json("./data/customers.jsonl")

# Scrivi in Parquet per i job a valle
orders.write.mode("overwrite").partitionBy("Country").parquet("./data/orders.parquet")

# Rileggi: veloce, schema integrato
parq = spark.read.parquet("./data/orders.parquet")

# Join veloce
joined = parq.join(customers, on="CustomerId", how="inner") \
             .select("OrderId", "Name", "Country", "Total", "OrderDate")
joined.show()

spark.stop()

Se questo gira da capo a fondo e stampa una tabella unita di 6 righe con clienti italiani, olandesi, tedeschi e rumeni, hai la pipeline di lettura funzionante. Tutto il resto in PySpark si costruisce su questo.

Prossimo modulo: lavorare davvero con i DataFrame. Selezionare colonne, filtrare righe, la differenza tra select e selectExpr, e perché .show() ti mente su cosa sta facendo davvero il tuo job.

Cerca