PySpark, dalle fondamenta Lezione 38 / 60

Window function: ranking, lag/lead, totali progressivi

Window.partitionBy().orderBy(), la famiglia delle window function e perché sono lo strumento secondo per utilità dopo groupBy.

Se groupBy è il primo strumento di potenza che ogni analista impara, le window function sono il secondo, e la maggior parte delle persone si ferma a metà strada nell’impararle, a metà di row_number(), senza mai diventare abbastanza sicura da ricorrervi quando sarebbero la risposta più pulita. Questa lezione è il programma di riabilitazione.

Una window function calcola un valore per ogni riga, guardando una finestra di righe correlate intorno a quella riga, senza collassarle. Quest’ultimo bit è tutto il punto. groupBy("country").agg(F.sum("total")) ti restituisce una riga per country. Una window function con la stessa partition ti dà lo stesso totale, accanto a ogni riga originale. Le righe di dettaglio sopravvivono.

Una volta interiorizzato ciò, ricadono fuori tre pattern: ranking (qual è la riga più recente, le top tre, la mediana), confronto con un vicino (oggi vs ieri, questo acquisto vs il precedente) e totali progressivi (revenue cumulativa, medie mobili). Tre pattern, un operatore, applicabile nell’80% delle query analitiche.

La window spec

Una window function ha bisogno di una window spec: cos’è la finestra, per una data riga. La spec ha due pezzi:

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy("user_id").orderBy("event_time")

partitionBy dice: le righe si dividono in gruppi per user_id e la finestra non attraversa mai i gruppi. Pensalo come un GROUP BY per la finestra: ogni utente ha la sua window privata. orderBy dice: dentro una partition le righe sono ordinate per event_time, così che concetti come “la riga precedente” e “totale progressivo fino a qui” abbiano un significato.

Poi applichi una funzione con .over(w):

df.withColumn("rn", F.row_number().over(w))

Ottieni una nuova colonna con il risultato della funzione calcolato per riga, contro la window definita da w. Il DataFrame ha ancora lo stesso numero di righe: niente viene collassato.

Due cose da ricordare:

  • partitionBy è la colonna equivalente allo shuffle. Spark deve portare tutte le righe per un dato utente sullo stesso executor per calcolare la window. E’ una wide transformation. Scegli una colonna di partition che scali: non fare partitionBy su qualcosa con due valori distinti su una tabella da un miliardo di righe; una di quelle due partition diventa una hot key.
  • Senza partitionBy, l’intero dataset è una sola finestra. A volte è quello che vuoi (un totale progressivo globale). Piu’ spesso è un bug. Esegui .explain() e vedrai un WindowExec senza partition key: ha girato su un singolo executor, e te ne accorgerai.

Il catalogo delle window function

C’è un piccolo zoo di funzioni progettate per essere usate sopra una window. Memorizza questa lista una volta e il resto è composizione.

Ranking e numerazione.

  • F.row_number() — 1, 2, 3, … per partition, senza pareggi. Due righe con valori di sort identici ricevono comunque numeri sequenziali.
  • F.rank() — 1, 2, 2, 4, … i pareggi ottengono lo stesso rank, e il rank successivo salta. Ranking standard “olimpico”.
  • F.dense_rank() — 1, 2, 2, 3, … i pareggi ottengono lo stesso rank, senza salti.
  • F.percent_rank() — rank relativo in [0, 1], utile per query di tipo percentile.
  • F.ntile(n) — divide ogni partition in n gruppi di dimensione approssimativamente uguale. ntile(4) su revenue dà i quartili.

Vicini.

  • F.lag(col, offset) — valore di col dalla riga di offset posizioni prima nella partition.
  • F.lead(col, offset) — valore di col dalla riga di offset posizioni dopo.
  • Entrambi accettano un default opzionale per quando la riga di offset non esiste.

Aggregati. Ogni aggregato (F.sum, F.avg, F.count, F.max, F.min, F.collect_list, …) diventa una window function quando viene chiamato tramite .over(...). Sum su una window è un totale progressivo; avg su un frame di sette righe è una media mobile; max su una window unbounded è il massimo progressivo.

First e last.

  • F.first(col) e F.last(col) su una window — valore di col alla prima/ultima riga del (frame della) window. Utile per “session start time” o “ultimo status”.

Frame spec

Ogni window function ha un frame implicito: quali righe della partition entrano nella funzione per la riga corrente. Le funzioni di ranking hanno un frame fisso. Aggregati e first/last rispettano un frame esplicito:

running = Window.partitionBy("user_id").orderBy("event_time") \
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)

moving = Window.partitionBy("user_id").orderBy("event_time") \
               .rowsBetween(-3, 3)   # 7-row centered window

trailing = Window.partitionBy("user_id").orderBy("event_time") \
                 .rowsBetween(-6, 0)  # last 7 rows including this one

