PySpark, dalle fondamenta Lezione 47 / 60

Cloud storage: S3, GCS, Azure Blob, cosa cambia

Le note in piccolo sulla consistency, il problema del rename, e perche' esistono i committer direct-write.

Se hai usato Spark negli ultimi anni, le probabilità che i tuoi dati vivano davvero su HDFS sono molto basse. La maggior parte di noi legge e scrive path s3://..., gs://..., o abfs://.... Il cluster si alza, il cluster sparisce, i dati persistono nell’object storage. Questo è il modello.

È un gran modello. È anche un modello in cui Spark, originariamente progettato contro HDFS, fa leak di astrazione in modi interessanti. Gli object store non sono filesystem, anche quando le URL li fanno sembrare tali. Questa lezione parla di cosa cambia quando il tuo “filesystem” è in realtà un key-value store, e di cosa configurare perché Spark si comporti bene.

La differenza concettuale

Un filesystem come HDFS ha directory. Una directory è un’entità reale con metadati, operazioni atomiche e un albero parent-child. mv /old/path /new/path è un cambio di puntatore di metadati: istantaneo, atomico.

S3 non ha niente di tutto questo. S3 è un key-value store. Le chiavi sono stringhe piatte. Non c’è una directory /year=2024/month=03/; ci sono solo oggetti le cui chiavi capita inizino con year=2024/month=03/. La “directory” che vedi nella console AWS è un’affettazione UI costruita listando le chiavi con un prefisso comune.

Le conseguenze:

  • Niente rename atomico. aws s3 mv è copy più delete. Per un singolo piccolo oggetto è veloce. Per un oggetto da 50 GB è lento. Per una “directory” da 10000 oggetti sono 10000 coppie copy-delete separate.
  • Niente directory vuote. Una “directory” esiste se e solo se almeno una chiave la usa come prefisso. Spark a volte aggira la cosa scrivendo oggetti placeholder a zero byte.
  • Le operazioni di list sono paginate ed eventually-consistent in alcune operazioni. Listare un prefisso a cui si sono appena aggiunti 10000 oggetti restituisce i nuovi oggetti su S3 oggi (dal 2020), ma l’operazione è lenta e rate-limitata.

GCS e Azure Blob hanno modelli simili. API diverse, trade-off simili.

Il problema del rename

Ecco perché conta per Spark.

Quando Spark scrive un DataFrame su un path di partizione, ecco cosa fa il FileOutputCommitter standard (algoritmo v1):

  1. Ogni task scrive il suo output su un path temporaneo: s3://bucket/output/_temporary/0/_temporary/attempt_xxx/part-00000-...parquet.
  2. Quando il task si completa, il driver rinomina il path task-temp in path job-temp.
  3. Quando l’intero job si completa, il driver rinomina tutto dal path job-temp al path di output finale.

Su HDFS, quei rename sono cambi di puntatore. Microsecondi. L’intero step di commit è funzionalmente gratis.

Su S3, ogni “rename” è una copia ricorsiva seguita da delete di ogni file di parte. Per un job che produce 1000 file di parte da 100 MB ciascuno, il solo step di commit copia 100 GB attraverso S3, poi cancella gli originali. Ho visto job di produzione in cui il calcolo vero ci ha messo 10 minuti e il commit 40. Il job sembra bloccato al “99% complete” mentre il driver striscia attraverso operazioni di copy.

Fallisce anche in modo scomodo. Se il driver muore durante il rename, hai metà dei file nella loro location finale e metà nel path temporaneo. Non c’è rollback atomico perché non c’è rename atomico.

Questo è il problema del rename.

Committer direct-write

Il fix sono committer che non rinominano. Scrivono direttamente nella location finale e usano il protocollo multipart-upload di S3 per rimandare lo step “rendi visibile questo oggetto” al momento del commit. Il multipart upload è naturalmente a due fasi: carichi le parti, poi chiami CompleteMultipartUpload per far apparire l’oggetto assemblato. Se non chiami mai complete, niente appare.

Esistono diverse implementazioni:

  • S3A magic committer: quello open-source, viene con Hadoop 3.x. I task caricano le parti sulle chiavi S3 finali ma rimandano il completamento del multipart upload. Il driver li completa tutti al job commit. Molto veloce, niente rename.
  • EMRFS S3-optimized committer: la versione di AWS EMR, sostanzialmente equivalente.
  • Databricks DBIO committer: la versione proprietaria di Databricks. Stessa idea.
  • Hadoop FileOutputCommitter algoritmo v2: non S3-aware, ma un miglioramento parziale: salta il rename del secondo stadio promuovendo l’output del task alla location finale al momento del task-commit. Fa ancora un giro di rename, ancora più lento del magic committer, ma meglio di v1.

