If groupBy is the first power tool every analyst learns, window functions are the second — and most people stop somewhere in the middle of learning them, halfway through row_number(), never quite confident enough to reach for them when they’d be the cleanest answer. This lesson is the rehab program for that.
A window function computes a value for each row, looking at a window of related rows around it, without collapsing them. That last bit is the whole point. groupBy("country").agg(F.sum("total")) gives you one row per country. A window function with the same partition gives you the same total — next to every original row. The detail rows survive.
Once you internalize that, three patterns fall out: ranking (which row is the latest, the top three, the median), comparing to a neighbor (today vs yesterday, this purchase vs the previous one), and running totals (cumulative revenue, moving averages). Three patterns, one operator, applies in 80% of analytical queries.
The window spec
A window function needs a window spec — what the window is, for any given row. The spec has two pieces:
from pyspark.sql import Window
from pyspark.sql import functions as F
w = Window.partitionBy("user_id").orderBy("event_time")
partitionBy says: rows split into groups by user_id, and the window never crosses groups. Think of it like a GROUP BY for the window — each user gets their own private window. orderBy says: within a partition, rows are ordered by event_time, so concepts like “the previous row” and “running total up to here” have a meaning.
Then you apply a function with .over(w):
df.withColumn("rn", F.row_number().over(w))
You get a new column with the function’s result computed per row, against the window defined by w. The DataFrame still has the same number of rows — nothing is collapsed.
Two things to remember:
partitionByis the shuffle-equivalent column. Spark has to bring all rows for a given user onto the same executor to compute the window. This is a wide transformation. Pick a partition column that scales — don’tpartitionBysomething with two distinct values across a billion-row table; one of those two partitions becomes a hot key.- Without
partitionBy, the entire dataset is one window. Sometimes that’s what you want (a global running total). More often it’s a bug. Run.explain()and you’ll see aWindowExecwith no partition keys — it ran on a single executor, and you’ll feel it.
The catalog of window functions
There’s a small zoo of functions designed to be used over a window. Memorize this list once and the rest is composition.
Ranking and numbering.
F.row_number()— 1, 2, 3, … per partition, no ties. Two rows with identical sort values still get sequential numbers.F.rank()— 1, 2, 2, 4, … ties get the same rank, and the next rank skips. Standard “Olympic” ranking.F.dense_rank()— 1, 2, 2, 3, … ties get the same rank, no skips.F.percent_rank()— relative rank in [0, 1], useful for percentile-like queries.F.ntile(n)— bucket each partition intonroughly-equal-sized groups.ntile(4)over revenue gives quartiles.
Neighbors.
F.lag(col, offset)— value ofcolfromoffsetrows before in the partition.F.lead(col, offset)— value ofcolfromoffsetrows after.- Both take an optional default for when the offset row doesn’t exist.
Aggregates. Every aggregate (F.sum, F.avg, F.count, F.max, F.min, F.collect_list, …) becomes a window function when called via .over(...). Sum over a window is a running total; avg over a frame of seven rows is a moving average; max over an unbounded window is the running maximum.
First and last.
F.first(col)andF.last(col)over a window — value ofcolat the first/last row of the (frame of the) window. Useful for “session start time” or “latest status.”
Frame specs
Every window function has an implicit frame: which rows of the partition feed into the function for the current row. Ranking functions have a fixed frame. Aggregates and first/last honour an explicit one:
running = Window.partitionBy("user_id").orderBy("event_time") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
moving = Window.partitionBy("user_id").orderBy("event_time") \
.rowsBetween(-3, 3) # 7-row centered window
trailing = Window.partitionBy("user_id").orderBy("event_time") \
.rowsBetween(-6, 0) # last 7 rows including this one
The two endpoints are Window.unboundedPreceding, integers (row counts), Window.currentRow, and Window.unboundedFollowing. There’s also rangeBetween if you want the bounds expressed in terms of the order column’s value (e.g., “last 7 days” instead of “last 7 rows”), at the cost of slightly more careful handling.
Default frame:
- For ranking functions: irrelevant.
- For aggregates with
orderBy:unboundedPrecedingtocurrentRow— i.e., a running total. This catches people out. If you doF.sum("total").over(Window.partitionBy("user_id").orderBy("event_time"))you get a running sum, not the partition total. To get the partition total, omit theorderByor set the frame tounboundedPreceding, unboundedFollowing.
# Running total per user
running_w = Window.partitionBy("user_id").orderBy("event_time")
df.withColumn("running", F.sum("total").over(running_w))
# Total per user, attached to every row
total_w = Window.partitionBy("user_id")
df.withColumn("user_total", F.sum("total").over(total_w))
That difference — orderBy or no orderBy — bites everyone once. Worth memorizing.
Pattern 1: latest record per key
Probably the single most-asked-for pattern in analytics: “give me the most recent row per user.” Self-joins are the wrong answer; window functions are the right one.
events = spark.createDataFrame(
[
(1, "u1", "login", "2024-03-15 09:00"),
(2, "u1", "view", "2024-03-15 09:30"),
(3, "u1", "logout", "2024-03-15 10:00"),
(4, "u2", "login", "2024-03-15 11:00"),
(5, "u2", "view", "2024-03-15 11:15"),
],
"event_id INT, user_id STRING, action STRING, ts STRING",
)
w = Window.partitionBy("user_id").orderBy(F.col("ts").desc())
latest = (events
.withColumn("rn", F.row_number().over(w))
.filter(F.col("rn") == 1)
.drop("rn"))
latest.show()
# u1 -> logout @ 10:00
# u2 -> view @ 11:15
row_number() rather than rank() because we want exactly one row per user even if two events share the same timestamp. Pick a tiebreaker (event_id desc) if that matters for your data:
w = Window.partitionBy("user_id").orderBy(F.col("ts").desc(), F.col("event_id").desc())
Same pattern works for “top N per group”: replace == 1 with <= N.
Pattern 2: this row vs the previous row
lag and lead shine when you need to compute deltas, gaps, or transitions.
prices = spark.createDataFrame(
[
("AAPL", "2024-03-15", 170.0),
("AAPL", "2024-03-16", 172.5),
("AAPL", "2024-03-17", 168.0),
("MSFT", "2024-03-15", 410.0),
("MSFT", "2024-03-16", 415.0),
("MSFT", "2024-03-17", 420.0),
],
"ticker STRING, dt STRING, close DOUBLE",
)
w = Window.partitionBy("ticker").orderBy("dt")
with_change = prices.withColumn("prev_close", F.lag("close", 1).over(w)) \
.withColumn("daily_return",
(F.col("close") - F.col("prev_close")) / F.col("prev_close"))
with_change.show()
The first row per ticker has prev_close = null and daily_return = null, which is correct. If you need a default, F.lag("close", 1, 0.0) will fill it.
Variations: lag with offset = 7 for week-over-week comparisons, lead for “what happens next” labels in event analysis, lag combined with a sign check to find direction changes.
Pattern 3: running and moving aggregates
Cumulative revenue is one window function:
sales = spark.createDataFrame(
[
("IT", "2024-03-15", 100.0),
("IT", "2024-03-16", 50.0),
("IT", "2024-03-17", 75.0),
("NL", "2024-03-15", 200.0),
("NL", "2024-03-16", 80.0),
],
"country STRING, dt STRING, revenue DOUBLE",
)
w = Window.partitionBy("country").orderBy("dt")
cumulative = sales.withColumn("running_revenue", F.sum("revenue").over(w))
cumulative.show()
# IT 03-15 100 -> 100
# IT 03-16 50 -> 150
# IT 03-17 75 -> 225
# NL 03-15 200 -> 200
# NL 03-16 80 -> 280
A 7-day moving average is one frame change away:
w7 = Window.partitionBy("country").orderBy("dt").rowsBetween(-6, 0)
sales.withColumn("ma7", F.avg("revenue").over(w7)).show()
For the first six rows of each partition, the window is shorter than seven (it can’t read rows that don’t exist), so the average is over fewer rows. That’s usually the right behaviour; if you want strict “only when 7 are available,” follow up with a filter on a count window.
Performance notes
A window function with partitionBy requires a shuffle: Spark has to gather every row for a given partition key onto the same executor before it can compute the window. The mental model is the same as groupBy — partition column = shuffle column, and skewed partition keys produce skewed window jobs. The salting trick from lesson 29 applies if you have one user with 50 million events.
A window function without partitionBy is worse: every row goes to a single executor. Use it only on small datasets or for genuinely global computations (and even then, consider whether you need the global view at all).
Multiple windows in the same select are fine. Spark will reuse a single shuffle when the partition keys match across windows, even if the order or frame differs:
w1 = Window.partitionBy("user_id").orderBy("ts")
w2 = Window.partitionBy("user_id").orderBy("ts").rowsBetween(-3, 3)
df.select(
"*",
F.row_number().over(w1).alias("rn"),
F.avg("amount").over(w2).alias("ma"),
)
One shuffle, two windows. Add a third with partitionBy("country") and you get a second shuffle — Catalyst can’t reuse the user-keyed exchange.
If you’ve worked through the SQL Server windows-functions chapter elsewhere in these notes, the SQL semantics are identical; PySpark’s Window API is just the builder version of OVER (...) clause syntax. Cross-pollinating between the two is good for cementing both.
One more performance note: window functions and groupBy aren’t substitutes. If you only need an aggregate per group and don’t care about preserving the detail rows, use groupBy — it’s cheaper. The optimizer can apply partial aggregation on each input partition before the shuffle, sending small per-partition summaries across the network instead of full row sets. A window function has to ship every row to its destination executor because the function might be row_number or lag, where partial aggregation has no meaning. Catalyst plays it safe and shuffles the rows whole. So: groupBy when you can, window when you need the per-row output.
Run this on your own machine
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("WindowsDemo")
.master("local[*]")
.getOrCreate())
events = spark.createDataFrame(
[
(1, "u1", "login", "2024-03-15 09:00", 0.0),
(2, "u1", "buy", "2024-03-15 09:30", 25.0),
(3, "u1", "buy", "2024-03-15 10:00", 40.0),
(4, "u1", "logout", "2024-03-15 10:30", 0.0),
(5, "u2", "login", "2024-03-15 11:00", 0.0),
(6, "u2", "buy", "2024-03-15 11:15", 60.0),
(7, "u2", "buy", "2024-03-15 12:00", 30.0),
],
"event_id INT, user_id STRING, action STRING, ts STRING, amount DOUBLE",
)
w = Window.partitionBy("user_id").orderBy("ts")
enriched = (events
.withColumn("rn", F.row_number().over(w))
.withColumn("rk", F.rank().over(w))
.withColumn("prev_amt", F.lag("amount", 1).over(w))
.withColumn("next_act", F.lead("action", 1).over(w))
.withColumn("running", F.sum("amount").over(w))
.withColumn("user_total",
F.sum("amount").over(Window.partitionBy("user_id"))))
enriched.show(truncate=False)
# Latest per user
w_desc = Window.partitionBy("user_id").orderBy(F.col("ts").desc())
events.withColumn("rn", F.row_number().over(w_desc)) \
.filter(F.col("rn") == 1) \
.drop("rn") \
.show()
Read the output column by column. rn is dense, rk matches it because there are no ties. prev_amt is null on the first row per user. running grows; user_total is the same for every row in a partition. That single output frames the entire mental model.
Next lesson swaps from row-level operations to shape transformations — pivot to make data wide, unpivot to make it long, and the trick that lived inside selectExpr("stack(...)") for the years before Spark 3.4 added melt.
References: Apache Spark window function documentation (https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html) and the pyspark.sql.Window API. Retrieved 2026-05-01.