I due endpoint sono Window.unboundedPreceding, interi (conteggi di righe), Window.currentRow e Window.unboundedFollowing. Esiste anche rangeBetween se vuoi che i bound siano espressi in termini del valore della colonna di ordinamento (ad esempio “ultimi 7 giorni” invece di “ultime 7 righe”), al costo di una gestione un po’ più attenta.

Frame di default:

  • Per le funzioni di ranking: irrilevante.
  • Per gli aggregati con orderBy: unboundedPreceding a currentRow, cioè un totale progressivo. Questo frega tutti. Se fai F.sum("total").over(Window.partitionBy("user_id").orderBy("event_time")) ottieni una somma progressiva, non il totale della partition. Per ottenere il totale della partition, ometti l’orderBy o imposta il frame a unboundedPreceding, unboundedFollowing.
# Running total per user
running_w = Window.partitionBy("user_id").orderBy("event_time")
df.withColumn("running", F.sum("total").over(running_w))

# Total per user, attached to every row
total_w = Window.partitionBy("user_id")
df.withColumn("user_total", F.sum("total").over(total_w))

Quella differenza, orderBy o no orderBy, morde tutti almeno una volta. Vale la pena memorizzarla.

Pattern 1: ultimo record per chiave

Probabilmente il singolo pattern più richiesto in analytics: “dammi la riga più recente per utente”. I self-join sono la risposta sbagliata; le window function sono quella giusta.

events = spark.createDataFrame(
    [
        (1, "u1", "login",   "2024-03-15 09:00"),
        (2, "u1", "view",    "2024-03-15 09:30"),
        (3, "u1", "logout",  "2024-03-15 10:00"),
        (4, "u2", "login",   "2024-03-15 11:00"),
        (5, "u2", "view",    "2024-03-15 11:15"),
    ],
    "event_id INT, user_id STRING, action STRING, ts STRING",
)

w = Window.partitionBy("user_id").orderBy(F.col("ts").desc())

latest = (events
          .withColumn("rn", F.row_number().over(w))
          .filter(F.col("rn") == 1)
          .drop("rn"))

latest.show()
# u1 -> logout @ 10:00
# u2 -> view  @ 11:15

row_number() invece di rank() perché vogliamo esattamente una riga per utente anche se due eventi condividono lo stesso timestamp. Scegli un tiebreaker (event_id desc) se per i tuoi dati conta:

w = Window.partitionBy("user_id").orderBy(F.col("ts").desc(), F.col("event_id").desc())

Lo stesso pattern funziona per “top N per gruppo”: sostituisci == 1 con <= N.

Pattern 2: questa riga vs la riga precedente

lag e lead brillano quando devi calcolare delta, gap o transizioni.

prices = spark.createDataFrame(
    [
        ("AAPL", "2024-03-15", 170.0),
        ("AAPL", "2024-03-16", 172.5),
        ("AAPL", "2024-03-17", 168.0),
        ("MSFT", "2024-03-15", 410.0),
        ("MSFT", "2024-03-16", 415.0),
        ("MSFT", "2024-03-17", 420.0),
    ],
    "ticker STRING, dt STRING, close DOUBLE",
)

w = Window.partitionBy("ticker").orderBy("dt")

with_change = prices.withColumn("prev_close", F.lag("close", 1).over(w)) \
                    .withColumn("daily_return",
                                (F.col("close") - F.col("prev_close")) / F.col("prev_close"))

with_change.show()

La prima riga per ticker ha prev_close = null e daily_return = null, il che è corretto. Se ti serve un default, F.lag("close", 1, 0.0) lo riempie.

Variazioni: lag con offset = 7 per confronti settimana su settimana, lead per etichette “cosa succede dopo” nell’event analysis, lag combinato con un controllo di segno per trovare i cambi di direzione.

Pattern 3: aggregati progressivi e mobili

La revenue cumulativa è una sola window function:

sales = spark.createDataFrame(
    [
        ("IT", "2024-03-15", 100.0),
        ("IT", "2024-03-16", 50.0),
        ("IT", "2024-03-17", 75.0),
        ("NL", "2024-03-15", 200.0),
        ("NL", "2024-03-16", 80.0),
    ],
    "country STRING, dt STRING, revenue DOUBLE",
)

w = Window.partitionBy("country").orderBy("dt")

cumulative = sales.withColumn("running_revenue", F.sum("revenue").over(w))
cumulative.show()
# IT 03-15 100  -> 100
# IT 03-16  50  -> 150
# IT 03-17  75  -> 225
# NL 03-15 200  -> 200
# NL 03-16  80  -> 280

Una media mobile a 7 giorni è a un cambio di frame di distanza:

w7 = Window.partitionBy("country").orderBy("dt").rowsBetween(-6, 0)
sales.withColumn("ma7", F.avg("revenue").over(w7)).show()

