Un job Spark che non scrive niente è un job Spark che ha girato in un notebook e non ha aiutato nessuno. Oggi rendiamo durevole il nostro lavoro. Scrivere sembra lo specchio del leggere, df.write invece di spark.read, ma ha la sua propria pila di trappole. Save mode che ti dimenticherai esistano finché non cancellano la produzione. Layout di partizioni che trasformano un job da cinque minuti in uno da cinque ore. Il famigerato problema del “ho diecimila file da 4KB su S3 e ora la mia read è più lenta della write”.
Se hai letto la lezione 9 con attenzione hai già visto le ossa di df.write.parquet(...). Ora togliamo il coperchio per bene.
La forma di una write
Stesso doppio builder della read:
# Metodi di comodità
df.write.csv("./out/orders.csv")
df.write.json("./out/orders.json")
df.write.parquet("./out/orders.parquet")
# Forma builder
(df.write
.format("parquet")
.mode("overwrite")
.option("compression", "snappy")
.save("./out/orders.parquet"))
Le due forme sono equivalenti. Vado di default sul builder appena una write ha più di due opzioni: si legge dall’alto in basso ed è più facile da diff-are in code review.
Setup, nel caso parti da zero:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("WritingData")
.master("local[*]")
.getOrCreate())
spark.sparkContext.setLogLevel("WARN")
orders = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("./data/orders.csv"))
Le quattro save mode
.mode(...) decide cosa succede quando il path di destinazione esiste già. Ci sono esattamente quattro valori, e dovresti memorizzarli:
df.write.mode("error").parquet(path) # default
df.write.mode("errorifexists").parquet(path) # alias di "error"
df.write.mode("append").parquet(path)
df.write.mode("overwrite").parquet(path)
df.write.mode("ignore").parquet(path)
error (e il suo sinonimo errorifexists) è il default. Se il path esiste già, Spark si rifiuta di scrivere e solleva un’eccezione. È il default giusto: ti impedisce di sovrascrivere accidentalmente i dati di ieri. Tanti script una tantum usano error semplicemente non specificando una mode.
append scrive nuovi file accanto a qualunque cosa ci sia già. I file esistenti restano intatti; nuovi file part-*.parquet compaiono nella stessa cartella. È così che fai write incrementali giornaliere:
todays_orders.write.mode("append").parquet("./data/orders_lake.parquet")
Pericolo sottile: la modalità append non fa dedup. Se appendi gli stessi dati due volte, li hai due volte. Non c’è INSERT IGNORE, niente upsert. Append è un append letterale. (Delta Lake e Apache Iceberg risolvono con MERGE INTO. Parquet semplice no.)
overwrite cancella il path esistente e scrive da capo. È distruttiva e dovresti trattarla così:
final.write.mode("overwrite").parquet("./data/orders_clean.parquet")
Overwrite è la mode giusta per pipeline del tipo “ricostruisci questo dataset da zero ogni volta”: tipiche di dimension table e mart aggregati. È la mode sbagliata per pipeline del tipo “aggiungi il batch di oggi al lake”. Confondile e accumulerai duplicati oppure cancellerai un anno di storia.
C’è una variante a grana più fine chiamata dynamic partition overwrite, che sovrascrive solo le partizioni presenti nella tua write (non l’intero path). È un flag di config, non una mode:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
todays_orders.write \
.mode("overwrite") \
.partitionBy("OrderDate") \
.parquet("./data/orders_lake.parquet")
Con dynamic, viene rimpiazzata solo la cartella OrderDate=2026-03-28/; le altre date restano. Senza dynamic (la mode static di default), l’intera cartella orders_lake.parquet/ viene spazzata via. Il numero di team che hanno imparato questa differenza nel modo doloroso è grande. Imposta dynamic globalmente in qualunque pipeline che fa overwrite per partizione.
ignore è quella strana. Se il path esiste, non fa niente, in silenzio. Nessun errore, nessun overwrite, nessun append. Utile in script di setup idempotenti (“crea questa lookup table se non c’è ancora”) e quasi nient’altro. Avrò spedito forse due chiamate mode("ignore") in cinque anni.
Write partizionate: il layout di directory che rende veloci le read
partitionBy(col1, col2, ...) scrive una struttura di directory in stile Hive dove ogni valore di partizione diventa un nome di cartella:
orders.write \
.mode("overwrite") \
.partitionBy("Country") \
.parquet("./data/orders_by_country.parquet")
Su disco:
orders_by_country.parquet/
_SUCCESS
Country=NL/
part-00000-...snappy.parquet
Country=IT/
part-00000-...snappy.parquet
Country=DE/
part-00000-...snappy.parquet
Country=RO/
part-00000-...snappy.parquet
Il segmento Country= non è decorazione. È una cosa parsabile, interrogabile. Quando rileggi questo layout con un filtro su Country, Spark apre solo le cartelle che corrispondono:
italian_only = (spark.read
.parquet("./data/orders_by_country.parquet")
.filter("Country = 'IT'"))
italian_only.explain(True)
Guarda il piano fisico e vedrai PartitionFilters: [isnotnull(Country#X), (Country#X = IT)]: Spark ha spinto il filtro dentro il listing dei file stesso. Le cartelle DE, NL e RO non vengono mai aperte. Questo si chiama partition pruning, e su un dataset da diversi TB è la differenza tra scansionare 4TB e scansionare 4GB.
Puoi partizionare su più colonne:
orders.write \
.mode("overwrite") \
.partitionBy("Country", "OrderDate") \
.parquet("./data/orders_by_c_d.parquet")
orders_by_c_d.parquet/
Country=IT/
OrderDate=2026-02-15/
part-00000-...snappy.parquet
OrderDate=2026-03-22/
part-00000-...snappy.parquet
Country=NL/
...
Spark fa pruning da sinistra a destra: un filtro su Country salta gran parte dell’albero; un filtro su Country AND OrderDate ne salta ancora di più.
La colonna di partizione non viene memorizzata dentro i file Parquet: è codificata nel path. Quando Spark rilegge il dataset, ricostruisce la colonna dai nomi delle cartelle. Effetto collaterale: non puoi avere una riga in cui Country manca, perché non c’è una cartella per quello. I valori di partizione null prendono un nome di cartella speciale tipo Country=__HIVE_DEFAULT_PARTITION__. Per lo più va bene, occasionalmente sorprende.
La trappola della cardinalità
Il partitioning è un coltello, e la domanda è in che direzione taglia. La regola è:
Partiziona per colonne con cardinalità bassa o media. Punta a partizioni di almeno ~100MB ciascuna, idealmente da 256MB a 1GB.
Country ha, quanto, 200 valori distinti nel mondo. Forse 4 nel tuo dataset. È un’ottima colonna di partizione.
OrderDate è di cardinalità media: una cartella al giorno, quindi 365 cartelle all’anno. Anche buona per dati time-series, ed è il pattern de-facto standard per i data lake.
OrderId sarebbe catastrofico. Una cartella per ordine. Dieci milioni di ordini significano dieci milioni di cartelle, ognuna con un file Parquet contenente una riga, ogni file con un header Parquet più grande dei suoi dati. Rileggere il dataset è un’operazione di metadata contro dieci milioni di file, e la tua bolletta S3 piange. Questo è il problema dei file piccoli nella sua forma più pura.
Un esempio reale: un analista una volta ha partizionato un dataset da 50GB per (Country, OrderDate, CustomerId). Il risultato erano 1,4 milioni di file Parquet minuscoli. Il dataset originale si leggeva in 30 secondi. La versione partizionata ci metteva 40 minuti. L’abbiamo riscritta partizionata solo per (Country, OrderDate), ed è tornata a 35 secondi. CustomerId apparteneva ai dati, non al path.
Euristica per la scelta della colonna di partizione:
- Cardinalità: sotto qualche migliaio di valori distinti, in totale. Decine di migliaia al massimo.
- Selettività del filtro: la gente fa query su questa colonna spesso. Se nessuno filtra per essa, partizionarla è puro costo.
- Distribuzione uniforme: dati più o meno equivalenti per partizione. Una colonna in cui il 90% delle righe ha lo stesso valore ti dà una partizione gigante e mille minuscole: peggio di entrambi i mondi.
Per la maggior parte dei dataset analitici, la risposta giusta è partitionBy("date_column") e basta. Qualunque cosa più fine ha bisogno di una motivazione forte.
Numero file, dimensione file e .coalesce(1)
Anche dentro una singola partizione, Spark scrive un file per task di executor che era attivo quando la write è partita. Con 8 partizioni nel tuo DataFrame, otterrai 8 part-file in output:
orders.write.mode("overwrite").parquet("./data/multi_file.parquet")
import os
print(sorted(os.listdir("./data/multi_file.parquet")))
# ['_SUCCESS', 'part-00000-...snappy.parquet', 'part-00001-...snappy.parquet', ...]
Se ti serve un singolo file di output, mettiamo che stai spedendo un CSV a un partner e si rifiuta di accettare una cartella, la tentazione è coalesce(1):
orders.coalesce(1).write.mode("overwrite").csv("./data/single.csv")
coalesce(1) collassa l’intero DataFrame in una partizione prima di scrivere. Il risultato è un file. Semplice, intuitivo, e una trappola.
Cosa succede davvero: Spark forza tutti i dati attraverso un singolo task di executor. Tutti. Una pipeline che girava su 100 core in parallelo all’improvviso gira su 1 core, e quel core deve tenere ogni riga in memoria mentre scrive. Su 100MB va bene. Su 100GB va in OOM. Su 1TB è noto aver tirato giù cluster.
I due usi sicuri di coalesce(1):
- L’output è davvero piccolo (sotto 1GB, comodamente).
- Hai davvero bisogno di un file (un CSV per un consumer non-Spark, un piccolo report giornaliero).
Per tutto il resto, accetta l’output multi-file. La maggior parte dei tool, Spark, Hive, Presto, DuckDB, Pandas via glob, gestiscono nativamente cartelle Parquet multi-file.
Se hai il problema opposto, troppi file piccoli, la soluzione giusta è repartition(N) prima della write, dove N è “all’incirca, quanti file voglio?”:
# Punta a file di ~256MB. Se il tuo DataFrame è grosso ~25GB, sono ~100 file.
big_df.repartition(100).write.mode("overwrite").parquet("./out/big.parquet")
repartition fa uno shuffle completo e ti dà esattamente N partizioni. coalesce riduce solo il conteggio delle partizioni senza shuffle ma non può aumentare. Regola del pollice: repartition(N) per crescere, coalesce(N) per ridurre, entrambi con N scelto per far atterrare ogni file nel range 100MB-1GB.
Uno script completo di write
Mettendo insieme i pattern, il tipo di script che atterra alla fine di un vero job ETL:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month
spark = (SparkSession.builder
.appName("WriteShowcase")
.master("local[*]")
.getOrCreate())
spark.sparkContext.setLogLevel("WARN")
# Rendi dynamic partition overwrite il default: una gentilezza al te futuro.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
orders = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("./data/orders.csv"))
# Aggiungi anno/mese come colonne di partizione. Non partizionare per giorno su un dataset minuscolo,
# ma su un lake reale, anno/mese/giorno è il layout standard.
enriched = (orders
.withColumn("Year", year(col("OrderDate")))
.withColumn("Month", month(col("OrderDate"))))
# 1. Interno: Parquet, partizionato, overwrite (dinamico).
(enriched.write
.mode("overwrite")
.partitionBy("Year", "Month")
.option("compression", "snappy")
.parquet("./data/orders_lake.parquet"))
# 2. Append del batch di oggi: stesso path, file nuovi.
todays = enriched.filter("OrderDate = '2026-03-28'")
(todays.write
.mode("append")
.partitionBy("Year", "Month")
.parquet("./data/orders_lake.parquet"))
# 3. Report giornaliero: piccolo CSV, file singolo, per un consumer non-Spark.
report = (orders.groupBy("Country").sum("Total")
.withColumnRenamed("sum(Total)", "total_revenue"))
(report.coalesce(1)
.write
.mode("overwrite")
.option("header", "true")
.csv("./data/daily_report.csv"))
# 4. Rileggilo per dimostrare che funziona.
spark.read.parquet("./data/orders_lake.parquet").show()
spark.stop()
Tre write, tre forme diverse: Parquet per lo storage interno, CSV per il consumer, append per la slice giornaliera. Ogni pipeline reale finisce con qualche combinazione di queste.
Cloud storage, in breve
Tutto qui funziona allo stesso modo su S3, ADLS o GCS: cambia ./data/... in s3a://bucket/path/... (o abfss://, o gs://) e il codice è identico. Le uniche differenze pratiche sono:
- Le write su cloud sono più lente di quelle locali, specialmente per molti file piccoli (ogni file è un round-trip di rete).
- S3 in particolare ha semantica di rename debole, quindi i marker
_SUCCESSe le write atomiche richiedono cura extra. Spark lo gestisce internamente; i client S3 moderni sono ora strongly consistent quindi è meno un incendio di quanto fosse. - Lifecycle policy, versioning di bucket e IAM sono completamente fuori scope per il job Spark: vivono uno strato sotto.
Se stai facendo lavoro serio nel cloud, l’upgrade tipico è a un formato di tabella transazionale (Delta, Iceberg, Hudi) sopra Parquet su cloud. Stesse write che hai appena imparato, più ACID, più MERGE INTO, più time travel. Il modulo 9 ha la sua lezione su Delta Lake.
Torneremo sulla profondità del partitioning nel modulo 6, dove copriamo bucketing, Z-ordering e come progettare un layout di partizione per un workload di query che davvero capisci. Per ora, il titolo è: scegli una colonna data o categoria a bassa cardinalità, punta a partizioni nel range 100MB-1GB, e mai coalesce(1) qualcosa che non sta sul tuo portatile.
La prossima lezione è l’ultima del modulo: local mode contro un cluster vero. Cosa cambia, cosa no, e i bug che si presentano solo quando ci sono executor veri in scena.