In single-machine SQL, ORDER BY is cheap and you don’t think about it. Click “sort by date desc” on a million-row table and SQL Server does its thing in under a second. In Spark, sorting a DataFrame across a cluster is the kind of operation that turns a 30-second job into a 15-minute one if you’re not paying attention.
This lesson is about why a global sort is expensive in a distributed engine, the sortWithinPartitions escape hatch for when you don’t actually need a global order, and the one optimizer trick that makes orderBy(...).limit(N) perfectly fine even though orderBy(...).collect() is brutal.
orderBy and sort are the same function
First, the easy bit: orderBy and sort are aliases. Pick one and stick with it. The rest of this lesson uses orderBy because it matches the SQL keyword.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc, asc
spark = (SparkSession.builder
.appName("SortingAtScale")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
orders = spark.createDataFrame(
[
(1001, 1, 59.00, "NL"), (1002, 1, 29.00, "NL"),
(1003, 2, 149.00, "IT"), (1004, 2, 89.50, "IT"),
(1005, 3, 199.00, "DE"), (1006, 4, 42.42, "RO"),
(1007, 1, 12.00, "NL"), (1008, 2, 75.00, "IT"),
],
"OrderId INT, CustomerId INT, Total DOUBLE, Country STRING",
)
# Ascending by default
orders.orderBy("Total").show()
# Descending — three equivalent forms
orders.orderBy(col("Total").desc()).show()
orders.orderBy(desc("Total")).show()
orders.sort(F.col("Total").desc()).show() # alias proof
Multi-key sorts work how you’d expect — left to right priority:
# By country, then biggest order first within country
orders.orderBy("Country", col("Total").desc()).show()
# +-------+----------+-----+-------+
# |OrderId|CustomerId|Total|Country|
# +-------+----------+-----+-------+
# |   1005|         3|199.0|     DE|
# |   1003|         2|149.0|     IT|
# |   1004|         2| 89.5|     IT|
# |   1008|         2| 75.0|     IT|
# |   1001|         1| 59.0|     NL|
# |   1002|         1| 29.0|     NL|
# |   1007|         1| 12.0|     NL|
# |   1006|         4|42.42|     RO|
# +-------+----------+-----+-------+
For NULLs, Spark defaults to NULLs-first on ascending and NULLs-last on descending. Override explicitly with col("x").asc_nulls_last() or .desc_nulls_first() when you care.
What “global sort” actually means in a cluster
Here’s the part that’s invisible until you .explain() a query: in a single-machine engine, sorting an array means “compare elements, swap, repeat.” In Spark, your data isn’t a single array — it’s spread across N partitions on M executors, and a global sort means partition 0 contains all the smallest elements, partition 1 the next smallest, and so on, with every partition internally sorted.
That’s a strong guarantee. Stronger than just “sort within each partition.” And achieving it requires two phases:
- Range partitioning. Spark can’t just hash-partition (that mixes large and small keys). It needs to figure out the boundaries — what’s the cutoff between partition 0 and partition 1? It does this by sampling the data, building a rough histogram, and computing range cuts. The sample step is itself work. Then every row is shuffled across the network to its target partition.
- Local sort. Once each partition holds the right range of values, Spark sorts inside each partition. This part is fast — it’s just sorting an in-memory chunk.
The expensive part is step 1: a sample-based shuffle. Every row crosses the network. Look:
orders.orderBy("Total").explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- Sort [Total#3 ASC NULLS FIRST], true, 0
#    +- Exchange rangepartitioning(Total#3 ASC NULLS FIRST, 8), ...
#       +- Scan ExistingRDD ...
Exchange rangepartitioning is the shuffle. That’s the expensive line. Every plain orderBy (one without a limit after it) puts that line in the plan, and that’s the cost you’re paying.
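The two phases can be modeled in a few lines of plain Python, with no Spark involved. This is a toy sketch under simplifying assumptions, not Spark's RangePartitioner. The function name `range_partition_sort`, the sampling fraction, and the evenly spaced cut points are all illustrative choices.

```python
import bisect
import random


def range_partition_sort(values, num_partitions, sample_fraction=0.5, seed=0):
    """Toy model of a global sort: sample, pick range bounds, route rows, sort locally."""
    if not values:
        return [[] for _ in range(num_partitions)]
    rng = random.Random(seed)
    # Phase 1a: sample the data to estimate where the range cut points go.
    sample = sorted(v for v in values if rng.random() < sample_fraction)
    if not sample:  # tiny inputs can produce an empty sample
        sample = sorted(values)
    # num_partitions - 1 evenly spaced cut points taken from the sorted sample.
    step = len(sample) / num_partitions
    bounds = [sample[int(step * i)] for i in range(1, num_partitions)]
    # Phase 1b: the "shuffle". Every row moves to the partition owning its range.
    partitions = [[] for _ in range(num_partitions)]
    for v in values:
        partitions[bisect.bisect_left(bounds, v)].append(v)
    # Phase 2: the cheap part, an independent sort inside each partition.
    return [sorted(p) for p in partitions]


totals = [59.0, 29.0, 149.0, 89.5, 199.0, 42.42, 12.0, 75.0]
parts = range_partition_sort(totals, num_partitions=3)
# Reading the partitions in order gives a globally sorted result.
assert [v for p in parts for v in p] == sorted(totals)
```

The sample only decides where the cuts go. A poor sample gives unevenly sized partitions, not a wrong answer. Correctness comes from routing every single row, and that routing is the part that costs a shuffle.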
orderBy(...).limit(N) is OK
Here’s the saving grace. The Catalyst optimizer recognizes the pattern orderBy(...).limit(N) and rewrites it into a top-K operation:
orders.orderBy(col("Total").desc()).limit(3).explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- TakeOrderedAndProject(limit=3, orderBy=[Total#3 DESC NULLS LAST], ...)
#    +- Scan ExistingRDD ...
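TakeOrderedAndProject is dramatically cheaper than a full sort. Each partition keeps only its top 3 rows locally. Only those candidates (at most 3 × the number of partitions) are brought together for a final merge, on the driver when you collect. There's no sampling pass, no range partitioning and no full shuffle. Spark still reads every row once, but almost nothing moves across the network, so the cost is close to that of a plain scan.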
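The top-K idea behind TakeOrderedAndProject fits in a few lines of Python. This is a toy model, not Spark's operator. Partitions are plain lists, and `heapq.nlargest` stands in for the bounded per-partition buffer.

```python
import heapq


def take_ordered_desc(partitions, k):
    """Toy model of orderBy(desc).limit(k) planned as a top-K."""
    # Step 1, on each executor: keep only that partition's k largest values.
    local_tops = [heapq.nlargest(k, p) for p in partitions]
    # Step 2, in one place: merge at most k * len(partitions) candidates.
    candidates = [v for top in local_tops for v in top]
    return heapq.nlargest(k, candidates)


partitions = [[59.0, 29.0, 149.0], [89.5, 199.0, 42.42], [12.0, 75.0]]
print(take_ordered_desc(partitions, 3))  # [199.0, 149.0, 89.5]
```

Each partition does O(n log k) work and hands over at most k values. That's why the pattern stays cheap as the table grows.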
So orderBy(...).limit(100) is fine. The pattern “give me the 100 biggest orders” is one of those things that looks scary and isn’t.
What’s brutal: orderBy(...).collect(), which also drags every row onto the driver, or orderBy(...).write.parquet(...). There’s no limit, so the optimizer can’t apply the trick, and the full range-partition shuffle runs. On a small dataset you won’t notice. On 500GB you’ll notice for an hour. If you don’t need a globally sorted output, don’t ask for one.
sortWithinPartitions: the escape hatch
Sometimes you genuinely don’t need a global order — you just need each partition’s contents to be sorted. The classic case is writing partitioned output where each output file should be sorted internally (good for query engines that read the file later, good for compression, good for delta encoding):
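# Write Parquet partitioned by country, with each file sorted by Total inside.
# (The sample orders DataFrame has no date column, so Total is the sort key here.)
# Leading with the partition column lets the writer reuse this ordering
# instead of adding its own per-partition sort by Country.
(orders
 .sortWithinPartitions("Country", "Total")
 .write
 .partitionBy("Country")
 .parquet("./out/orders"))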
sortWithinPartitions does no shuffle. It sorts each existing partition in place. The result: each partition is internally ordered, but partition 0 might contain values larger than partition 1’s. There is no global ordering. For most ETL outputs this is exactly what you want, and it skips the sampling pass and the shuffle that make orderBy expensive.
Compare the plans:
orders.orderBy("Total").explain()
# Includes: Exchange rangepartitioning(...) ← network shuffle
orders.sortWithinPartitions("Total").explain()
# == Physical Plan ==
# *(1) Sort [Total#3 ASC NULLS FIRST], false, 0   ← false = local sort, no shuffle
#    +- Scan ExistingRDD ...
The false after the sort key in Sort [..., false, 0] is the giveaway: it’s a partial (per-partition) sort, not a global one. No Exchange line above it. No network cost.
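The same toy-model approach shows what sortWithinPartitions does and does not promise. Again this is plain Python, with lists standing in for partitions, not Spark code.

```python
def sort_within_partitions(partitions):
    """Toy model of sortWithinPartitions: sort each partition, move no rows."""
    return [sorted(p) for p in partitions]


partitions = [[149.0, 59.0, 29.0], [42.42, 199.0, 89.5], [75.0, 12.0]]
result = sort_within_partitions(partitions)
print(result)  # [[29.0, 59.0, 149.0], [42.42, 89.5, 199.0], [12.0, 75.0]]

flat = [v for p in result for v in p]
print(flat == sorted(flat))  # False: each partition is ordered, the whole is not
```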
When to use which:
- Need to display “the top 10 by X” → orderBy(col("x").desc()).limit(10). The optimizer handles it.
- Writing files where consumers read in order → sortWithinPartitions(...). No shuffle.
- Writing files where the entire output must be globally sorted (rare) → orderBy(...). Eat the cost.
- Sorting before a window function or join → usually unnecessary; the operator does its own sort.
- “I just want it sorted because it looks nice when I print it” → .show() doesn’t need sorting; if you’re displaying a small sample, just .show().
A common mistake: sorting before aggregating
# Pointless and expensive
(orders
.orderBy("Country")
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.show())
The orderBy does nothing for the result — groupBy will shuffle and rearrange everything anyway. You’ve added a full range-partition shuffle for no reason. If you want the output sorted, sort after aggregating:
(orders
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.orderBy(col("revenue").desc())
.show())
The optimizer sometimes catches this and removes the redundant sort, but don’t rely on it. Be intentional about where in the pipeline the sort goes.
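Why does the pre-sort buy nothing? An aggregation's result doesn't depend on the order rows arrive in. A toy hash aggregation in plain Python makes the point and shows where sorting belongs. It's a sketch of the idea, not Spark's aggregation operator.

```python
from collections import defaultdict


def revenue_by_country(rows):
    """Toy hash aggregation: sum Total per Country, like groupBy().agg(sum)."""
    totals = defaultdict(float)
    for country, total in rows:
        totals[country] += total
    return dict(totals)


rows = [("NL", 59.0), ("NL", 29.0), ("IT", 149.0), ("IT", 89.5),
        ("DE", 199.0), ("RO", 42.42), ("NL", 12.0), ("IT", 75.0)]

# Sorting the input first changes nothing about the aggregated result...
assert revenue_by_country(sorted(rows)) == revenue_by_country(rows)

# ...so sort the small aggregated result instead, the way the fixed query does.
ranked = sorted(revenue_by_country(rows).items(), key=lambda kv: kv[1], reverse=True)
print(ranked)  # [('IT', 313.5), ('DE', 199.0), ('NL', 100.0), ('RO', 42.42)]
```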
Sorting by a derived expression
You don’t have to sort by a literal column — any expression works:
# Sort by length of country code, then alphabetically
orders.orderBy(F.length("Country").desc(), "Country").show()
# Sort by computed revenue, descending
(orders
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.orderBy(F.col("revenue").desc())
.show())
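# Sort by month-of-year extracted from a date. The sample orders DataFrame has
# no OrderDate column, so this assumes one has been added (e.g. via withColumn).
orders.orderBy(F.month(F.to_date("OrderDate"))).show()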
The expression is computed before the sort. Spark doesn’t materialize it as a permanent column unless you withColumn it first — it’s just used for the comparison. Useful when the sort key is “this column transformed somehow” and you don’t want the transformation to leak into your output schema.
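Python's `sorted(key=...)` is a close analogy. The key is evaluated per row for comparisons only and never becomes part of the output. The sketch below uses the lesson's sample rows as plain tuples. The "distance from 50" key is just an illustrative expression, similar in spirit to `orderBy(F.abs(col("Total") - 50))`.

```python
ORDERS = [
    (1001, 1, 59.00, "NL"), (1002, 1, 29.00, "NL"),
    (1003, 2, 149.00, "IT"), (1004, 2, 89.50, "IT"),
    (1005, 3, 199.00, "DE"), (1006, 4, 42.42, "RO"),
    (1007, 1, 12.00, "NL"), (1008, 2, 75.00, "IT"),
]

# Sort by how far Total is from 50. The key is computed only for comparison;
# every output row keeps its original four fields.
by_distance = sorted(ORDERS, key=lambda row: abs(row[2] - 50))
print([row[0] for row in by_distance])
# [1006, 1001, 1002, 1008, 1007, 1004, 1003, 1005]
```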
A common trap: sorting by a non-deterministic expression such as F.rand() gives you a different order on every run. Usually that’s not what you want. If you genuinely want a randomized order (for sampling, say), fix the seed with F.rand(seed=42). The order then repeats as long as the input data and its partitioning don’t change.
Sort stability
A stable sort preserves the relative order of rows with equal keys. Python’s sorted is stable; Spark’s orderBy is not guaranteed to be. Two rows with the same Country value can come out in either order, and rerunning the same query may produce different orders.
This matters when you’re paginating. “Page 1 shows rows A B C D E. Click next. Page 2 shows D E F G H.” D and E appeared twice because the sort tied on Country and the engine picked a different internal order on the second call. The fix is the same as in SQL: always include a tiebreaker, ideally the primary key.
# Brittle: ties on Country leave order undefined
orders.orderBy("Country").show()
# Reproducible: ties broken by OrderId
orders.orderBy("Country", "OrderId").show()
Make this a reflex. Every orderBy in production code should end with a column that’s unique per row.
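A small simulation shows why the tiebreaker matters. Spark gives no guarantee about the order of tied rows. Here that is modeled by shuffling the input with a different seed per "run" before a sort; this is an assumption for illustration, not how Spark works internally. Sorting on Country alone lets tied rows drift between runs. Adding OrderId makes every run agree, so pages taken from different runs never overlap.

```python
import random

ORDERS = [
    (1001, 1, 59.00, "NL"), (1002, 1, 29.00, "NL"),
    (1003, 2, 149.00, "IT"), (1004, 2, 89.50, "IT"),
    (1005, 3, 199.00, "DE"), (1006, 4, 42.42, "RO"),
    (1007, 1, 12.00, "NL"), (1008, 2, 75.00, "IT"),
]


def run_query(rows, key, seed):
    """One "run": rows arrive in an arbitrary order, then get sorted by key."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)  # stands in for unspecified engine order
    return sorted(rows, key=key)


def by_country(row):
    return row[3]


def by_country_then_id(row):
    return (row[3], row[0])


def page(rows, page_no, page_size=4):
    start = page_no * page_size
    return rows[start:start + page_size]


# Country alone: tied rows can come back in a different order on each run.
brittle = {tuple(r[0] for r in run_query(ORDERS, by_country, s)) for s in range(20)}
# Country + OrderId: one total order, whatever order the rows arrived in.
stable = {tuple(r[0] for r in run_query(ORDERS, by_country_then_id, s)) for s in range(20)}
print(len(stable))  # 1

# Paging across two different runs is safe only with the tiebreaker.
first = page(run_query(ORDERS, by_country_then_id, seed=1), 0)
second = page(run_query(ORDERS, by_country_then_id, seed=2), 1)
assert not set(first) & set(second)
```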
repartitionByRange — sorting’s invisible cousin
If you call df.repartitionByRange(8, "Total"), Spark does the range-partitioning step from a global sort without the per-partition sort step. The result: each partition holds a contiguous range of Total values, but rows inside a partition aren’t ordered. Combine it with sortWithinPartitions and you’ve manually rebuilt what orderBy does:
# Equivalent to orderBy("Total"), in two explicit steps
manual_sort = (orders
.repartitionByRange(8, "Total")
.sortWithinPartitions("Total"))
When would you bother? Almost never. The optimizer already handles orderBy correctly. The reason to know about repartitionByRange is when you’re writing Parquet and want explicit control over which key ranges end up in which files. For day-to-day sorting, stick with orderBy.
asc_nulls_last, desc_nulls_first, and friends
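NULLs need a position. Spark’s defaults: NULLs sort first on ascending, last on descending. SQL Server behaves the same way (it treats NULL as the lowest value). The SQL standard, however, leaves the placement up to the implementation, and PostgreSQL, for one, does the opposite. Most of the time Spark’s default is what you want. When it isn’t: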
df.orderBy(col("MaybeNull").asc_nulls_last())
df.orderBy(col("MaybeNull").desc_nulls_first())
The four combinations cover every reasonable case. Be explicit when correctness matters; the defaults are fine for exploratory work.
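To pin down the four combinations, here is a plain-Python helper that mimics them on a list where None plays the role of NULL. The function name and its `nulls_first=None` convention (meaning "use Spark's default") are assumptions of this sketch, not a PySpark API.

```python
def sort_with_nulls(values, descending=False, nulls_first=None):
    """Order values like Spark's asc/desc with nulls_first/nulls_last; None is NULL.

    nulls_first=None applies Spark's defaults: NULLs first ascending, last descending.
    """
    if nulls_first is None:
        nulls_first = not descending
    # reverse=True flips the whole comparison, so to end up first in a
    # descending sort, NULLs must compare as the highest values.
    nulls_low = nulls_first != descending

    def key(v):
        is_null = v is None
        # The flag is compared first, so a NULL is never compared with a number.
        flag = (not is_null) if nulls_low else is_null
        return (flag, v)

    return sorted(values, key=key, reverse=descending)


data = [3, None, 1, 2]
print(sort_with_nulls(data))                                     # [None, 1, 2, 3]
print(sort_with_nulls(data, descending=True))                    # [3, 2, 1, None]
print(sort_with_nulls(data, nulls_first=False))                  # [1, 2, 3, None]
print(sort_with_nulls(data, descending=True, nulls_first=True))  # [None, 3, 2, 1]
```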
Run this on your own machine
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc
spark = (SparkSession.builder
.appName("SortingAtScale")
.master("local[*]")
.getOrCreate())
orders = spark.createDataFrame(
[
(1001, 1, 59.00, "NL"), (1002, 1, 29.00, "NL"),
(1003, 2, 149.00, "IT"), (1004, 2, 89.50, "IT"),
(1005, 3, 199.00, "DE"), (1006, 4, 42.42, "RO"),
(1007, 1, 12.00, "NL"), (1008, 2, 75.00, "IT"),
],
"OrderId INT, CustomerId INT, Total DOUBLE, Country STRING",
)
# 1. Plain orderBy — note the Exchange rangepartitioning in the plan
orders.orderBy(col("Total").desc()).explain()
orders.orderBy(col("Total").desc()).show()
# 2. orderBy + limit — top-K, no global shuffle
orders.orderBy(col("Total").desc()).limit(3).explain()
orders.orderBy(col("Total").desc()).limit(3).show()
# 3. sortWithinPartitions — no Exchange in the plan
orders.sortWithinPartitions("Total").explain()
# 4. Multi-key with tiebreaker
orders.orderBy("Country", col("Total").desc(), "OrderId").show()
# 5. The fix for the pre-aggregation anti-pattern: aggregate first, then sort
(orders
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.orderBy(col("revenue").desc())
.show())
Run each one. The crucial habit: type .explain() before you .show() on anything that involves sorting. Look for the word Exchange. If it’s there, you’re paying for a shuffle. Decide whether you actually need to.
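You can make that habit mechanical (say, in a test) with a tiny helper that scans plan text for Exchange operators. Treat it as a sketch: the helper name and regex are illustrative, and the samples are the plan strings shown earlier in this lesson.

```python
import re

# Matches operators such as "Exchange rangepartitioning(" or "Exchange hashpartitioning(".
# \b keeps it from matching inside names such as "BroadcastExchange".
_EXCHANGE = re.compile(r"\bExchange (\w+)")


def shuffle_exchanges(plan_text):
    """Return the partitioning of each Exchange (shuffle) in a physical-plan string."""
    return _EXCHANGE.findall(plan_text)


global_sort_plan = """== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [Total#3 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(Total#3 ASC NULLS FIRST, 8), ...
      +- Scan ExistingRDD ..."""

local_sort_plan = """== Physical Plan ==
*(1) Sort [Total#3 ASC NULLS FIRST], false, 0
+- Scan ExistingRDD ..."""

print(shuffle_exchanges(global_sort_plan))  # ['rangepartitioning']
print(shuffle_exchanges(local_sort_plan))   # []
```

An assumption to verify on your version: PySpark's DataFrame.explain() writes the plan through Python's print. If so, capturing it with contextlib.redirect_stdout and passing the text to shuffle_exchanges should work.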
Next lesson: the everyday cleanup operators — renaming, dropping, casting columns. Half of any real ETL is reshaping a column name or fixing a string-that-should-be-an-int. We’ll do it properly. Then in lesson 25 we open the hood on shuffle itself and explain exactly what’s flying across the network.