Dacă ai folosit Spark în ultimii ani, șansele ca datele tale să stea pe HDFS efectiv sunt foarte mici. Cei mai mulți dintre noi citim și scriem căi s3://..., gs://... sau abfs://.... Cluster-ul pornește, cluster-ul dispare, datele persistă în object storage. Acesta e modelul.
E un model excelent. E și un model în care Spark, proiectat inițial pentru HDFS, scapă abstracțiuni în moduri interesante. Object store-urile nu sunt filesystem-uri, chiar dacă URL-urile le fac să arate așa. Lecția asta e despre ce se schimbă când „filesystem-ul” tău este de fapt un key-value store, și ce să configurezi ca Spark să se comporte civilizat.
Diferența conceptuală
Un filesystem precum HDFS are directoare. Un director este o entitate reală cu metadate, operațiuni atomice și un arbore părinte-copil. mv /old/path /new/path este o schimbare de pointer de metadate, instantaneu, atomic.
S3 nu are nimic din astea. S3 este un key-value store. Cheile sunt string-uri plate. Nu există un director /year=2024/month=03/; există doar obiecte ale căror chei se întâmplă să înceapă cu year=2024/month=03/. „Directorul” pe care îl vezi în consola AWS este o ficțiune UI construită prin listarea cheilor cu un prefix comun.
Consecințele:
- Nicio renumire atomică.
aws s3 mvestecopyplusdelete. Pentru un singur obiect mic asta e rapid. Pentru un obiect de 50 GB e lent. Pentru un „director” cu 10000 de obiecte sunt 10000 de perechi separate copy-delete. - Niciun director gol. Un „director” există dacă și numai dacă cel puțin o cheie îl are ca prefix. Spark uneori rezolvă asta scriind obiecte placeholder de zero bytes.
- Operațiunile de listare sunt paginate și consistente eventual la unele operațiuni. Listarea unui prefix căruia tocmai i s-au adăugat 10000 de obiecte returnează obiectele noi pe S3 azi (din 2020), dar operațiunea este lentă și rate-limited.
GCS și Azure Blob au modele similare. API-uri diferite, compromisuri similare.
Problema renumirii
Iată de ce contează asta pentru Spark.
Când Spark scrie un DataFrame într-o cale de partiție, iată ce face FileOutputCommitter-ul standard (algoritmul v1):
- Fiecare task își scrie output-ul într-o cale temporară:
s3://bucket/output/_temporary/0/_temporary/attempt_xxx/part-00000-...parquet. - Când task-ul se finalizează, driverul redenumește calea task-temp în calea job-temp.
- Când întregul job se finalizează, driverul redenumește totul din calea job-temp în calea finală de output.
Pe HDFS, acele renumiri sunt schimbări de pointer. Microsecunde. Întregul pas de commit este funcțional gratuit.
Pe S3, fiecare „renumire” este un copy-then-delete recursiv al fiecărui fișier part. Pentru un job care produce 1000 de fișiere part de câte 100 MB, doar pasul de commit copiază 100 GB prin S3, apoi șterge originalele. Am văzut job-uri în producție unde calculul efectiv a durat 10 minute și commit-ul a durat 40. Job-ul pare blocat la „99% complete” în timp ce driverul se târăște prin operațiuni de copiere.
Eșuează și prost. Dacă driverul moare în timpul renumirii, ai jumătate din fișiere în locația lor finală și jumătate în calea temporară. Nu există rollback atomic pentru că nu există renumire atomică.
Aceasta este problema renumirii.
Direct-write committers
Reparația este committers care nu redenumesc. Scriu direct în locația finală și folosesc protocolul S3 multipart-upload pentru a amâna pasul „fă obiectul ăsta vizibil” până la momentul commit-ului. Multipart upload este natural în două faze: încarci părți, apoi apelezi CompleteMultipartUpload pentru a face obiectul asamblat să apară. Dacă nu apelezi niciodată complete, nu apare nimic.
Există mai multe implementări:
- S3A magic committer: cel open-source, livrat cu Hadoop 3.x. Task-urile încarcă părți la cheile S3 finale dar amână finalizarea multipart-upload. Driverul le finalizează pe toate la commit-ul job-ului. Foarte rapid, fără renumiri.
- EMRFS S3-optimized committer: versiunea AWS EMR, în mare parte echivalentă.
- Databricks DBIO committers: versiunea proprietară a Databricks. Aceeași idee.
- Hadoop
FileOutputCommitteralgoritm v2: nu este S3-aware, dar o îmbunătățire parțială: sare peste renumirea de a doua etapă promovând output-ul task-ului în locația finală la momentul commit-ului task-ului. Tot face o rundă de renumiri, tot mai lent decât magic committer-ul, dar mai bun decât v1.
Dacă ești pe EMR, Databricks sau Dataproc, platforma de obicei îți cablează asta. Dacă rulezi Spark pe Kubernetes simplu sau EC2 împotriva S3, trebuie să-l configurezi.
Configurarea S3A magic committer
Hadoop 3 open-source cu magic committer:
spark = (SparkSession.builder
.appName("S3App")
# Foloseste conectorul S3A
.config("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
# Spune-i Spark sa foloseasca protocolul path-output-committer
.config("spark.sql.sources.commitProtocolClass",
"org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
# Si foloseste magic committer pentru cai s3a
.config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
.config("spark.hadoop.fs.s3a.committer.name", "magic")
# Necesar: committer-ul normal Parquet nu se compune cu magic
.config("spark.sql.parquet.output.committer.class",
"org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
# Recomandat: activeaza modul de rezolvare a conflictelor
.config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
.getOrCreate())
Da, sunt multe opțiuni. Toate fac ceva. Pattern-ul se repetă: spune-i Spark ce protocol de commit să folosească, spune-i Hadoop ce committer factory să folosească pentru schema s3a și petice Parquet ca committer-ul lui personalizat să nu suprascrie pe cel magic.
Odată configurate, scrierile tale arată ca scrierile Spark normale:
(df.write
.mode("overwrite")
.partitionBy("country")
.parquet("s3a://my-bucket/datasets/orders/"))
Dar faza de commit este acum secunde în loc de minute. Scrierile sunt și atomice la nivel de obiect, nu vezi fișiere scrise pe jumătate în bucket.
Ai nevoie și de JAR-urile potrivite pe classpath: hadoop-aws, hadoop-cloud și aws-java-sdk-bundle corespunzător. Potrivește versiunea majoră Hadoop a distribuției tale Spark; nepotrivirile îți dau excepții derutante NoSuchMethodError la momentul scrierii.
Consistență: vechea poveste e moartă
Dacă citești documentație S3 dinainte de decembrie 2020, vei vedea avertismente despre consistență eventuală: „după scrierea unui obiect, o listare ulterioară s-ar putea să nu îl vadă; o citire-după-suprascriere ar putea returna versiunea veche.” Spark trebuia să rezolve asta cu unelte precum S3Guard (un cache de consistență suportat de DynamoDB) și diverse straturi de retry.
Toate astea au dispărut. AWS a anunțat în decembrie 2020 că S3 este puternic consistent read-after-write pentru toate operațiunile: PUT, GET, LIST, DELETE, totul. Dacă scrierea ta reușește, fiecare citire ulterioară din toți clienții vede versiunea nouă. S3Guard a fost depreciat în Hadoop 3.3 și eliminat în 3.4.
În 2026, poți ignora în siguranță avertismentele despre consistență din postările vechi de blog și răspunsurile Stack Overflow. Dacă încă configurezi S3Guard, oprește-te. Plătești pentru o tabelă DynamoDB de care nu ai nevoie.
GCS și Azure Blob oferă consistență puternică de mai mult timp. Aceeași simplificare se aplică.
Istoria denumirilor s3 / s3a / s3n
Vei vedea trei scheme de URL în cod vechi și doar una este actuală:
s3://: schema Hadoop originală, suportată de un filesystem block-based stocat pe S3. Depreciat de mult timp. AWS modern folosește această formă URL în CLI/SDK-uri pentru a însemna „protocolul S3 standard de obiecte”, dar în configurațiile Hadoop/Spark a însemnat istoric ceva diferit. Nu folosis3://în URI-uri Spark împotriva Hadoop open-source.s3n://: a doua generație, backend S3 nativ. Eliminat în Hadoop 3.s3a://: generația actuală, livrat cu Hadoop 2.7+. Optimizat pentru performanță, suportă magic committer, suportă rolurile IAM, suportă criptarea pe partea de server. Asta este cea de folosit.
Excepția: AWS EMR folosește s3:// intern pentru EMRFS, propriul conector S3 al AWS. Deci pe EMR în mod specific, s3:// funcționează și este implicit. Pe Databricks, s3a:// și un dbfs:/ specific Databricks pentru căi montate. Pe Kubernetes-on-EC2 simplu, întotdeauna s3a://.
Dacă scrii cod portabil, parametrizează schema:
input_path = os.environ.get("INPUT_PATH", "s3a://my-bucket/raw/")
Autentificare: cea mai bună practică cu rol IAM
Trei moduri de a te autentifica. De la cel mai prost la cel mai bun:
Chei hardcodate în configurația SparkSession. Nu.
# NU FACE ASTA
.config("spark.hadoop.fs.s3a.access.key", "AKIA...")
.config("spark.hadoop.fs.s3a.secret.key", "...")
Cheile ajung în log-uri, în listele de procese, în screenshot-uri. Chiar dacă „le rotești mai târziu”, le-ai scăpat.
Variabile de mediu. Mai bine. Conectorul S3A citește AWS_ACCESS_KEY_ID și AWS_SECRET_ACCESS_KEY automat prin lanțul implicit de furnizori de credențiale. Asta funcționează pentru dezvoltare locală împotriva conturilor sandbox AWS. Tot să nu o faci în producție: env vars pe compute partajat sunt vizibile oricui poate inspecta procesul.
Profile de instanță / roluri IAM pentru conturi de serviciu. Răspunsul corect.
- Pe EC2: atașează un profil de instanță IAM la nodurile executor. Conectorul S3A apelează serviciul de metadate al instanței EC2 ca să obțină credențiale de scurtă durată. Fără chei nicăieri.
- Pe EKS / Kubernetes: folosește IRSA (IAM Roles for Service Accounts). Pod-ul primește un token proiectat; conectorul S3A îl schimbă pentru credențiale STS. Fără chei.
- Pe EMR: rolul cluster-ului este automat.
Pentru S3A specific, lanțul de furnizori de credențiale este:
.config("spark.hadoop.fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
Acel lanț încearcă variabile de mediu, proprietăți de sistem, fișierul de profil AWS și în final serviciul de metadate al instanței EC2, în ordine. Configurează un rol IAM și lanțul îl găsește fără nicio configurație explicită de chei.
GCS folosește workload identity pe GKE sau chei JSON de cont de serviciu (care au aceleași avertismente „nu le scăpa”). Azure Blob folosește identități gestionate sau token-uri SAS. Același principiu: preferă mecanismul de identitate al platformei în detrimentul materialului de chei în cod.
GCS și Azure: aceeași poveste, alt conector
Peisajul conceptual este identic. Clase de conector diferite, prefixe de configurație diferite.
GCS:
.config("spark.hadoop.fs.gs.impl",
"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
.config("spark.hadoop.fs.AbstractFileSystem.gs.impl",
"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
URI-urile sunt gs://bucket/path/. GCS suportă un direct-write committer similar (integrat în conectorul GCS din 2.x). Consistență puternică de la lansare.
Azure Data Lake Storage Gen2 folosește driver-ul ABFS:
.config("spark.hadoop.fs.abfs.impl",
"org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem")
URI-urile sunt abfs://container@account.dfs.core.windows.net/path/. ADLS Gen2 cu hierarchical namespace activat este mai aproape de un filesystem real decât S3, directoarele sunt cetățeni de prima clasă și renumirea este atomică. Poți folosi FileOutputCommitter-ul standard fără să plătești taxa de renumire.
Un exemplu funcțional de citire-și-scriere S3
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("S3Demo")
.config("spark.jars.packages",
"org.apache.hadoop:hadoop-aws:3.3.6,"
"org.apache.hadoop:hadoop-cloud:3.3.6")
# Filesystem
.config("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
# Magic committer
.config("spark.sql.sources.commitProtocolClass",
"org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
.config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
.config("spark.hadoop.fs.s3a.committer.name", "magic")
.config("spark.sql.parquet.output.committer.class",
"org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
# Pool de conexiuni modest, retry-uri
.config("spark.hadoop.fs.s3a.connection.maximum", "100")
.config("spark.hadoop.fs.s3a.attempts.maximum", "10")
.getOrCreate())
# Citire
events = spark.read.parquet("s3a://my-bucket/raw/events/year=2026/")
# Agregare
daily = (events
.groupBy("country", "event_date")
.count())
# Scriere, partitionata, sustinuta de magic committer
(daily.write
.mode("overwrite")
.partitionBy("event_date")
.parquet("s3a://my-bucket/curated/daily_events/"))
Dacă vrei să verifici că magic committer-ul este activ, uită-te în log-ul driverului la scriere. Ar trebui să vezi linii precum Using committer: MagicS3GuardCommitter. Dacă vezi FileOutputCommitter algorithm v1, ești înapoi pe calea lentă; verifică blocul de patru config-uri de mai sus pentru greșeli de tipar.
O scurtă listă NU FACE
Nu folosi URI-uri s3:// împotriva Spark/Hadoop open-source. Folosește s3a://. Schema contează; unele operațiuni de listare a bucket-ului vor reveni în liniște la căi ineficiente.
Nu scrie o tabelă partiționată în S3 cu o mie de fișiere mici per partiție. Fiecare fișier este un obiect separat, fiecare obiect este un PUT separat, iar S3 limitează rate-ul PUT la aproximativ 3500/secundă pe prefix. Coalesce sau repartiționează mai întâi; vizează fișiere de 128MB-1GB.
Nu dezactiva așteptările de consistență puternică. Funcționează. Oprește-te din pus apeluri defensive Thread.sleep „în caz că S3 nu a prins din urmă.”
Nu lipi chei AWS în config. Folosește roluri IAM. Dacă chiar trebuie să folosești chei pentru un script de dev local, folosește fișierul de profil AWS, niciodată string-uri hardcodate.
Nu ignora spark.hadoop.fs.s3a.connection.maximum. Dimensiunea implicită a pool-ului (în jur de 15) e bună pentru job-uri mici. Sarcinile reale cu sute de core-uri executor au nevoie de 100+. Dacă vezi „Timeout waiting for connection from pool” în log-uri, ăsta e butonul.
Lecția următoare, schema evolution: ce se întâmplă când echipa upstream adaugă o coloană, redenumește una sau schimbă un tip, și cum gestionează diferitele formate de fișier asta. mergeSchema din Parquet, rezoluția writer-reader din Avro și modul Delta/Iceberg de a versiona schimbările de schemă în log-ul tabelei.
Referințe: documentația Apache Spark, documentația Hadoop S3A (https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html), anunțul AWS S3 strong consistency (decembrie 2020). Consultat 2026-05-01.