Every DataFrame operation you’ve written in this course has flowed through Catalyst. Every .filter, .join, .groupBy, every column expression, every Spark SQL string — Catalyst saw all of it, rewrote most of it, and decided how to actually run it. The reason DataFrame Spark is so much faster than RDD Spark is mostly Catalyst (with Tungsten doing the rest, which is the next lesson).
Up to now I’ve been hand-waving about “Spark figures out the right plan.” It’s time to look at how. Once you can read a query plan, you can predict performance from code, debug surprises, and write Spark with intention instead of hoping.
What Catalyst is
Catalyst is Spark’s query optimizer, written in Scala and shipped as part of spark-catalyst. It’s the layer between “your DataFrame code” and “tasks running on executors.” Every time you call an action (.show(), .count(), .write.save(...)), Catalyst takes the operations you’ve described, transforms them through four phases, and hands the result to Spark’s execution engine.
The phases:
- Parsed logical plan — your code, syntactically valid, no semantic checks yet.
- Analyzed logical plan — column references resolved against the schema; types checked; everything is concrete.
- Optimized logical plan — rule-based rewrites: predicate pushdown, column pruning, constant folding, projection collapse, join reordering, dozens more.
- Physical plan — choices of actual execution strategy: which join algorithm, which scan implementation, which exchange strategy.
In modern Spark there’s also a runtime layer on top of phase 4 — Adaptive Query Execution — that adjusts the physical plan mid-query based on observed shuffle statistics. We’ll get there at the end.
The whole pipeline runs every single time you trigger an action, and the cost is negligible compared to actually running the query.
A small query and its plans
Let’s take something concrete. Two tables, a filter, an aggregation, a join.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("CatalystDemo").master("local[*]").getOrCreate()

# Two small in-memory tables: orders and the users who placed them.
orders = spark.createDataFrame(
    [(1, 100, 50.0, "2024-01-01"),
     (2, 100, 75.0, "2024-01-02"),
     (3, 200, 30.0, "2024-01-03")],
    ["order_id", "user_id", "amount", "dt"]
)
users = spark.createDataFrame(
    [(100, "alice", "IT"),
     (200, "bob", "DE"),
     (300, "carol", "FR")],
    ["user_id", "name", "country"]
)

# Filter, join, aggregate, then filter again on the grouping column.
q = (orders
     .filter(F.col("amount") > 40)
     .join(users, "user_id")
     .groupBy("country")
     .agg(F.sum("amount").alias("total"))
     .filter(F.col("country") == "IT"))

# Print the parsed, analyzed, optimized, and physical plans.
q.explain(True)
.explain(True) prints all four phases. Let’s walk through what each one shows.
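If you would rather have the phases as data than as console output, one option is to capture what .explain(True) prints and split it on the == ... == headers. This is a minimal sketch that assumes PySpark’s explain writes through Python’s print, which is how the 3.x releases behave as far as I know; explain_sections is a hypothetical helper name, not part of the Spark API.

```python
import io
import re
from contextlib import redirect_stdout


def explain_sections(df):
    """Return {phase name: plan text} parsed from df.explain(True) output.

    Hypothetical helper: it relies on explain() printing to Python's stdout.
    """
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain(True)

    sections = {}
    current = None
    for line in buffer.getvalue().splitlines():
        # Phase headers look like "== Optimized Logical Plan ==".
        header = re.fullmatch(r"== (.+) ==", line.strip())
        if header:
            current = header.group(1)
            sections[current] = []
        elif current is not None:
            sections[current].append(line)
    # Join each phase back into one block and trim surrounding blank lines.
    return {name: "\n".join(lines).strip() for name, lines in sections.items()}
```

With the query above, explain_sections(q)["Optimized Logical Plan"] would give you just the optimized plan, which is handy when you want to diff two versions of a query.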
Phase 1: parsed logical plan
== Parsed Logical Plan ==
'Filter ('country = IT)
+- Aggregate [country#X], [country#X, sum(amount#Y) AS total#Z]
   +- Join Inner, (user_id#A = user_id#B)
      :- Filter (amount#Y > 40)
      :  +- LogicalRDD [order_id#W, user_id#A, amount#Y, dt#V], false
      +- LogicalRDD [user_id#B, name#U, country#X], false
This is your code, faithfully reflected as a tree. Note the apostrophes — 'Filter, 'country — those mark unresolved references. At parse time Spark doesn’t yet know whether country exists or what type it is. It’s just a name. The LogicalRDD leaves are placeholders for the input DataFrames.
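This phase is about syntactic validity: is the query well formed? For a Spark SQL string, a malformed statement fails here with a ParseException. In the DataFrame API a misspelled method name never even reaches Catalyst, because Python raises an AttributeError first. A typo in a column name does not fail here; that’s the next phase’s job.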
Phase 2: analyzed logical plan
== Analyzed Logical Plan ==
country: string, total: double
Filter (country#X = IT)
+- Aggregate [country#X], [country#X, sum(amount#Y) AS total#Z]
   +- Project [user_id#A, order_id#W, amount#Y, dt#V, name#U, country#X]
      +- Join Inner, (user_id#A = user_id#B)
         :- Filter (amount#Y > 40)
         :  +- LogicalRDD [order_id#W, user_id#A, amount#Y, dt#V], false
         +- LogicalRDD [user_id#B, name#U, country#X], false
The apostrophes are gone. Every column has a unique exprId (the #X, #Y numbers), which is how Catalyst tracks identity through later rewrites. The output schema is printed at the top.
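The analyzer is also where you find out you’ve made mistakes. Reference a column that doesn’t exist, try to compare a string to a struct, refer to a name that has become ambiguous after a join: all of those errors come out of this phase as an AnalysisException. Older releases word it as “cannot resolve foo given input columns …”; newer ones report the same failure under an error class such as UNRESOLVED_COLUMN. In the DataFrame API the analyzer runs eagerly, as each transformation is defined, so these errors appear on the line that builds the DataFrame rather than at the action. When people complain that “Spark errors are unreadable,” they usually mean analyzer errors, and the trick is to read the plan to see where the resolution actually broke.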
Phase 3: optimized logical plan
This is where Catalyst earns its keep:
== Optimized Logical Plan ==
Aggregate [country#X], [country#X, sum(amount#Y) AS total#Z]
+- Project [amount#Y, country#X]
   +- Join Inner, (user_id#A = user_id#B)
      :- Project [user_id#A, amount#Y]
      :  +- Filter ((isnotnull(amount#Y) AND (amount#Y > 40)) AND isnotnull(user_id#A))
      :     +- LogicalRDD [order_id#W, user_id#A, amount#Y, dt#V], false
      +- Project [user_id#B, country#X]
         +- Filter ((country#X = IT) AND isnotnull(country#X) AND isnotnull(user_id#B))
            +- LogicalRDD [user_id#B, name#U, country#X], false
Compare it to the analyzed plan. Several things changed:
- The outer Filter (country = IT) from the original code has been pushed down through the aggregation and the join, all the way to the users scan. The optimizer noticed that country is a grouping key that only comes from users, so filtering it before the join is logically equivalent and physically much cheaper.
- Above each scan you now see Project nodes selecting only the columns the rest of the plan needs. order_id, dt, and name are never referenced after the scan, so they get dropped immediately. This is column pruning.
- isnotnull filters were added implicitly. Inner joins on null keys never match, so Catalyst inserts the null filter on user_id to reduce the join input. The isnotnull checks on amount and country follow from the comparisons themselves, which can never be true for a null. Free win.
- The order of the original filter(amount > 40) and the join was preserved here, but in larger plans Catalyst will reorder filters and projections to push as much work as possible toward the leaves.
The list of optimizer rules is long — PushDownPredicates, ColumnPruning, ConstantFolding, ReorderJoin, EliminateOuterJoin, CollapseProject, dozens more. You don’t need to memorize them. You need to recognize their effects in the optimized plan and trust that the optimizer is making sensible choices.
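To make “rule-based rewrite” less abstract, here is a toy version of the idea in plain Python. It is not Catalyst’s code: Lit, Col, BinOp, transform_up, and optimize are hypothetical names invented for this sketch, and only the two rule names are borrowed from Catalyst. What it does share with the real thing is the shape: small rules that each pattern-match one kind of tree node, applied bottom-up and repeated until the tree stops changing.

```python
import operator
from dataclasses import dataclass


@dataclass(frozen=True)
class Lit:
    value: object


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class BinOp:
    op: str        # "+", "*", ">" or "AND"
    left: object
    right: object


_FOLDABLE = {"+": operator.add, "*": operator.mul, ">": operator.gt}


def transform_up(expr, rule):
    """Apply `rule` to the children first, then to the node itself."""
    if isinstance(expr, BinOp):
        expr = BinOp(expr.op,
                     transform_up(expr.left, rule),
                     transform_up(expr.right, rule))
    return rule(expr)


def constant_folding(expr):
    """Replace an operation on two literals with its computed value."""
    if (isinstance(expr, BinOp) and expr.op in _FOLDABLE
            and isinstance(expr.left, Lit) and isinstance(expr.right, Lit)):
        return Lit(_FOLDABLE[expr.op](expr.left.value, expr.right.value))
    return expr


def _is_true(expr):
    return isinstance(expr, Lit) and expr.value is True


def boolean_simplification(expr):
    """Drop a literal TRUE from either side of an AND."""
    if isinstance(expr, BinOp) and expr.op == "AND":
        if _is_true(expr.left):
            return expr.right
        if _is_true(expr.right):
            return expr.left
    return expr


def optimize(expr, rules, max_iterations=100):
    """Run every rule repeatedly until a full pass changes nothing."""
    for _ in range(max_iterations):
        before = expr
        for rule in rules:
            expr = transform_up(expr, rule)
        if expr == before:
            break
    return expr


# (amount > 20 * 2) AND (1 + 1 > 1)
predicate = BinOp("AND",
                  BinOp(">", Col("amount"), BinOp("*", Lit(20), Lit(2))),
                  BinOp(">", BinOp("+", Lit(1), Lit(1)), Lit(1)))
optimized = optimize(predicate, [constant_folding, boolean_simplification])
print(optimized)
```

The predicate reduces to amount > 40: folding turns 20 * 2 into 40 and 1 + 1 > 1 into TRUE, and the simplification rule then removes the TRUE branch. Neither rule knows about the other, which is the point of the design.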
The cost-based piece (CBO), introduced in Spark 2.2, is off by default. It kicks in only when you set spark.sql.cbo.enabled=true and have collected statistics on a table (ANALYZE TABLE ... COMPUTE STATISTICS). With both in place, Catalyst can estimate row counts at each node and pick the better join order (join reordering has its own switch, spark.sql.cbo.joinReorder.enabled), decide when a side is small enough to broadcast, choose the cheaper of two equivalent plans. Without stats it falls back to heuristics. Most production teams never run ANALYZE, which means most optimizer decisions are heuristic. Worth knowing: the CBO gets noticeably better when you give it stats.
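In practice, giving the CBO what it needs comes down to two config flags and one or two SQL statements. A minimal sketch, assuming the table is registered in the catalog (ANALYZE TABLE does not apply to an ad hoc DataFrame); enable_cbo_with_stats is a hypothetical helper name, while the config keys and the ANALYZE syntax are Spark’s own.

```python
def enable_cbo_with_stats(spark, table, columns=()):
    """Turn on cost-based optimization and collect statistics for `table`.

    Hypothetical helper. `spark` is an active SparkSession and `table`
    is the name of a catalog table, for example "warehouse.orders".
    """
    # Both flags default to false.
    spark.conf.set("spark.sql.cbo.enabled", "true")
    spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

    # Table-level statistics: row count and size in bytes.
    spark.sql(f"ANALYZE TABLE {table} COMPUTE STATISTICS")

    # Column-level statistics (distinct counts, min/max, null counts)
    # are what let the optimizer estimate filter and join selectivity.
    if columns:
        column_list = ", ".join(columns)
        spark.sql(
            f"ANALYZE TABLE {table} COMPUTE STATISTICS FOR COLUMNS {column_list}"
        )
```

A call might look like enable_cbo_with_stats(spark, "warehouse.orders", ["user_id", "amount"]). Statistics are a snapshot, so they need recollecting after the table changes substantially.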
Phase 4: physical plan
== Physical Plan ==
*(5) HashAggregate(keys=[country#X], functions=[sum(amount#Y)])
+- Exchange hashpartitioning(country#X, 200), ENSURE_REQUIREMENTS
   +- *(4) HashAggregate(keys=[country#X], functions=[partial_sum(amount#Y)])
      +- *(4) Project [amount#Y, country#X]
         +- *(4) BroadcastHashJoin [user_id#A], [user_id#B], Inner, BuildRight
            :- *(4) Project [user_id#A, amount#Y]
            :  +- *(4) Filter ((isnotnull(amount#Y) AND (amount#Y > 40)) AND isnotnull(user_id#A))
            :     +- *(4) Scan ExistingRDD[order_id#W, user_id#A, amount#Y, dt#V]
            +- BroadcastExchange HashedRelationBroadcastMode(...), [plan_id=...]
               +- *(2) Project [user_id#B, country#X]
                  +- *(2) Filter ((country#X = IT) AND isnotnull(country#X) AND isnotnull(user_id#B))
                     +- *(2) Scan ExistingRDD[user_id#B, name#U, country#X]
Each logical operator has been mapped to a physical implementation. Three things to read here:
Operator names tell you the algorithm. BroadcastHashJoin because users was small after the filter, so Catalyst chose to broadcast it instead of shuffling. If both sides were big you’d see SortMergeJoin, with an Exchange hashpartitioning(...) node feeding each side. HashAggregate is a hash-based GROUP BY; you sometimes see SortAggregate instead, typically when the aggregation buffer holds a type that can’t be updated in place (a max over a string column, for example). BroadcastNestedLoopJoin is the antipattern: it shows up when you’ve written a join Catalyst can’t express any other way (a non-equi-join over big data, typically), and it almost always means you should rewrite.
Exchange nodes are the shuffle markers. Anywhere you see a plain Exchange, Spark will write shuffle files and read them back. (BroadcastExchange is different: it ships a small table to every executor and involves no shuffle.) The HashAggregate here has an Exchange hashpartitioning(country, 200) between the partial and final aggregations, which is the standard two-phase aggregation pattern. Counting Exchange nodes is the fastest way to gauge how much shuffling a query will do.
*(N) prefixes indicate whole-stage codegen. That’s a Tungsten thing — next lesson.
PushedFilters shows up on file scans. When you read Parquet or another columnar format, you’ll see something like:
+- FileScan parquet warehouse.orders[user_id#A, amount#Y]
PushedFilters: [IsNotNull(amount), GreaterThan(amount,40)],
ReadSchema: struct<user_id:int,amount:double>
Those are predicates that got pushed all the way down into the file format reader. Parquet evaluates them while reading and skips entire row groups that can’t match. Combined with column pruning (ReadSchema shows only the columns you actually use), this is why Parquet is so much faster than CSV: not just compression, but the optimizer’s ability to read less data.
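When a plan has many scans, it can help to pull the PushedFilters lists out of the plan text and check them against the filters you expected. The sketch below is plain string handling on explain output, not a Spark API; pushed_filters is a hypothetical helper, and it assumes the list was not truncated by Spark’s plan-string length limit.

```python
def pushed_filters(plan_text):
    """Return one list of filter strings per 'PushedFilters: [...]' found.

    Hypothetical helper. Parentheses and brackets are tracked so that
    commas inside a filter, as in GreaterThan(amount,40), do not split it.
    """
    marker = "PushedFilters: ["
    results = []
    start = plan_text.find(marker)
    while start != -1:
        position = start + len(marker)
        depth = 0
        current, filters = [], []
        while position < len(plan_text):
            char = plan_text[position]
            if char in "([":
                depth += 1
            elif char in ")]":
                if depth == 0:
                    break  # the bracket that closes the PushedFilters list
                depth -= 1
            if char == "," and depth == 0:
                # A top-level comma separates two filters.
                filters.append("".join(current).strip())
                current = []
            else:
                current.append(char)
            position += 1
        last = "".join(current).strip()
        if last:
            filters.append(last)
        results.append(filters)
        start = plan_text.find(marker, position)
    return results


scan = """+- FileScan parquet warehouse.orders[user_id#A, amount#Y]
PushedFilters: [IsNotNull(amount), GreaterThan(amount,40)],
ReadSchema: struct<user_id:int,amount:double>"""
print(pushed_filters(scan))
```

An empty list for a scan you filtered on is the signal worth chasing: it usually means the predicate was written in a form the data source cannot evaluate, such as a filter wrapped in a UDF.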
Reading plans in practice
.explain(True) shows all four phases and is what you want when something surprising is happening. .explain() (no argument) shows just the physical plan, which is most of what you need day to day. There’s also .explain("formatted"), which renders the physical plan as a numbered list with details broken out below — easier on the eyes for big plans, harder to scan quickly.
A workflow that tends to be useful when debugging a slow query:
- Run .explain() and count Exchange nodes. Each is a shuffle. Three or fewer is usually fine; six is suspicious; ten means something is off.
- Look at the join algorithms. Any BroadcastNestedLoopJoin is a red flag.
- Check the leaf scans. Are the filters you wrote actually appearing in PushedFilters? Is the ReadSchema the minimum set of columns you need?
- If something looks wrong, swap to .explain(True) and walk the optimized logical plan to see why the rewrite did or didn’t happen.
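The first two steps of that workflow are mechanical enough to script. Here is a minimal sketch that takes the text of a physical plan (the default .explain() layout, not the "formatted" one) and counts operators by name. operator_counts and triage are hypothetical helpers that work purely on the printed tree, so treat the numbers as a quick first look rather than ground truth.

```python
import re
from collections import Counter

# Tree-drawing characters first, then an optional whole-stage-codegen
# marker such as "*(4) ", then the operator name itself.
_OPERATOR = re.compile(r"^[\s:+\-]*(?:\*\(\d+\)\s*)?([A-Za-z]\w*)")


def operator_counts(plan_text):
    """Count how often each operator name starts a line of the plan."""
    counts = Counter()
    for line in plan_text.splitlines():
        match = _OPERATOR.match(line)
        if match:
            counts[match.group(1)] += 1
    return counts


def triage(plan_text):
    """Summarize shuffles and join algorithms in a physical plan."""
    counts = operator_counts(plan_text)
    return {
        # Only a plain Exchange is a shuffle; BroadcastExchange is
        # counted under its own name and is not included here.
        "shuffles": counts["Exchange"],
        "nested_loop_joins": counts["BroadcastNestedLoopJoin"],
        "joins": {name: n for name, n in counts.items() if name.endswith("Join")},
    }


plan = """== Physical Plan ==
*(5) HashAggregate(keys=[country#X], functions=[sum(amount#Y)])
+- Exchange hashpartitioning(country#X, 200), ENSURE_REQUIREMENTS
   +- *(4) HashAggregate(keys=[country#X], functions=[partial_sum(amount#Y)])
      +- *(4) BroadcastHashJoin [user_id#A], [user_id#B], Inner, BuildRight
         :- *(4) Scan ExistingRDD[order_id#W, user_id#A, amount#Y, dt#V]
         +- BroadcastExchange HashedRelationBroadcastMode(...), [plan_id=...]
            +- *(2) Scan ExistingRDD[user_id#B, name#U, country#X]"""
print(triage(plan))
```

For this trimmed copy of the lesson’s plan the summary reports one shuffle, no nested loop joins, and a single BroadcastHashJoin, which matches what you would count by eye.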
This is also the right moment to mention Adaptive Query Execution. AQE (on by default since Spark 3.2) adds a fifth phase that runs during execution: after each shuffle, Spark looks at actual partition sizes and can dynamically coalesce small partitions, switch a sort-merge join to a broadcast join if a side turned out smaller than expected, or split skewed partitions. AQE shows up in the plan as AdaptiveSparkPlan at the root and AQEShuffleRead nodes after exchanges. If your physical plan looks weird because it changed at runtime, that’s why.
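Two quick checks help when AQE is in play: what the session’s AQE switches are set to, and whether the plan you are looking at is the initial one or the one that actually ran. A minimal sketch follows; aqe_settings and plan_status are hypothetical helpers, the config keys are Spark’s own, and the isFinalPlan marker is what I would expect to see printed next to AdaptiveSparkPlan in Spark 3.x.

```python
AQE_KEYS = (
    "spark.sql.adaptive.enabled",                     # master switch
    "spark.sql.adaptive.coalescePartitions.enabled",  # merge small shuffle partitions
    "spark.sql.adaptive.skewJoin.enabled",            # split skewed join partitions
)


def aqe_settings(spark):
    """Read the AQE flags from an active SparkSession (values are strings)."""
    return {key: spark.conf.get(key, "unset") for key in AQE_KEYS}


def plan_status(plan_text):
    """Classify a physical plan string as static, initial, or final."""
    if "AdaptiveSparkPlan" not in plan_text:
        return "static"    # AQE did not wrap this query
    if "isFinalPlan=true" in plan_text:
        return "final"     # reflects runtime re-optimization
    return "initial"       # may still change once the query runs


print(plan_status("AdaptiveSparkPlan isFinalPlan=false\n+- HashAggregate(...)"))
```

The practical consequence: calling .explain() before any action shows the initial plan, so to see what AQE actually did, run the action first and then explain the same DataFrame again.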
Extension points
You can plug your own rules into Catalyst — Spark Session Extensions lets you register optimizer rules, planner strategies, parser extensions, and analysis rules at session start. Iceberg, Delta Lake, and Hudi all use this to inject their own rewrites for things like predicate pushdown into transaction logs. Writing your own is rare and advanced; mostly you’ll encounter this surface as a user when a data-source library tells you to add a config like spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension. Now you know what that’s doing.
What to remember
Catalyst rewrites your code through four phases, and .explain(True) is your window into all of them. Optimized plans tell you what Spark thought about your query; physical plans tell you what it will do. Read them often enough that the operator names become familiar. Once you can read a plan, the rest of Spark performance work — joins, partitioning, caching, all of Module 6 — fits together as choices that show up directly in the plan.
Next lesson: the layer below Catalyst. Tungsten — code generation, the binary memory format, and why DataFrame Spark is so much faster than the RDD it replaced.
References: “Deep Dive into Spark SQL’s Catalyst Optimizer” (https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html) and the Apache Spark SQL performance tuning guide (https://spark.apache.org/docs/latest/sql-performance-tuning.html). Retrieved 2026-05-01.