PySpark, from the ground up Lesson 59 / 60

Adaptive Query Execution: Spark 3.x's killer feature

Dynamic partition coalescing, runtime skew handling, and join strategy switching — the configs to know and the cases AQE still can't help.

For most of Spark’s history, the optimizer worked the way every relational optimizer does: read the query, look at table statistics, plan the whole thing up front, run it. If the stats were wrong — say, you’re joining a table that came out of a previous Spark stage with no real stats at all — the plan was wrong, and you ate the cost.

Spark 3.0 introduced Adaptive Query Execution (AQE) and changed that. AQE rewrites the physical plan at runtime, after early stages complete and Spark has actual sizes instead of guesses. As of Spark 3.2 it is on by default, and by 3.5 much of the tuning advice in the docs assumes it. If you’re on a recent Spark and you’re not using AQE, you’re giving up a large share of what the optimizer can do for you.

This lesson is what AQE actually does, the configs that control it, and where it still can’t help you.

The three things AQE does

AQE is a collection of runtime optimizations, but practically you care about three.

1. Dynamic partition coalescing

This is the one most jobs benefit from immediately.

Without AQE: you set spark.sql.shuffle.partitions = 200. Spark always shuffles into 200 partitions. If your post-shuffle data is 4 GB total, that’s 20 MB per partition — fine. If it’s 40 MB total, that’s 200 KB per partition, and you have 200 tasks of overhead processing nearly empty partitions. If it’s 400 GB, you have 200 partitions of 2 GB each — heavy for one task.

The 200 number is a guess made before Spark knew anything about the data.
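The arithmetic above is easy to redo for your own shuffle sizes. A minimal sketch in plain Python, assuming the data spreads perfectly evenly across partitions (real shuffles rarely do) and using the decimal units from the text (1 GB = 1000 MB). The helper name per_partition_mb is made up for illustration.

```python
def per_partition_mb(total_mb: float, num_partitions: int = 200) -> float:
    """Average partition size in MB, assuming a perfectly even spread."""
    return total_mb / num_partitions


# The three cases from the text: 40 MB, 4 GB and 400 GB of post-shuffle data.
for label, total_mb in [("40 MB", 40), ("4 GB", 4_000), ("400 GB", 400_000)]:
    print(f"{label:>6} total -> {per_partition_mb(total_mb):g} MB per partition")
```

Only the middle case lands in a comfortable range, which is the problem with a fixed partition count.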

With AQE: after the shuffle, Spark looks at how big each output partition actually is, and coalesces small adjacent ones into bigger ones. The 200-partition guess becomes 8 partitions of 50 MB if that’s what the data calls for. Less overhead, fewer tiny tasks, faster job.

You almost never need to call coalesce() manually after a groupBy anymore. AQE handles it.
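To make the idea concrete, here is a simplified sketch of coalescing in plain Python. It is not Spark’s implementation (the real rule also weighs minimum partition size and cluster parallelism); it only shows the core move of greedily merging adjacent partitions until a target size is reached. The function name and the sizes are assumptions for illustration.

```python
from typing import List


def coalesce_adjacent(sizes_mb: List[float], target_mb: float = 64) -> List[List[int]]:
    """Group adjacent partition indices so each group stays at or under target_mb."""
    groups, current, current_size = [], [], 0.0
    for idx, size in enumerate(sizes_mb):
        # Close the current group if adding this partition would overshoot the target.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0.0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


sizes = [2.0] * 200                      # 200 shuffle partitions of 2 MB each
groups = coalesce_adjacent(sizes, target_mb=50)
print(len(groups))                       # 8 groups of 25 partitions, 50 MB each
```

A partition that is already larger than the target stays in a group of its own; coalescing only merges, it never splits.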

# Configs that drive coalescing
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.coalescePartitions.minPartitionNum": "1",
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "64m",

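The advisoryPartitionSizeInBytes is what AQE aims for after coalescing. 64 MB is the default; a lot of teams bump it to 128 MB or 256 MB for batch jobs to reduce task count further. Don’t go above what fits comfortably in one task’s working set. Two version notes for Spark 3.2+: spark.sql.adaptive.coalescePartitions.parallelismFirst defaults to true, which makes Spark favor parallelism over the advisory size, so set it to false if you want the target respected; and minPartitionNum is deprecated there.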

2. Skew join handling

The second thing AQE does is detect skewed partitions in joins at runtime and split them.

Without AQE: a SortMergeJoin on a key where 80% of rows have country = 'US' produces one giant partition processed by one task — the classic skew tail. You sit and watch one task run for 40 minutes while the other 199 finished in 10 seconds. Salting (lesson 29) was the manual fix.

With AQE: Spark notices that one shuffled partition is more than skewedPartitionFactor × the median (default 5x) and larger than skewedPartitionThresholdInBytes (default 256 MB). When both conditions hold, Spark splits that fat partition into smaller pieces, replicates the matching rows from the other side, and runs them in parallel. Automated salting for the easy cases, no code change required.

"spark.sql.adaptive.skewJoin.enabled": "true",
"spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",

If you’re still hitting skew with these on, lower the factor (try 3) or the threshold (try 64 MB) — AQE will split more aggressively. The tradeoff is that aggressive splitting on a non-skewed dataset adds overhead, so don’t push it without a measured win.
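The two-condition rule is easy to check by hand. A minimal sketch in plain Python, assuming you have the per-partition sizes for one shuffle (for example, read off the stage page in the UI). It mirrors the rule as described above and is not Spark’s code; skewed_partitions and the sizes are made up for illustration.

```python
from statistics import median
from typing import List

MB = 1024 * 1024


def skewed_partitions(sizes: List[int], factor: float = 5.0,
                      threshold: int = 256 * MB) -> List[int]:
    """Return indices of partitions that meet both skew conditions."""
    med = median(sizes)
    return [i for i, s in enumerate(sizes) if s > factor * med and s > threshold]


# Hypothetical shuffle: 199 partitions of 40 MB, one of 180 MB (4.5x the median).
sizes = [40 * MB] * 199 + [180 * MB]
print(skewed_partitions(sizes))                               # [] with the defaults
print(skewed_partitions(sizes, factor=3, threshold=64 * MB))  # [199] once both are lowered
```

The 180 MB partition is the slow task in this stage, but it fails both default conditions, so nothing gets split until the factor and the threshold come down.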

3. Join strategy switching

The third one is subtler but pays off on big jobs. Spark planned a SortMergeJoin because at planning time both sides looked too big to broadcast. But maybe the build side comes out of an earlier filter that drops 99% of the rows. At planning time Spark estimated 5 GB; at runtime it’s actually 30 MB. With AQE, Spark notices and switches the join to a BroadcastHashJoin — much cheaper.

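This is enabled by default with AQE; there is no separate switch for it. The decision uses the broadcast threshold: when a side’s runtime size falls below spark.sql.adaptive.autoBroadcastJoinThreshold (Spark 3.2+, falling back to spark.sql.autoBroadcastJoinThreshold, which is 10 MB by default), the join flips. A 30 MB side therefore only flips if you have raised the threshold above the default. The related config spark.sql.adaptive.localShuffleReader.enabled (true by default) lets the converted join read the already-written shuffle files locally, on the node that produced them, instead of pulling them across the network.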
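The threshold check is worth seeing with numbers, because the 10 MB default for spark.sql.autoBroadcastJoinThreshold is lower than many people expect. A toy sketch in plain Python; the real planner weighs more strategies than these two, and pick_join_strategy is a made-up name.

```python
MB = 1024 * 1024


def pick_join_strategy(runtime_side_bytes: int,
                       broadcast_threshold_bytes: int = 10 * MB) -> str:
    """Toy runtime decision: broadcast only if the side fits under the threshold."""
    if broadcast_threshold_bytes < 0:      # -1 disables broadcast joins in Spark
        return "SortMergeJoin"
    if runtime_side_bytes <= broadcast_threshold_bytes:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


print(pick_join_strategy(30 * MB))                                      # SortMergeJoin
print(pick_join_strategy(30 * MB, broadcast_threshold_bytes=64 * MB))   # BroadcastHashJoin
```

Under the default threshold a 30 MB side stays in a SortMergeJoin, so the runtime switch only pays off for sides that small if the threshold has been raised.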

How AQE actually works under the hood (briefly)

Worth understanding once, because it explains the limits.

A Spark plan is a tree of stages separated by shuffles. Without AQE, Spark plans the whole tree up front and runs it. With AQE, Spark plans up to the first shuffle boundary, runs that, measures the actual output sizes, and then re-plans the rest of the tree using those real numbers instead of the optimizer’s estimates. It does that recursively at every shuffle boundary.

That re-planning is where the magic happens. With actual sizes in hand, the optimizer can:

  • See that a “big” side is actually small and switch to broadcast.
  • See that a partition is skewed and split it.
  • See that the post-shuffle data is small and coalesce partitions.

The two consequences of this design:

  • AQE only helps at shuffle boundaries. No shuffle, no measurement, no re-plan. This is why source-side skew without a shuffle can’t benefit (more on that below).
  • AQE adds a small planning overhead per stage. For very fast queries on tiny inputs, the overhead can outweigh the benefit. For everything else, it’s negligible compared to the runtime savings.

You can see the boundary in the SQL plan: every shuffle is wrapped in a ShuffleQueryStage, and AQE inserts itself between stages.

A real before/after

A small but realistic example. Read a fact table, filter to one customer, join to a dim table. Without AQE the filter doesn’t change the join strategy and you’re still in SortMergeJoin land:

spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "200")

orders = spark.read.parquet("s3://lake/orders/")          # 80 GB
customers = spark.read.parquet("s3://lake/customers/")    # 12 GB

orders.filter("customer_id = 'cust_42'") \
      .join(customers, "customer_id") \
      .groupBy("region") \
      .sum("amount") \
      .write.parquet("s3://out/cust42-by-region/")

That filter knocks the orders side down to ~80 MB. But Spark planned the join when both sides were big, so you get 200 reducer tasks for ~80 MB of data joined to 12 GB. Wall clock: 14 minutes.

Flip AQE on, same job:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

What changes:

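  • Once the join’s shuffle map stage finishes, AQE sees the orders side is about 80 MB and switches the plan to broadcast-hash-join the orders side into customers (the small side flipped: it’s now the filtered orders). This assumes the broadcast threshold has been raised above 80 MB; at the 10 MB default the join stays a SortMergeJoin.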
  • The post-join groupBy shuffle has 200 partitions but ~150 KB each. AQE coalesces to 4-8 partitions.

Wall clock: 90 seconds. Same code, same data, one config flip.

You won’t always see 10x. But you’ll often see 2-3x on real workloads, and AQE almost never makes a job slower — at worst it’s neutral on already-optimal queries.

Reading AQE-rewritten plans

The SQL tab in the UI shows the runtime plan. You’ll see operators like AdaptiveSparkPlan, AQEShuffleRead (coalesced or local; named CustomShuffleReader in Spark 3.0 and 3.1), and ShuffleQueryStage. There’s an “Initial Plan” view and a “Final Plan” view; the diff between them is exactly what AQE rewrote.

A few patterns to recognize:

  • AQEShuffleRead coalesced (CustomShuffleReader coalesced before Spark 3.2) → coalescing happened, fewer partitions than originally planned.
  • BroadcastHashJoin next to a ShuffleQueryStage → AQE flipped a SortMergeJoin to broadcast at runtime.
  • SortMergeJoin(skew=true) fed by shuffle reads marked skewed → AQE handled skew.
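If you review many plans, a quick text scan can flag which of these patterns appear. A rough sketch in plain Python, assuming you have the plan as text (copied from the SQL tab or from explain output). The marker strings are assumptions that vary between Spark versions, and the sample plan is a hand-written, abbreviated stand-in, not real output.

```python
from typing import Dict

# Marker substrings are assumptions; node names differ between Spark versions,
# so adjust them to what your cluster actually prints.
MARKERS = {
    "coalesced": ("AQEShuffleRead coalesced", "CustomShuffleReader coalesced"),
    "skew_handled": ("skew=true",),
    "broadcast_join": ("BroadcastHashJoin",),  # present, not necessarily flipped by AQE
}


def aqe_rewrites(plan_text: str) -> Dict[str, bool]:
    """Report which marker groups occur anywhere in the plan text."""
    return {name: any(m in plan_text for m in needles)
            for name, needles in MARKERS.items()}


# Hand-written stand-in for a final plan, for illustration only.
sample_plan = (
    "AdaptiveSparkPlan isFinalPlan=true\n"
    "+- HashAggregate\n"
    "   +- AQEShuffleRead coalesced\n"
    "      +- ShuffleQueryStage 1\n"
)
print(aqe_rewrites(sample_plan))
```

A substring match cannot tell whether a BroadcastHashJoin was planned up front or flipped at runtime; comparing the initial and final plans is still the reliable check.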

If you don’t see these annotations on a query you expected AQE to optimize, check that AQE is actually on. spark.conf.get("spark.sql.adaptive.enabled") should return "true". Some platforms ship with it disabled in the cluster default — Databricks has it on, vanilla open-source Spark < 3.2 has it off.

Where AQE still can’t help

AQE is great. It’s not magic. It can’t help you with:

Skew on the source side, no shuffle. If your skewed key is in the input file (one Parquet file has 90% of the rows for country='US' because of how it was written), and your query doesn’t shuffle, AQE has nothing to react to. The fix is to repartition the source on read, or fix the upstream write, or filter the hot key out and process it separately. Lesson 28 still applies.

Broadcast joins. Once a join is a broadcast, there’s no shuffle to inspect. If the broadcast side itself has skew that explodes on the join (a one-to-many where one key matches 80% of the build side), AQE can’t intervene. You need a different join strategy, or the salting from lesson 29.

Skew on derived columns. If your join key is concat(country, city) and the skew is in country, AQE looks at the partition sizes of the concatenated key. It still works in most cases, but if the concat distributes the skew across multiple partitions evenly enough to escape the skewedPartitionFactor, AQE won’t split. You’ll need to inspect manually.
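A small numeric example shows how a derived key can slip under the detection rule. This sketch in plain Python uses the default factor and threshold quoted earlier; the partition sizes are hypothetical and flagged_count is a made-up helper.

```python
from statistics import median

MB = 1024 * 1024
FACTOR, THRESHOLD = 5, 256 * MB   # skew join defaults quoted earlier in the lesson


def flagged_count(sizes):
    """How many partitions pass both skew conditions."""
    med = median(sizes)
    return sum(1 for s in sizes if s > FACTOR * med and s > THRESHOLD)


# Hypothetical sizes. Joining on country alone: the hot key is one 4000 MB partition.
by_country = [100 * MB] * 199 + [4000 * MB]
# Joining on concat(country, city): the same hot rows spread over 10 partitions.
by_country_city = [100 * MB] * 190 + [400 * MB] * 10

print(flagged_count(by_country))        # 1: the fat partition gets split
print(flagged_count(by_country_city))   # 0: each is 4x the median, under the 5x factor
```

In the second case ten tasks each handle four times the typical load and AQE leaves them alone, which is the situation where lowering the factor or salting by hand is worth testing.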

Tiny-partition coalescing into a memory-bound stage. AQE coalesces small partitions to reduce overhead. If the next stage’s per-task memory needs scale with partition size — a window function with a huge partition spec, an aggregation with high cardinality — coalescing to bigger partitions can push you into spill territory. Watch for spill in the next stage; if you see it after enabling AQE, drop advisoryPartitionSizeInBytes back down.

Streaming. AQE doesn’t apply to Structured Streaming queries. The streaming engine plans micro-batches differently and many AQE rewrites would break the incremental semantics. For streaming, you tune partitions explicitly.

What you should turn on, today

For batch jobs on Spark 3.2+, the safe default config is:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")

Most platforms have this on by default already. Verify with spark.conf.get. If you’re on a self-managed cluster running Spark 3.0 or 3.1, AQE was off by default — turn it on, run your jobs, watch them get faster.
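One way to do that verification in a single pass is a small helper that compares the session’s settings with the list above. A sketch, assuming a live SparkSession named spark; aqe_mismatches is a made-up name. It calls spark.conf.get(key) without a default argument so that Spark reports its own built-in default for keys nobody has set explicitly.

```python
EXPECTED = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.localShuffleReader.enabled": "true",
}


def aqe_mismatches(spark, expected=EXPECTED):
    """Return {key: actual_value} for every setting that differs from expected."""
    mismatches = {}
    for key, want in expected.items():
        actual = spark.conf.get(key)          # effective value, including defaults
        if actual.lower() != want:
            mismatches[key] = actual
    return mismatches
```

Calling aqe_mismatches(spark) on a session returns an empty dict when all four flags are on. The advisory partition size is left out on purpose, since it is a tuning choice and has no single correct value.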

The rare cases where you’d disable AQE: a very stable, hand-tuned pipeline where you’ve measured a regression, or a query where the runtime overhead of the AQE planner outweighs the benefit (very short queries on small inputs). Those are edge cases. The default answer is “leave it on.”

Tying it back to the debugging checklist

Lesson 58 was the diagnostic loop: find the slow stage, check skew, GC, shuffle volume, plan. AQE is the answer to “I have skew” and “my shuffle partitions are wrong” for many jobs. If you’re following the checklist and you keep finding skew, the first question is whether AQE is on. The second is whether it’s catching the skew (look in the SQL plan for a join marked skew=true and shuffle reads marked skewed). The third, only after you have checked both, is whether you need manual salting on top.

Most days, AQE is enough. Most days you don’t have to think about it. That’s the point.

Next lesson is the course capstone: a 30-minute health check on a Spark cluster you’ve never seen. The closing sweep that pulls together everything we’ve covered. Bring coffee.
