PySpark, from the ground up · Lesson 11 / 60

Writing data: modes, partitions, and the file-count problem

Save modes, partitioned writes, the difference between many small files and one giant file, and why Parquet is the default for a reason.

A Spark job that doesn’t write anything is a Spark job that ran in a notebook and helped nobody. Today we make our work durable. Writing looks like the mirror of reading — df.write instead of spark.read — but it has its own pile of footguns. Save modes you’ll forget exist until they delete production. Partition layouts that turn a five-minute job into a five-hour one. The notorious “I have ten thousand 4KB files in S3 and now my read is slower than the write” problem.

If you read lesson 9 carefully, you’ve already seen the bones of df.write.parquet(...). We’re going to take the lid off properly.

The shape of a write

Writing offers the same two styles as reading: convenience methods and a builder.

# Convenience methods
df.write.csv("./out/orders.csv")
df.write.json("./out/orders.json")
df.write.parquet("./out/orders.parquet")

# Builder form
(df.write
   .format("parquet")
   .mode("overwrite")
   .option("compression", "snappy")
   .save("./out/orders.parquet"))

The two forms are equivalent. I default to the builder once a write has more than two options — it reads top-to-bottom and is easier to diff in code review.

Setup, in case you’re starting fresh:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("WritingData")
         .master("local[*]")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

orders = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("./data/orders.csv"))

The four save modes

.mode(...) decides what happens when the destination path already exists. There are exactly four modes (five names, since errorifexists is an alias of error), and you should memorize them:

df.write.mode("error").parquet(path)             # default
df.write.mode("errorifexists").parquet(path)     # alias of "error"
df.write.mode("append").parquet(path)
df.write.mode("overwrite").parquet(path)
df.write.mode("ignore").parquet(path)

error (and its synonym errorifexists) is the default. If the path already exists, Spark refuses to write and raises an exception. This is the right default — it stops you from accidentally clobbering yesterday’s data. Lots of one-off scripts use error simply by not specifying a mode.

append writes new files alongside whatever’s already there. The existing files are untouched; new part-*.parquet files appear in the same folder. This is how you do daily-incremental writes:

todays_orders.write.mode("append").parquet("./data/orders_lake.parquet")

A subtle danger: append mode does not dedupe. If you append the same data twice, you have it twice. There’s no INSERT IGNORE, no upsert. Append is a literal append. (Delta Lake and Apache Iceberg solve this with MERGE INTO. Plain Parquet does not.)

overwrite deletes the existing path and writes fresh. This is destructive and you should treat it that way:

final.write.mode("overwrite").parquet("./data/orders_clean.parquet")

Overwrite is the right mode for “rebuild this dataset from scratch every time” pipelines — typical of dimension tables and aggregated marts. It’s the wrong mode for “add today’s batch to the lake” pipelines. Mix them up and you’ll either accumulate duplicates or wipe a year of history.

There’s a finer-grained variant called dynamic partition overwrite, which only overwrites the partitions present in your write (not the whole path). It’s a config flag rather than a mode:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

todays_orders.write \
    .mode("overwrite") \
    .partitionBy("OrderDate") \
    .parquet("./data/orders_lake.parquet")

With dynamic, only the OrderDate=2026-03-28/ folder gets replaced (assuming today’s batch holds only that date); the other dates stay. Without dynamic (the default static mode), the entire orders_lake.parquet/ folder is wiped. The number of teams that have learned this difference the hard way is large. Set dynamic globally in any pipeline that overwrites by partition, or per write with .option("partitionOverwriteMode", "dynamic"), which takes precedence over the session setting.
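The difference between the two overwrite modes fits in a few lines of plain Python. This is a hypothetical toy model, overwrite_partitions, in which a dataset is a dict from partition value (an OrderDate string) to its rows. It mirrors the behavior described above for an overwrite of a partitionBy path; it is not Spark’s code.

```python
def overwrite_partitions(existing, incoming, mode="static"):
    """Hypothetical toy: apply an overwrite of `incoming` onto `existing`.

    Both arguments map a partition value (e.g. an OrderDate) to a list of rows.
    """
    if mode == "static":
        # Static: the whole path is cleared, so only the new partitions remain.
        return {value: list(rows) for value, rows in incoming.items()}
    if mode == "dynamic":
        # Dynamic: untouched partitions survive; partitions present in the
        # new write are replaced wholesale (not merged row by row).
        result = {value: list(rows) for value, rows in existing.items()}
        result.update({value: list(rows) for value, rows in incoming.items()})
        return result
    raise ValueError(f"unknown partitionOverwriteMode: {mode}")


lake = {
    "2026-03-26": ["order 1"],
    "2026-03-27": ["order 2"],
    "2026-03-28": ["stale order"],
}
today = {"2026-03-28": ["order 3", "order 4"]}

print(sorted(overwrite_partitions(lake, today, "static")))
# ['2026-03-28']
print(overwrite_partitions(lake, today, "dynamic"))
# {'2026-03-26': ['order 1'], '2026-03-27': ['order 2'], '2026-03-28': ['order 3', 'order 4']}
```

The “wholesale” comment is the part that bites: under dynamic mode, a partition that appears in the write loses every old row it had, so a month-level folder overwritten with one day’s data keeps only that day.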

ignore is the strange one. If the path exists, do nothing — silently. No error, no overwrite, no append. Useful in idempotent setup scripts (“create this lookup table if it isn’t there yet”) and almost nothing else. I’ve shipped maybe two mode("ignore") calls in five years.
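To make the four modes concrete without a cluster, here is a toy model in plain Python. toy_write and toy_read are hypothetical helpers that treat a local folder of JSON “part” files as a dataset; they mirror the documented semantics of each mode, not how Spark implements them.

```python
import json
import os
import shutil
import tempfile

SAVE_MODES = {"error", "errorifexists", "append", "overwrite", "ignore"}


def toy_write(path, mode, rows):
    """Hypothetical toy: mimic Spark save-mode semantics on a local folder."""
    mode = mode.lower()
    if mode not in SAVE_MODES:
        raise ValueError(f"unknown save mode: {mode}")
    if os.path.exists(path):
        if mode in ("error", "errorifexists"):
            raise FileExistsError(f"path {path} already exists")
        if mode == "ignore":
            return  # silently do nothing
        if mode == "overwrite":
            shutil.rmtree(path)  # the whole existing path goes away
        # "append" falls through: existing part files stay untouched
    os.makedirs(path, exist_ok=True)
    n = sum(1 for name in os.listdir(path) if name.startswith("part-"))
    with open(os.path.join(path, f"part-{n:05d}.json"), "w") as fh:
        json.dump(rows, fh)


def toy_read(path):
    """Concatenate the rows of every part file, like reading the folder back."""
    rows = []
    for name in sorted(os.listdir(path)):
        if name.startswith("part-"):
            with open(os.path.join(path, name)) as fh:
                rows.extend(json.load(fh))
    return rows


lake = os.path.join(tempfile.mkdtemp(), "orders_lake")
toy_write(lake, "error", [1, 2])   # path is new, so even "error" writes
toy_write(lake, "append", [3])
toy_write(lake, "append", [3])     # same batch twice: no dedupe
print(toy_read(lake))              # [1, 2, 3, 3]
toy_write(lake, "ignore", [99])    # path exists: no-op
toy_write(lake, "overwrite", [7])
print(toy_read(lake))              # [7]
```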

Partitioned writes — the directory layout that makes reads fast

partitionBy(col1, col2, ...) writes a Hive-style directory structure where each partition value becomes a folder name:

orders.write \
    .mode("overwrite") \
    .partitionBy("Country") \
    .parquet("./data/orders_by_country.parquet")

On disk:

orders_by_country.parquet/
  _SUCCESS
  Country=NL/
    part-00000-...snappy.parquet
  Country=IT/
    part-00000-...snappy.parquet
  Country=DE/
    part-00000-...snappy.parquet
  Country=RO/
    part-00000-...snappy.parquet

The Country= segment isn’t decoration. It’s a parsable, queryable thing. When you read this layout back with a filter on Country, Spark only opens the matching folders:

italian_only = (spark.read
                .parquet("./data/orders_by_country.parquet")
                .filter("Country = 'IT'"))
italian_only.explain(True)

Look at the physical plan and you’ll see something like PartitionFilters: [isnotnull(Country#X), (Country#X = IT)], where #X stands for an internal column ID. Spark applied the filter to the file listing itself, so the DE, NL, and RO folders are never opened. This is called partition pruning, and on a multi-TB dataset it can be the difference between scanning 4TB and scanning 4GB, when the partition you want holds about a thousandth of the data.

You can partition on multiple columns:

orders.write \
    .mode("overwrite") \
    .partitionBy("Country", "OrderDate") \
    .parquet("./data/orders_by_c_d.parquet")

On disk:

orders_by_c_d.parquet/
  Country=IT/
    OrderDate=2026-02-15/
      part-00000-...snappy.parquet
    OrderDate=2026-03-22/
      part-00000-...snappy.parquet
  Country=NL/
    ...

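Pruning works on any partition column, not just the outermost one: a filter on Country skips the other countries’ subtrees, a filter on OrderDate alone skips the non-matching date folders under every country, and a filter on both narrows the scan to the matching leaf folders. The column order in partitionBy only sets how the folders nest on disk.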
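Partition pruning is, at heart, a decision made from path strings before any file is opened. A minimal sketch in plain Python, with hypothetical helpers partition_values and prune over a hypothetical file list in the two-level layout above, shows a filter on either column, or both, narrowing the list. It ignores the escaping of special characters in partition values and compares values as strings.

```python
def partition_values(relative_path):
    """Parse {column: value} from a Hive-style path like
    'Country=IT/OrderDate=2026-02-15/part-00000.snappy.parquet'."""
    values = {}
    for segment in relative_path.split("/")[:-1]:  # last segment is the file name
        if "=" in segment:
            column, value = segment.split("=", 1)
            values[column] = value
    return values


def prune(files, **wanted):
    """Keep files whose path-encoded values match every column=value filter."""
    return [
        f for f in files
        if all(partition_values(f).get(col) == val for col, val in wanted.items())
    ]


files = [
    "Country=IT/OrderDate=2026-02-15/part-00000.snappy.parquet",
    "Country=IT/OrderDate=2026-03-22/part-00000.snappy.parquet",
    "Country=NL/OrderDate=2026-02-15/part-00000.snappy.parquet",
    "Country=NL/OrderDate=2026-03-22/part-00000.snappy.parquet",
]

print(len(prune(files, Country="IT")))            # 2
print(len(prune(files, OrderDate="2026-03-22")))  # 2: the inner column prunes too
print(prune(files, Country="NL", OrderDate="2026-02-15"))
# ['Country=NL/OrderDate=2026-02-15/part-00000.snappy.parquet']
```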

The partition column is not stored inside the Parquet files; it’s encoded in the path. When Spark reads the dataset back, it reconstructs the column from the folder names and appends it to the end of the schema. Side effect: a null Country has no natural folder name, so Spark writes those rows under a special folder, Country=__HIVE_DEFAULT_PARTITION__, and reads them back as null. Mostly fine, occasionally surprising.
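A sketch of that path encoding in plain Python: hypothetical helpers that move the partition columns out of a row and into a folder path (null becomes __HIVE_DEFAULT_PARTITION__) and turn the folder name back into a value on read. It leaves out Spark’s character escaping and type inference, so values come back as strings.

```python
DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def split_row(row, partition_cols):
    """Return (folder path, data stored in the file) for one row."""
    folder = "/".join(
        f"{c}={DEFAULT_PARTITION if row[c] is None else row[c]}"
        for c in partition_cols
    )
    # Partition columns live only in the path, not inside the file.
    data = {k: v for k, v in row.items() if k not in partition_cols}
    return folder, data


def read_back(folder, data):
    """Rebuild the row: data columns first, partition columns appended."""
    row = dict(data)
    for segment in folder.split("/"):
        column, value = segment.split("=", 1)
        row[column] = None if value == DEFAULT_PARTITION else value
    return row


folder, data = split_row({"OrderId": 17, "Country": None, "Total": 42.5}, ["Country"])
print(folder)                   # Country=__HIVE_DEFAULT_PARTITION__
print(data)                     # {'OrderId': 17, 'Total': 42.5}
print(read_back(folder, data))  # {'OrderId': 17, 'Total': 42.5, 'Country': None}
```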

The cardinality trap

Partitioning is a knife, and the question is which way it cuts. The rule is:

Partition by columns with low to medium cardinality. Aim for partitions that are at least ~100MB each, ideally 256MB to 1GB.

Country has, what, 200 distinct values worldwide. Maybe 4 in your dataset. That’s a great partition column.

OrderDate is medium cardinality — one folder per day, so 365 folders per year. Also good for time-series data, and the de-facto standard pattern for data lakes.

OrderId would be catastrophic. One folder per order. Ten million orders means ten million folders, each with one Parquet file containing one row, and each file carrying Parquet metadata (the schema and footer) that’s larger than its data. Reading the dataset back is a metadata operation against ten million files, and your S3 bill cries. This is the small files problem in its purest form.

A real-world example: an analyst once partitioned a 50GB dataset by (Country, OrderDate, CustomerId). The result was 1.4 million tiny Parquet files. The original dataset would read in 30 seconds. The partitioned version took 40 minutes. We rewrote it partitioned by (Country, OrderDate) only, and it dropped back to 35 seconds. CustomerId belonged in the data, not the path.

Heuristic for partition column choice:

  • Cardinality: under a few thousand distinct values, total. Tens of thousands max.
  • Filter selectivity: people query by this column often. If nobody filters by it, partitioning by it is pure cost.
  • Even distribution: roughly equal data per partition. A column where 90% of rows have the same value gives you one giant partition and a thousand tiny ones — worst of both worlds.
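The cardinality and distribution checks can be scripted; filter selectivity can’t, because it depends on how people query. A minimal sketch in plain Python with a hypothetical partition_report; the thresholds are placeholders to tune, not Spark settings. On a real DataFrame, the same counts come from df.groupBy("Country").count().

```python
from collections import Counter


def partition_report(values, max_distinct=10_000, max_top_share=0.5):
    """Hypothetical check of cardinality and skew for a candidate partition column."""
    counts = Counter(values)
    if not counts:
        raise ValueError("no values to inspect")
    total = sum(counts.values())
    top_value, top_count = counts.most_common(1)[0]
    return {
        "distinct": len(counts),
        "top_value": top_value,
        "top_share": top_count / total,
        "too_many_values": len(counts) > max_distinct,
        "too_skewed": top_count / total > max_top_share,
    }


countries = ["NL"] * 900 + ["IT"] * 60 + ["DE"] * 40
print(partition_report(countries))
# {'distinct': 3, 'top_value': 'NL', 'top_share': 0.9, 'too_many_values': False, 'too_skewed': True}
print(partition_report(range(20_000))["too_many_values"])  # True: an OrderId-like column
```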

For most analytical datasets, the right answer is partitionBy("date_column") and stop. Anything finer needs a strong reason.

File count, file size, and .coalesce(1)

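Spark writes one file per write task (a task with no rows may write none), and each in-memory partition of the DataFrame becomes one write task. With partitionBy, a task writes a separate file for every partition folder it holds rows for, so the total can reach tasks × folders. In the simple unpartitioned case, 8 partitions in your DataFrame means 8 part-files in the output: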

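import os

orders.write.mode("overwrite").parquet("./data/multi_file.parquet")
print(orders.rdd.getNumPartitions())   # roughly the number of part-files to expect
# Skip the hidden .crc checksum files the local filesystem writes next to each file.
print(sorted(f for f in os.listdir("./data/multi_file.parquet") if not f.startswith(".")))
# ['_SUCCESS', 'part-00000-...snappy.parquet', 'part-00001-...snappy.parquet', ...]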
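When a dataset is suspiciously slow to read, counting part-files per partition folder is a quick first diagnostic. A sketch using only the standard library, with a hypothetical helper part_files_by_folder, for local or mounted paths (object stores need their own listing client); the demo builds a fake output folder to run against.

```python
import os
import tempfile


def part_files_by_folder(root):
    """Count part-files in every folder under `root`.

    Hidden '.part-...crc' checksum files and '_SUCCESS' are skipped because
    only names starting with 'part-' are counted.
    """
    counts = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        n = sum(1 for name in filenames if name.startswith("part-"))
        if n:
            counts[os.path.relpath(dirpath, root)] = n
    return counts


# Fake a small partitioned output to try it on.
root = tempfile.mkdtemp()
for folder, names in {
    "Country=IT": ["part-00000.snappy.parquet", "part-00001.snappy.parquet",
                   ".part-00000.snappy.parquet.crc"],
    "Country=NL": ["part-00002.snappy.parquet"],
}.items():
    os.makedirs(os.path.join(root, folder))
    for name in names:
        open(os.path.join(root, folder, name), "w").close()
open(os.path.join(root, "_SUCCESS"), "w").close()

print(sorted(part_files_by_folder(root).items()))
# [('Country=IT', 2), ('Country=NL', 1)]
```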

If you need a single output file — say you’re shipping one CSV to a partner and they refuse a folder — the temptation is coalesce(1):

orders.coalesce(1).write.mode("overwrite").csv("./data/single.csv")

coalesce(1) collapses the entire DataFrame to one partition before writing. The result is one part-file, still inside a folder named single.csv/. Simple, intuitive, and a trap.

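What actually happens: Spark forces all data through a single task. All of it. Because coalesce avoids a shuffle, the squeeze to one partition usually reaches back into the work that computes the data, so a pipeline that ran across 100 cores in parallel suddenly runs on 1 core, and that one task has to process every row within a single executor’s memory. On 100MB it’s fine. On 100GB it can crawl for hours or OOM. On 1TB it has been known to take down clusters. (repartition(1) adds a shuffle, so the upstream work stays parallel and only the final write runs as one task; the write itself is still single-threaded.)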

coalesce(1) is safe only when both of these hold:

  1. The output is genuinely small (under 1GB, comfortably).
  2. You really need one file (a CSV for a non-Spark consumer, a small daily report).
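Even then, coalesce(1) leaves a folder behind (single.csv/ holding one part-file, a _SUCCESS marker, and, on a local filesystem, .crc files). If the partner wants a bare file, one common follow-up is to move the lone part-file out with the standard library. A sketch with a hypothetical promote_single_part, for local paths only; the fake part-file name stands in for Spark’s generated one.

```python
import glob
import os
import shutil
import tempfile


def promote_single_part(output_dir, target_file, pattern="part-*.csv"):
    """Move the only part-file out of a Spark output folder, then delete the folder."""
    matches = glob.glob(os.path.join(output_dir, pattern))
    if len(matches) != 1:
        raise RuntimeError(
            f"expected exactly one {pattern} in {output_dir}, found {len(matches)}"
        )
    shutil.move(matches[0], target_file)
    shutil.rmtree(output_dir)  # removes _SUCCESS, .crc files, and the folder itself
    return target_file


# Fake the folder that coalesce(1).write.csv(...) leaves behind.
base = tempfile.mkdtemp()
out_dir = os.path.join(base, "single.csv")
os.makedirs(out_dir)
with open(os.path.join(out_dir, "part-00000-abc-c000.csv"), "w") as fh:
    fh.write("Country,total_revenue\nIT,100\n")
open(os.path.join(out_dir, "_SUCCESS"), "w").close()

final_path = promote_single_part(out_dir, os.path.join(base, "daily_report.csv"))
with open(final_path) as fh:
    print(fh.readline().strip())  # Country,total_revenue
```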

For everything else, accept the multi-file output. Most tools (Spark, Hive, Presto, DuckDB with a glob pattern, pandas with the pyarrow engine) read a folder of Parquet part-files directly.

If you have the opposite problem — too many small files — the right fix is repartition(N) before write, where N is “approximately, how many files do I want?”:

# Aim for ~256MB files. If your DataFrame is roughly 25GB, that's ~100 files.
big_df.repartition(100).write.mode("overwrite").parquet("./out/big.parquet")

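repartition does a full shuffle and gives you exactly N partitions. coalesce only reduces the partition count, without a shuffle, and can’t increase it. Rule of thumb: repartition(N) to grow, coalesce(N) to shrink, both with N chosen to land each file in the 100MB–1GB range. One catch with partitionBy: after a round-robin repartition(N), every task can hold rows for every folder, so you can end up with up to N files per folder. Repartitioning by the partition column instead, e.g. repartition("OrderDate"), sends each folder’s rows to a single task.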
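The arithmetic behind choosing N is a one-liner. A sketch with a hypothetical target_file_count; the input should be the expected output size on disk (for example, the size of last run’s output), since compressed Parquet is often much smaller than the data’s in-memory footprint.

```python
import math


def target_file_count(total_bytes, target_file_bytes=256 * 1024**2):
    """Number of output files so each lands near target_file_bytes (at least 1)."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


print(target_file_count(25 * 1024**3))  # 100: the 25GB example
print(target_file_count(40 * 1024**2))  # 1: small output, one file is fine
```

Feed the result to repartition(N) for an unpartitioned write.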

A complete write script

Pulling the patterns together — the kind of script that lands at the end of a real ETL job:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month

spark = (SparkSession.builder
         .appName("WriteShowcase")
         .master("local[*]")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

# Make dynamic partition overwrite the default — a kindness to your future self.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

orders = (spark.read
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("./data/orders.csv"))

# Add year/month for partition columns. Don't partition by day on a tiny dataset —
# but on a real lake, year/month/day is the standard layout.
enriched = (orders
            .withColumn("Year",  year(col("OrderDate")))
            .withColumn("Month", month(col("OrderDate"))))

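# 1. Backfill history: Parquet, partitioned, overwrite. With dynamic mode on,
#    only the Year/Month folders present in `history` are replaced.
history = enriched.filter("OrderDate < '2026-03-28'")
(history.write
        .mode("overwrite")
        .partitionBy("Year", "Month")
        .option("compression", "snappy")
        .parquet("./data/orders_lake.parquet"))

# 2. Append today's batch: same path, new files. Today's rows were left out of
#    step 1, so appending them doesn't duplicate anything.
todays = enriched.filter("OrderDate = '2026-03-28'")
(todays.write
       .mode("append")
       .partitionBy("Year", "Month")
       .parquet("./data/orders_lake.parquet"))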

# 3. Daily report — small CSV, single file, for a non-Spark consumer.
report = (orders.groupBy("Country").sum("Total")
                .withColumnRenamed("sum(Total)", "total_revenue"))
(report.coalesce(1)
       .write
       .mode("overwrite")
       .option("header", "true")
       .csv("./data/daily_report.csv"))

# 4. Read it back to prove it works.
spark.read.parquet("./data/orders_lake.parquet").show()

spark.stop()

Three writes, three different shapes — Parquet for internal storage, CSV for the consumer, append for the daily slice. Every real pipeline ends in some combination of these.

Cloud storage, briefly

Everything here works the same on S3, ADLS, or GCS — change ./data/... to s3a://bucket/path/... (or abfss://, or gs://) and the code is identical. The only practical differences are:

  • Cloud writes are slower than local, especially for many small files (each file is a network round-trip).
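  • S3 in particular has no real rename (a rename is a copy plus a delete), and Spark’s default file output committer makes output visible by renaming it. On S3 that makes commits slow and not atomic; the usual fix is an S3-aware committer, such as the Hadoop S3A committers or your platform’s equivalent. S3 itself has been strongly consistent for reads and listings since late 2020, which removed a whole class of “file listed but missing” surprises, but it did not make renames atomic.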
  • Lifecycle policies, bucket versioning, and IAM are entirely out of scope for the Spark job — they live one layer below.

If you’re doing serious work in the cloud, the usual upgrade is to a transactional table format (Delta, Iceberg, Hudi) on top of cloud Parquet. Same writes you’ve just learned, plus ACID, plus MERGE INTO, plus time travel. Module 9 has its own lesson on Delta Lake.

We’ll come back to partitioning in depth in Module 6, where we cover bucketing, Z-ordering, and how to design a partition layout for a query workload you actually understand. For now, the headline is: choose a low-cardinality date or category column, aim for partitions in the 100MB–1GB range, and never coalesce(1) something that doesn’t fit on your laptop.

Next lesson is the last in this module: local mode versus a real cluster. What changes, what doesn’t, and the bugs that only show up when there are actual executors in the picture.
