PySpark, from the ground up: Lesson 25 / 60

What a shuffle actually is, in physical terms

The network operation at the heart of distributed computing, what happens during one, and why everyone fears it.

We’ve been throwing around the word “shuffle” for a few lessons now without ever opening it up. Today we open it up. By the end of this post you should be able to picture, almost frame by frame, what happens between executors when Spark performs a groupBy. That mental picture is what separates engineers who can tune a Spark job from engineers who paste the same .repartition(200) call into every notebook and hope.

The high concept first: a shuffle is the redistribution of data across the cluster so that records with the same key end up in the same partition. Anything that depends on “all the rows for key X in one place” — group, join, distinct, sort — requires one. It is the most expensive thing Spark routinely does. Modules 5 and 6 of this course are essentially a long, patient look at how to do fewer of them, and how to make the ones you can’t avoid as cheap as possible.

What triggers a shuffle

Memorize this list. Every time you write code that includes one of these, picture data flying across the network:

  • groupBy(...).agg(...) — group by key; same-key rows must co-locate.
  • df.distinct() — needs to deduplicate across all partitions.
  • df.dropDuplicates([...]) — same.
  • df.join(other, on=...) — both sides need to be partitioned by the join key (unless one side is broadcast — see lesson 28).
  • df.orderBy(...) / df.sort(...) — global ordering needs a range-partitioned shuffle.
  • df.repartition(...) — by definition, that’s its job.
  • df.repartitionByRange(...) — same, with a range partitioner.

Operations that do not trigger a shuffle (these are narrow):

  • select, filter, withColumn, drop, withColumnRenamed, na.fill, and column-level expressions such as cast.
  • union (concatenates partitions; no redistribution).
  • coalesce(n) when reducing partition count (merges locally, no shuffle — different from repartition).

coalesce vs repartition is its own lesson (35), but the one-liner: coalesce is narrow and cheap; repartition is wide and a real shuffle.

The physical sequence

Let’s walk through what actually happens for a groupBy("status").sum("total") over a 100GB dataset on a 10-executor cluster, each executor with 4 cores. Suppose spark.sql.shuffle.partitions = 200.

Stage N: the map side

Every input partition has a task running on it. Suppose 200 input partitions, so 200 map tasks. Each task:

  1. Reads its input partition. From wherever — S3, HDFS, local disk, an upstream cached DataFrame.
  2. Applies the partial aggregation. Spark’s optimizer pre-aggregates per partition, so instead of writing every row out, the map task writes one partial sum per status value it saw. (For a groupBy with a few distinct keys, this is a massive reduction. For distinct or a high-cardinality group, less so.)
  3. Partitions the output by destination. For each output record, compute hash(status) mod 200 to figure out which downstream partition it belongs to. Buffer the records in 200 destination buckets.
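  4. Writes the buckets to local disk. Spark has used a sort-based shuffle exclusively since version 2.0. Each map task writes one data file holding 200 contiguous sections, one per destination, plus a small index file recording where each section starts. For DataFrame and SQL jobs, rows are written in Spark's compact binary row format; the configured Kryo or Java serializer applies to RDD shuffles. The output is compressed by default (spark.shuffle.compress = true, LZ4 unless spark.io.compression.codec says otherwise).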
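The four map-side steps fit in a few lines of plain Python. This is a toy, single-process model, not Spark's implementation. A "partition" is a list of `(status, total)` tuples. `zlib.crc32` stands in for Spark's Murmur3-based hash; it is chosen because, unlike Python's built-in `hash()` for strings, it is stable across processes. `pickle` plus `zlib` stand in for Spark's row serializer and LZ4 codec. `NUM_REDUCERS` plays the role of `spark.sql.shuffle.partitions`.

```python
import pickle
import zlib
from collections import defaultdict

NUM_REDUCERS = 4  # stands in for spark.sql.shuffle.partitions (200 in the text)

def stable_hash(key: str) -> int:
    # Deterministic across processes, unlike Python's built-in hash() for str
    return zlib.crc32(key.encode("utf-8"))

def map_task(rows):
    """One map task: partial aggregate, bucket by destination, serialize + compress."""
    # Step 2: partial aggregation, one running sum per status seen in this partition
    partial = defaultdict(float)
    for status, total in rows:
        partial[status] += total
    # Step 3: route each partial sum to bucket hash(status) mod NUM_REDUCERS
    buckets = [[] for _ in range(NUM_REDUCERS)]
    for status, subtotal in partial.items():
        buckets[stable_hash(status) % NUM_REDUCERS].append((status, subtotal))
    # Step 4: serialize and compress each bucket (Spark writes these as
    # contiguous sections of one local data file per map task)
    return [zlib.compress(pickle.dumps(bucket)) for bucket in buckets]

partition = [("paid", 10.0), ("refunded", 2.5), ("paid", 7.5), ("pending", 1.0)] * 1000
blocks = map_task(partition)
partial_rows = sum(len(pickle.loads(zlib.decompress(b))) for b in blocks)
print(len(partition), "input rows ->", partial_rows, "partial rows")
```

Partial aggregation is what makes a low-cardinality groupBy cheap: 4,000 input rows leave this toy map task as one row per distinct status.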

At this point all 200 map tasks have finished. There are now 200 × 200 = 40,000 logical “blocks” sitting on the local disks of the 10 executors. Each block is the data destined for one downstream partition from one upstream partition.
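Here is the arithmetic behind those numbers, plus two more that follow from the setup. The sketch assumes decimal units (1 GB = 10^9 bytes) and data spread evenly. With 10 executors × 4 cores, only 40 tasks can run at once, so the 200 map tasks run in waves. In the worst case (no partial aggregation), each block averages 100 GB / 40,000.

```python
import math

executors, cores_per_executor = 10, 4
map_tasks = 200                # input partitions, as assumed in the text
reduce_partitions = 200        # spark.sql.shuffle.partitions
dataset_bytes = 100 * 10**9    # 100 GB, decimal units (assumption)

slots = executors * cores_per_executor        # tasks that can run at the same time
map_waves = math.ceil(map_tasks / slots)      # waves needed to finish the map stage
blocks = map_tasks * reduce_partitions        # one block per (map task, reducer) pair
avg_block = dataset_bytes / blocks            # worst case: no partial aggregation

print(slots, map_waves, blocks, f"{avg_block / 10**6:.1f} MB per block")
```

Many blocks of a few MB each is one reason shuffle fetches are dominated by request overhead and seek time, not just raw bandwidth.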

The map side has done two expensive things: serialization to bytes, and a write to local disk.

Stage N+1: the reduce side

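200 reduce tasks start (one per output partition), and each is responsible for one of the 200 destination buckets. To do its work, a reduce task has to fetch its bucket from every map task's output. That means a network request to every other executor holding map output (potentially all 10); blocks on its own executor are read straight from local disk. On Spark 3.2 and later, adaptive query execution is on by default and may merge small shuffle partitions, so the UI can show fewer than 200 reduce tasks.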

Each reduce task:

  1. Fetches its blocks. Network requests to every executor that has data for it. Bytes flow over the wire. Spark’s BlockManager handles this; the shuffle service (when enabled) keeps map output available even if executors die.
  2. Deserializes the incoming blocks. Bytes back into JVM objects.
  3. Merges them. All the partial sums for the keys assigned to this partition are combined into final aggregates.
  4. Passes the merged data to the next operator (for example, a write to Parquet).
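Below is the reduce side of the same toy model, again plain Python rather than Spark code. Each "map output" is a list of serialized, compressed buckets. The map step is rebuilt here so that the block runs on its own. A reduce task pulls bucket `reducer_id` from every map output, deserializes it, and merges the partial sums.

```python
import pickle
import zlib
from collections import defaultdict

NUM_REDUCERS = 4

def stable_hash(key: str) -> int:
    return zlib.crc32(key.encode("utf-8"))

def toy_map_output(rows):
    """Partially aggregated, bucketed, serialized output of one toy map task."""
    partial = defaultdict(float)
    for status, total in rows:
        partial[status] += total
    buckets = [[] for _ in range(NUM_REDUCERS)]
    for status, subtotal in partial.items():
        buckets[stable_hash(status) % NUM_REDUCERS].append((status, subtotal))
    return [zlib.compress(pickle.dumps(bucket)) for bucket in buckets]

def reduce_task(reducer_id, all_map_outputs):
    """One reduce task: fetch its block from every map output, deserialize, merge."""
    merged = defaultdict(float)
    for map_output in all_map_outputs:           # 1. fan-in fetch (a network call in Spark)
        block = map_output[reducer_id]
        for status, subtotal in pickle.loads(zlib.decompress(block)):  # 2. deserialize
            merged[status] += subtotal           # 3. merge partial sums
    return dict(merged)                          # 4. hand off to the next operator

map_outputs = [
    toy_map_output([("paid", 10.0), ("pending", 1.0)]),
    toy_map_output([("paid", 5.0), ("refunded", 2.0)]),
    toy_map_output([("pending", 3.0)]),
]
# Every key hashes to exactly one reducer, so the per-reducer results never overlap
final = {}
for r in range(NUM_REDUCERS):
    final.update(reduce_task(r, map_outputs))
print(final)
```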

The reduce side has done: a fan-in network fetch from every node, deserialization, and a merge.

The cost, in numbers

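How much a shuffle actually moves depends on cardinality and partial aggregation. If status has, say, 5 distinct values, each map task writes at most 5 partial rows, and our groupBy moves almost nothing. At the other extreme, a distinct on a high-cardinality column, or a join on raw rows, shuffles basically the full dataset. For 100GB, that means on the order of 100GB crossing the network. In practice it is somewhat less, since blocks that happen to sit on a reducer's own executor are read locally.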

Plus: the same ~100GB hits local disk on the map side and gets read back from local disk on the reduce side. Compression cuts the wire and disk volume but adds CPU.
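Here is a back-of-the-envelope version of this accounting, as a sketch under stated assumptions. Map output is assumed to be spread evenly across executors, so a reducer finds about 1/executors of its blocks locally. `kept_fraction` is a made-up knob for the share of bytes that survives partial aggregation and compression.

```python
def shuffle_io_gb(input_gb, executors, kept_fraction=1.0):
    """Rough I/O for one shuffle, in GB.

    kept_fraction: hypothetical share of input bytes left after partial
    aggregation and compression (1.0 = raw rows, as in a join; near 0 = a
    low-cardinality groupBy). Assumes map output is spread evenly, so about
    1/executors of each reducer's blocks are local and the rest are fetched.
    """
    shuffled = input_gb * kept_fraction
    return {
        "disk_write": shuffled,                        # map side writes every block
        "disk_read": shuffled,                         # every block is read back once
        "network": shuffled * (1 - 1 / executors),     # remote share of the fetches
    }

print(shuffle_io_gb(100, 10))           # raw rows: roughly 90 GB over the network
print(shuffle_io_gb(100, 10, 0.001))    # a heavily pre-aggregated case
```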

The wallclock cost on a typical cluster: in a job with a large shuffle, the shuffle is often the biggest single share of the runtime. Reading the source from S3 is also expensive, but that read scales out, because each task reads its own slice independently. A shuffle is an all-to-all exchange in which every reducer depends on every mapper. It is bottlenecked by network bandwidth, local disk I/O, and the slowest fetch.

This is why people fear shuffles. Not because they’re mysterious, but because they’re genuinely the slowest thing in a typical Spark job.

A note on the shuffle service

Production clusters usually run an external shuffle service — a long-lived daemon on each worker node, separate from the executor JVMs. Its one job is to serve shuffle blocks to whoever asks for them. Why bother? Because executors come and go (autoscaling, dynamic allocation, spot eviction), and you don’t want a vanished executor to take its map output with it. The shuffle service decouples “who wrote this block” from “who can serve this block.” If the executor that produced shuffle output dies, the data is still on the node, and the shuffle service hands it out to anyone who needs it.

When the shuffle service is off, losing an executor mid-shuffle means re-running the upstream map tasks whose output lived on that executor. Expensive. Turn the service on in production. Many managed Spark platforms (Databricks, EMR, Dataproc) enable it, or an equivalent, by default. Bare-metal deployments need to set spark.shuffle.service.enabled = true and run the daemon on every worker node.

How to see this in the Spark UI

Open the Stages tab (lesson 22 covered this). Two columns are the shuffle’s fingerprint:

  • Shuffle Write — bytes written to local disk by this stage’s tasks (map side).
  • Shuffle Read — bytes this stage's tasks pulled from shuffle output, whether fetched over the network from other executors or read locally (reduce side).

A stage that produces a shuffle has a non-zero Shuffle Write. A stage that consumes one has a non-zero Shuffle Read. They come in pairs across stage boundaries.
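The same Shuffle Write and Shuffle Read numbers are also exposed by Spark's monitoring REST API, which the live UI serves under `/api/v1`. The sketch below assumes a running application and uses only the standard library. `shuffle_pairs` keeps the stages with non-zero shuffle bytes, so the write/read pairs across stage boundaries stand out.

```python
import json
from urllib.request import urlopen

def fetch_stages(ui_url, app_id):
    """GET the stage list from Spark's monitoring REST API (served by the live UI)."""
    with urlopen(f"{ui_url}/api/v1/applications/{app_id}/stages") as resp:
        return json.load(resp)

def shuffle_pairs(stages):
    """(stageId, shuffleWriteBytes, shuffleReadBytes) for stages that touch a shuffle."""
    rows = []
    for s in stages:
        write, read = s.get("shuffleWriteBytes", 0), s.get("shuffleReadBytes", 0)
        if write or read:
            rows.append((s["stageId"], write, read))
    return sorted(rows)

# With a live session:
# sc = spark.sparkContext
# for stage_id, w, r in shuffle_pairs(fetch_stages(sc.uiWebUrl, sc.applicationId)):
#     print(f"stage {stage_id}: shuffle write {w:,} B, shuffle read {r:,} B")
```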

Click into a stage and look at the task summary metrics. Shuffle Read Blocked Time (enable it under Show Additional Metrics) tells you how long each task spent waiting for its blocks to arrive. If that's the dominant chunk of task duration, the bottleneck is network or remote-disk speed, not your code. Sometimes task durations are wildly skewed (median 4s, max 5 minutes). That means one key has far more rows than the others, and one reducer is doing almost all the work alone. That's data skew, and it's a whole topic (lesson 30).
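A crude way to put a number on the skew symptom is to compare the slowest task to the median. Below is a minimal sketch. The durations would come from the stage's task table, and the threshold of 10 is an arbitrary, hypothetical cutoff, not a Spark setting.

```python
from statistics import median

SKEW_THRESHOLD = 10.0  # hypothetical cutoff; tune it for your own jobs

def skew_ratio(task_seconds):
    """Slowest task divided by the median task; close to 1.0 means even work."""
    return max(task_seconds) / median(task_seconds)

def looks_skewed(task_seconds, threshold=SKEW_THRESHOLD):
    return skew_ratio(task_seconds) >= threshold

# The text's example: 200 reducers, median 4 s, one straggler at 5 minutes
durations = [4.0] * 199 + [300.0]
print(skew_ratio(durations), looks_skewed(durations))  # 75.0 True
```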

Why “avoid shuffles” is a half-truth

You’ll hear “avoid shuffles” as advice constantly. It’s mostly right, but it can be misleading. Here’s the more honest version:

  • You cannot avoid shuffles entirely. Any aggregation across keys, any join (except broadcast), any global sort — these require redistribution. There’s no clever rewrite that gets you out of physics.
  • What you can do is shrink them. Reduce how much data the shuffle has to move.

Concrete shrinking moves, each its own future lesson:

  1. Filter early. Push your WHERE clauses upstream so less data hits the shuffle. (Spark’s Catalyst optimizer does a lot of this for you, but only when it can prove it’s safe.)
  2. Project narrow. Don’t drag 50 columns through a shuffle if you only need 4. select early.
  3. Broadcast small tables. A small dimension table joined to a big fact table doesn’t need a shuffle; it can be replicated to every executor in full. Lesson 28.
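  4. Partition smartly upstream. Spark can skip the shuffle entirely if your input is already hash-partitioned on the column you're going to group on, in a way Spark can see: a bucketed table, or a DataFrame you already repartitioned by that key. Directory partitioning from write.partitionBy(...) doesn't count. Lesson 36.
  5. Tune spark.sql.shuffle.partitions. The default of 200 is rarely right. It creates many tiny tasks for small data and oversized, spill-prone partitions for big data. Lesson 32.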
  6. Watch out for skew. One hot key can wreck a shuffle that would otherwise be fine. Lesson 30 covers salting.

What the rest of the course is about

This is the moment to be honest with you: modules 5 and 6 of this course exist because of shuffles. Joins, partitioning, AQE, broadcast hints, salting, repartition vs coalesce, bucketing — every one of those topics is, at its root, a way to shape or avoid shuffles. If you understand today’s lesson well, every later optimization will feel like a logical step rather than a bag of tricks.

The mental model to walk away with:

Spark’s basic unit of pain is the shuffle. Every wide transformation triggers one. Each shuffle is map side serialize + write local disk, then fetch over network + deserialize + merge on the reduce side. Cost is roughly proportional to the bytes moved (after partial aggregation and compression). The Stages tab tells you how big each one is.

Run this on your own machine

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand

spark = (
    SparkSession.builder
        .appName("shuffle-demo")
        .config("spark.sql.shuffle.partitions", "16")
        .getOrCreate()
)

# 5M rows across 32 partitions, 1000 distinct keys
df = (
    spark.range(0, 5_000_000, numPartitions=32)
        .withColumn("k", (col("id") % 1000).cast("int"))
        .withColumn("v", rand() * 1000)
)

# Force materialization so we measure the shuffle, not the source read
df = df.cache()
df.count()

# 1) A groupBy: triggers ONE shuffle
df.groupBy("k").sum("v").count()

# 2) A distinct: also a shuffle
df.select("k").distinct().count()

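# 3) A join on raw rows: both sides are shuffled by the join key.
#    Join on the unique "id", not "k": with 5,000 rows per k value, a
#    self-join on k would produce 1000 * 5000 * 5000 = 25 billion rows.
#    The right side is a different projection, so Spark can't simply
#    reuse one shuffle for both sides.
other = df.select("id", (col("v") * 2).alias("w"))
df.join(other, "id").groupBy().sum("v", "w").collect()

# The UI is at http://localhost:4040 by default (next free port if taken)
input("Press Enter to exit (UI stays alive)... ")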
spark.stop()

Open the UI's Stages tab. For each of the three actions, find the stages where Shuffle Write and Shuffle Read are non-zero, and note the bytes shuffled. Compare the join to the simple groupBy. The join is the heaviest, because both inputs are shuffled as raw rows. The groupBy has one shuffle, and partial aggregation collapses it to almost nothing. Each action also ends in a tiny extra shuffle that gathers partial counts or sums into a single partition. That difference, between an aggregation that compresses well and a join that doesn't, is the intuition that drives most of Spark performance work.

That’s the shuffle. Module 4 is now done; the next lesson opens module 5 with narrow vs wide transformations as a design principle — once you understand shuffles physically, you can start designing pipelines around them instead of just reacting to them.