Se sei su EMR, Databricks o Dataproc, di solito la piattaforma te lo cabla. Se stai facendo girare Spark su Kubernetes vanilla o EC2 contro S3, devi configurarlo tu.

Configurare il magic committer di S3A

Hadoop 3 open-source con il magic committer:

spark = (SparkSession.builder
    .appName("S3App")
    # Usa il connettore S3A
    .config("spark.hadoop.fs.s3a.impl",
            "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Di' a Spark di usare il protocollo path-output-committer
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    # E usa il magic committer per i path s3a
    .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    # Necessario: il committer normale di Parquet non si compone col magic
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    # Raccomandato: abilita la modalita' di risoluzione conflitti
    .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
    .getOrCreate())

Sì, sono un sacco di opzioni. Fanno tutte qualcosa. Il pattern si ripete: di’ a Spark quale commit protocol usare, di’ a Hadoop quale committer factory usare per lo schema s3a, e patcha Parquet così che il suo committer custom non sovrascriva quello magico.

Una volta configurato, le tue scritture sembrano normali scritture Spark:

(df.write
   .mode("overwrite")
   .partitionBy("country")
   .parquet("s3a://my-bucket/datasets/orders/"))

Ma la fase di commit è adesso secondi invece di minuti. Le scritture sono anche atomiche a livello di oggetto: non vedi file scritti a metà nel bucket.

Ti servono anche i JAR giusti nel classpath: hadoop-aws, hadoop-cloud, e il aws-java-sdk-bundle corrispondente. Allinea la major version di Hadoop con la tua distribuzione di Spark; i mismatch ti danno eccezioni NoSuchMethodError sconcertanti al momento della scrittura.

Consistency: la vecchia storia è morta

Se leggi documentazione S3 da prima del dicembre 2020, vedrai warning sull’eventual consistency: “dopo aver scritto un oggetto, una list successiva potrebbe non vederlo; una read-after-overwrite potrebbe restituire la vecchia versione”. Spark doveva aggirare la cosa con strumenti come S3Guard (una cache di consistency basata su DynamoDB) e vari layer di retry.

È tutto sparito. AWS ha annunciato a dicembre 2020 che S3 è strongly read-after-write consistent per tutte le operazioni: PUT, GET, LIST, DELETE, tutto quanto. Se la tua scrittura ha successo, ogni lettura successiva da tutti i client vede la nuova versione. S3Guard è stato deprecato in Hadoop 3.3 e rimosso in 3.4.

Nel 2026, puoi tranquillamente ignorare i caveat di consistency in vecchi blog post e risposte di Stack Overflow. Se stai ancora configurando S3Guard, smetti. Stai pagando una tabella DynamoDB di cui non hai bisogno.

GCS e Azure Blob offrono strong consistency da più tempo. Si applica la stessa semplificazione.

La storia dei nomi s3 / s3a / s3n

Vedrai tre schemi URL nel codice vecchio, e solo uno è attuale:

  • s3://: schema Hadoop originale, supportato da un filesystem block-based salvato su S3. Deprecato da tanto. AWS moderna usa questa forma URL in CLI/SDK per intendere “il protocollo standard per oggetti S3”, ma nelle configurazioni Hadoop/Spark storicamente significava qualcosa di diverso. Non usare s3:// in URI Spark contro Hadoop open-source.
  • s3n://: seconda generazione, backend S3 nativo. Rimosso in Hadoop 3.
  • s3a://: generazione attuale, viene con Hadoop 2.7+. Performance-tuned, supporta il magic committer, supporta i ruoli IAM, supporta server-side encryption. Questo è quello da usare.

L’eccezione: AWS EMR usa s3:// internamente per EMRFS, il connettore S3 di AWS. Quindi su EMR specificamente, s3:// funziona ed è il default. Su Databricks, s3a:// e un dbfs:/ Databricks-specifico per i path montati. Su Kubernetes-su-EC2 vanilla, sempre s3a://.

Se scrivi codice portabile, parametrizza lo schema:

input_path = os.environ.get("INPUT_PATH", "s3a://my-bucket/raw/")

Autenticazione: la best practice del ruolo IAM

Tre modi di autenticarsi. Dal peggiore al migliore:

Chiavi hardcoded nella config della SparkSession. No.

# NON FARLO
.config("spark.hadoop.fs.s3a.access.key", "AKIA...")
.config("spark.hadoop.fs.s3a.secret.key", "...")

Le chiavi finiscono nei log, nei process listing, negli screenshot. Anche se “le ruoti dopo”, le hai gia’ fatte trapelare.

Variabili d’ambiente. Meglio. Il connettore S3A legge AWS_ACCESS_KEY_ID e AWS_SECRET_ACCESS_KEY automaticamente tramite la default credential provider chain. Funziona per sviluppo locale contro account sandbox AWS. Comunque non farlo in produzione: le env var su compute condiviso sono visibili a chiunque possa fare introspezione sul processo.

Instance profile / ruoli IAM per service account. La risposta giusta.

  • Su EC2: attacca un instance profile IAM ai nodi executor. Il connettore S3A chiama il servizio di metadati dell’istanza EC2 per ottenere credenziali short-lived. Niente chiavi da nessuna parte.
  • Su EKS / Kubernetes: usa IRSA (IAM Roles for Service Accounts). Il pod ottiene un token proiettato; il connettore S3A lo scambia per credenziali STS. Niente chiavi.
  • Su EMR: il ruolo del cluster è automatico.

Per S3A nello specifico, la credential provider chain è:

.config("spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

Quella chain prova env var, system properties, il file di profilo AWS, e infine il servizio di metadati dell’istanza EC2, in ordine. Configura un ruolo IAM e la chain lo trova senza alcuna config esplicita di chiavi.

GCS usa workload identity su GKE o JSON key di service account (che hanno gli stessi caveat “non farle trapelare”). Azure Blob usa managed identity o token SAS. Stesso principio: preferisci il meccanismo di identità della piattaforma rispetto al materiale di chiave nel codice.

GCS e Azure: stessa storia, connettore diverso

Il panorama concettuale è identico. Classi connettore diverse, prefissi di config diversi.

GCS:

.config("spark.hadoop.fs.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

Le URI sono gs://bucket/path/. GCS supporta un committer direct-write simile (incluso nel connettore GCS dalla 2.x). Strong consistency dal lancio.

Azure Data Lake Storage Gen2 usa il driver ABFS:

.config("spark.hadoop.fs.abfs.impl",
        "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem")

Le URI sono abfs://container@account.dfs.core.windows.net/path/. ADLS Gen2 con namespace gerarchico abilitato è più vicino a un filesystem reale rispetto a S3: le directory sono first-class e il rename è atomico. Puoi usare il FileOutputCommitter standard senza pagare la tassa del rename.

Un esempio funzionante di lettura-e-scrittura su S3

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("S3Demo")
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.3.6,"
            "org.apache.hadoop:hadoop-cloud:3.3.6")
    # Filesystem
    .config("spark.hadoop.fs.s3a.impl",
            "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    # Magic committer
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    # Pool di connessioni modesto, retry
    .config("spark.hadoop.fs.s3a.connection.maximum", "100")
    .config("spark.hadoop.fs.s3a.attempts.maximum", "10")
    .getOrCreate())

# Lettura
events = spark.read.parquet("s3a://my-bucket/raw/events/year=2026/")

# Aggregazione
daily = (events
         .groupBy("country", "event_date")
         .count())

# Scrittura - partizionata, supportata dal magic committer
(daily.write
      .mode("overwrite")
      .partitionBy("event_date")
      .parquet("s3a://my-bucket/curated/daily_events/"))

Se vuoi verificare che il magic committer sia attivo, guarda il log del driver al momento della scrittura. Dovresti vedere righe come Using committer: MagicS3GuardCommitter. Se vedi FileOutputCommitter algorithm v1, sei tornato sul percorso lento; controlla il blocco di quattro config sopra per refusi.

Una breve lista dei NON fare

Non usare URI s3:// contro Spark/Hadoop open-source. Usa s3a://. Lo schema conta; alcune operazioni di listing dei bucket cadranno silenziosamente su percorsi inefficienti.

Non scrivere una tabella partizionata su S3 con mille piccoli file per partizione. Ogni file è un oggetto separato, ogni oggetto è una PUT separata, e S3 fa rate-limiting delle PUT a circa 3500/secondo per prefisso. Coalesce o repartition prima; mira a file di 128MB-1GB.

Non disabilitare le aspettative di strong consistency. Funzionano. Smettila di mettere chiamate Thread.sleep difensive “nel caso S3 non si sia allineata”.

Non incollare chiavi AWS nella config. Usa ruoli IAM. Se proprio devi usare chiavi per uno script di local-dev, usa il file di profilo AWS, mai stringhe hardcoded.

Non ignorare spark.hadoop.fs.s3a.connection.maximum. La pool size di default (intorno a 15) va bene per piccoli job. Workload reali con centinaia di core di executor hanno bisogno di 100+. Se vedi “Timeout waiting for connection from pool” nei log, questa è la manopola.

Prossima lezione, schema evolution: cosa succede quando il team a monte aggiunge una colonna, ne rinomina una, o cambia un tipo, e come i diversi formati di file gestiscono la cosa. mergeSchema di Parquet, la risoluzione writer-reader di Avro, e il modo Delta/Iceberg di versionare i cambiamenti di schema nel log della tabella.


Riferimenti: documentazione di Apache Spark, documentazione di Hadoop S3A (https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html), annuncio AWS sulla strong consistency di S3 (dicembre 2020). Recuperati il 2026-05-01.

Cerca