Per le prime sei righe di ogni partition la window è più corta di sette (non può leggere righe che non esistono), quindi la media è calcolata su meno righe. Di solito è il comportamento giusto; se vuoi uno stretto “solo quando ne sono disponibili 7”, fai un filter successivo su una count window.

Note sulle performance

Una window function con partitionBy richiede uno shuffle: Spark deve raccogliere ogni riga per una data partition key sullo stesso executor prima di poter calcolare la finestra. Il modello mentale è lo stesso di groupBy: colonna di partition = colonna di shuffle, e partition key skewed producono job di window skewed. Il trucco del salting della lezione 29 si applica se hai un utente con 50 milioni di eventi.

Una window function senza partitionBy è peggio: ogni riga finisce su un singolo executor. Usala solo su dataset piccoli o per calcoli genuinamente globali (e anche allora, considera se hai davvero bisogno della vista globale).

Window multiple nello stesso select vanno bene. Spark riutilizza un singolo shuffle quando le partition key combaciano tra le window, anche se l’ordine o il frame differiscono:

w1 = Window.partitionBy("user_id").orderBy("ts")
w2 = Window.partitionBy("user_id").orderBy("ts").rowsBetween(-3, 3)

df.select(
    "*",
    F.row_number().over(w1).alias("rn"),
    F.avg("amount").over(w2).alias("ma"),
)

Uno shuffle, due window. Aggiungi una terza con partitionBy("country") e ottieni un secondo shuffle: Catalyst non può riutilizzare l’exchange con chiave per utente.

Se hai già affrontato il capitolo sulle windows function di SQL Server altrove in queste note, la semantica SQL è identica; l’API Window di PySpark è soltanto la versione builder della sintassi della clausola OVER (...). Impollinare in croce tra le due è utile per consolidarle entrambe.

Un’ultima nota sulle performance: window function e groupBy non sono sostituti. Se ti serve solo un aggregato per gruppo e non ti importa preservare le righe di dettaglio, usa groupBy: è più economico. L’optimizer può applicare aggregazione parziale su ogni partition di input prima dello shuffle, mandando attraverso la rete piccoli riassunti per partition invece di set di righe completi. Una window function deve spedire ogni riga al suo executor di destinazione perché la funzione potrebbe essere row_number o lag, dove l’aggregazione parziale non ha significato. Catalyst gioca sul sicuro e fa lo shuffle delle righe intere. Quindi: groupBy quando puoi, window quando ti serve l’output per riga.

Esegui questo sulla tua macchina

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("WindowsDemo")
         .master("local[*]")
         .getOrCreate())

events = spark.createDataFrame(
    [
        (1, "u1", "login",  "2024-03-15 09:00", 0.0),
        (2, "u1", "buy",    "2024-03-15 09:30", 25.0),
        (3, "u1", "buy",    "2024-03-15 10:00", 40.0),
        (4, "u1", "logout", "2024-03-15 10:30", 0.0),
        (5, "u2", "login",  "2024-03-15 11:00", 0.0),
        (6, "u2", "buy",    "2024-03-15 11:15", 60.0),
        (7, "u2", "buy",    "2024-03-15 12:00", 30.0),
    ],
    "event_id INT, user_id STRING, action STRING, ts STRING, amount DOUBLE",
)

w = Window.partitionBy("user_id").orderBy("ts")

enriched = (events
    .withColumn("rn",        F.row_number().over(w))
    .withColumn("rk",        F.rank().over(w))
    .withColumn("prev_amt",  F.lag("amount", 1).over(w))
    .withColumn("next_act",  F.lead("action", 1).over(w))
    .withColumn("running",   F.sum("amount").over(w))
    .withColumn("user_total",
                F.sum("amount").over(Window.partitionBy("user_id"))))

enriched.show(truncate=False)

# Latest per user
w_desc = Window.partitionBy("user_id").orderBy(F.col("ts").desc())
events.withColumn("rn", F.row_number().over(w_desc)) \
      .filter(F.col("rn") == 1) \
      .drop("rn") \
      .show()

Leggi l’output colonna per colonna. rn è denso, rk lo segue perché non ci sono pareggi. prev_amt è null sulla prima riga per utente. running cresce; user_total è lo stesso per ogni riga in una partition. Quel singolo output inquadra tutto il modello mentale.

La prossima lezione passa dalle operazioni a livello di riga alle trasformazioni di forma: pivot per allargare i dati, unpivot per allungarli, e il trucco che viveva dentro selectExpr("stack(...)") negli anni prima che Spark 3.4 aggiungesse melt.


Riferimenti: documentazione delle window function di Apache Spark (https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html) e l’API pyspark.sql.Window. Recuperati il 2026-05-01.

Cerca