In SQL su singola macchina, ORDER BY è economico e non ci pensi. Clicca “ordina per data desc” su una tabella da un milione di righe e SQL Server fa la sua cosa in meno di un secondo. In Spark, ordinare un DataFrame attraverso un cluster è il tipo di operazione che trasforma un job da 30 secondi in uno da 15 minuti se non stai attento.
Questa lezione tratta del perché un sort globale è costoso in un motore distribuito, della scappatoia sortWithinPartitions per quando non ti serve davvero un ordine globale, e dell’unico trucco dell’optimizer che rende orderBy(...).limit(N) perfettamente accettabile anche se orderBy(...).collect() è brutale.
orderBy e sort sono la stessa funzione
Prima la parte facile: orderBy e sort sono alias. Scegline uno e tienitelo. Il resto della lezione usa orderBy perché coincide con la keyword SQL.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc, asc
spark = (SparkSession.builder
.appName("SortingAtScale")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "8")
.getOrCreate())
orders = spark.createDataFrame(
[
(1001, 1, 59.00, "NL"), (1002, 1, 29.00, "NL"),
(1003, 2, 149.00, "IT"), (1004, 2, 89.50, "IT"),
(1005, 3, 199.00, "DE"), (1006, 4, 42.42, "RO"),
(1007, 1, 12.00, "NL"), (1008, 2, 75.00, "IT"),
],
"OrderId INT, CustomerId INT, Total DOUBLE, Country STRING",
)
# Ascending by default
orders.orderBy("Total").show()
# Descending -- three equivalent forms
orders.orderBy(col("Total").desc()).show()
orders.orderBy(desc("Total")).show()
orders.sort(F.col("Total").desc()).show() # alias proof
I sort multi-chiave funzionano come ti aspetteresti, priorità da sinistra a destra:
# By country, then biggest order first within country
orders.orderBy("Country", col("Total").desc()).show()
# +-------+----------+------+-------+
# |OrderId|CustomerId| Total|Country|
# +-------+----------+------+-------+
# | 1005| 3|199.00| DE|
# | 1003| 2|149.00| IT|
# | 1004| 2| 89.50| IT|
# | 1008| 2| 75.00| IT|
# | 1001| 1| 59.00| NL|
# | 1002| 1| 29.00| NL|
# | 1007| 1| 12.00| NL|
# | 1006| 4| 42.42| RO|
# +-------+----------+------+-------+
Per i NULL, Spark di default li mette per primi sull’ascending e per ultimi sul descending. Forza esplicitamente con col("x").asc_nulls_last() o .desc_nulls_first() quando ti interessa.
Cosa significa davvero “global sort” in un cluster
Ecco la parte invisibile finché non fai .explain() di una query: in un motore single-machine, ordinare un array significa “confronta elementi, scambia, ripeti”. In Spark i tuoi dati non sono un singolo array: sono distribuiti su N partizioni su M executor, e un sort globale significa la partizione 0 contiene tutti gli elementi più piccoli, la partizione 1 i successivi più piccoli, e così via, con ogni partizione internamente ordinata.
È una garanzia forte. Più forte del semplice “ordinato dentro ogni partizione”. E ottenerla richiede due fasi:
- Range partitioning. Spark non può semplicemente fare hash-partition (quello mescola chiavi grandi e piccole). Deve capire i confini: qual è il taglio tra la partizione 0 e la partizione 1? Lo fa campionando i dati, costruendo un istogramma approssimativo e calcolando i tagli di range. Lo step di campionamento è lavoro in sé. Poi ogni riga viene shuffled attraverso la rete fino alla sua partizione di destinazione.
- Local sort. Una volta che ogni partizione contiene il range giusto di valori, Spark ordina dentro ogni partizione. Questa parte è veloce: è solo ordinare un chunk in memoria.
La parte costosa è lo step 1: uno shuffle basato su campione. Ogni riga attraversa la rete. Guarda:
orders.orderBy("Total").explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- Sort [Total#3 ASC NULLS FIRST], true, 0
# +- Exchange rangepartitioning(Total#3 ASC NULLS FIRST, 8), ...
# +- Scan ExistingRDD ...
Exchange rangepartitioning è lo shuffle. Quella è la riga costosa. Ogni volta che vedi orderBy in una query Spark, quella riga sarà nel piano, ed è il costo che stai pagando.
orderBy(...).limit(N) va bene
Ecco la salvezza. L’optimizer Catalyst riconosce il pattern orderBy(...).limit(N) e lo riscrive in un’operazione top-K:
orders.orderBy(col("Total").desc()).limit(3).explain()
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- TakeOrderedAndProject(limit=3, orderBy=[Total#3 DESC NULLS LAST], ...)
# +- Scan ExistingRDD ...
TakeOrderedAndProject è drasticamente più economico di un sort completo. Ogni partizione tiene solo la sua top 3 in locale, poi spedisce quelle (3 x N partizioni) al driver, che fa un merge finale. Niente range partitioning, niente shuffle globale. Lineare nei dati, sotto il secondo su tabelle multi-GB.
Quindi orderBy(...).limit(100) va bene. Il pattern “dammi i 100 ordini più grandi” è una di quelle cose che sembra spaventosa ma non lo è.
Quello che è brutale: orderBy(...).collect(), o peggio, orderBy(...).write.parquet(...). Non c’è limit, quindi l’optimizer non può applicare il trucco. Lo shuffle di range-partition completo gira. Su un dataset piccolo non te ne accorgi. Su 500GB te ne accorgi per un’ora. Se non ti serve un output ordinato globalmente, non chiederlo.
sortWithinPartitions: la scappatoia
A volte non ti serve davvero un ordine globale: ti serve solo che il contenuto di ogni partizione sia ordinato. Il caso classico è scrivere output partizionato dove ogni file di output dovrebbe essere ordinato internamente (utile per i query engine che leggono il file dopo, utile per la compressione, utile per il delta encoding):
# Write Parquet partitioned by country, with each file sorted by date inside
(orders
.sortWithinPartitions("OrderDate")
.write
.partitionBy("Country")
.parquet("./out/orders"))
sortWithinPartitions non fa alcuno shuffle. Ordina ogni partizione esistente sul posto. Il risultato: ogni partizione è internamente ordinata, ma la partizione 0 potrebbe contenere valori più grandi di quelli della partizione 1. Non c’è alcun ordinamento globale. Per la maggior parte degli output ETL è esattamente quello che vuoi e 10 volte più economico di orderBy.
Confronta i piani:
orders.orderBy("Total").explain()
# Includes: Exchange rangepartitioning(...) <- network shuffle
orders.sortWithinPartitions("Total").explain()
# == Physical Plan ==
# *(1) Sort [Total#3 ASC NULLS FIRST], false, 0 <- false = local sort, no shuffle
# +- Scan ExistingRDD ...
Il false dopo la chiave di sort in Sort [..., false, 0] è la spia: è un sort parziale (per partizione), non globale. Niente riga Exchange sopra. Nessun costo di rete.
Quando usare cosa:
- Mostrare “i top 10 per X” ->
orderBy(col("x").desc()).limit(10). L’optimizer se ne occupa. - Scrivere file dove i consumer leggono in ordine ->
sortWithinPartitions(...). Niente shuffle. - Scrivere file dove l’intero output deve essere globalmente ordinato (raro) ->
orderBy(...). Mangia il costo. - Ordinare prima di una window function o di una join -> di solito non necessario, l’operatore fa il suo sort.
- “Lo voglio ordinato solo perché fa carino quando lo stampo” ->
.show()non ha bisogno di ordinamento; se stai mostrando un piccolo sample, basta.show().
Un errore comune: ordinare prima di aggregare
# Pointless and expensive
(orders
.orderBy("Country")
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.show())
L’orderBy non fa niente per il risultato: groupBy farà shuffle e riarrangerà tutto comunque. Hai aggiunto uno shuffle di range-partition completo per niente. Se vuoi l’output ordinato, ordina dopo aver aggregato:
(orders
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.orderBy(col("revenue").desc())
.show())
L’optimizer a volte se ne accorge e rimuove il sort ridondante, ma non farci affidamento. Sii intenzionale su dove nella pipeline va il sort.
Ordinare per un’espressione derivata
Non devi ordinare per una colonna letterale: qualsiasi espressione funziona:
# Sort by length of country code, then alphabetically
orders.orderBy(F.length("Country").desc(), "Country").show()
# Sort by computed revenue, descending
(orders
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.orderBy(F.col("revenue").desc())
.show())
# Sort by month-of-year extracted from a date
orders.orderBy(F.month(F.to_date("OrderDate"))).show()
L’espressione viene calcolata prima del sort. Spark non la materializza come colonna permanente a meno che tu non faccia prima withColumn: viene usata solo per il confronto. Utile quando la chiave di sort è “questa colonna trasformata in qualche modo” e non vuoi che la trasformazione si infili nello schema di output.
Una trappola comune: ordinare per un’espressione che usa una funzione non deterministica (F.rand(), F.current_timestamp()) ti dà ordini diversi su run diverse. Di solito non è quello che vuoi. Se vuoi davvero un ordine random (per il sampling, diciamo), fissa il seed: F.rand(seed=42).
Stabilità del sort
Un sort stabile preserva l’ordine relativo delle righe con chiavi uguali. La sorted di Python è stabile; l’orderBy di Spark non è garantito che lo sia. Due righe con lo stesso valore di Country possono uscire in qualsiasi ordine, e rieseguire la stessa query può produrre ordini diversi.
Conta quando paginate. “Pagina 1 mostra le righe A B C D E. Click next. Pagina 2 mostra D E F G H.” D ed E sono apparse due volte perché il sort ha avuto un pareggio su Country e il motore ha scelto un ordine interno diverso alla seconda chiamata. La fix è la stessa di SQL: includi sempre un tiebreaker, idealmente la chiave primaria.
# Brittle: ties on Country leave order undefined
orders.orderBy("Country").show()
# Reproducible: ties broken by OrderId
orders.orderBy("Country", "OrderId").show()
Falla diventare un riflesso. Ogni orderBy in codice di produzione dovrebbe terminare con una colonna che è unica per riga.
repartitionByRange: la cugina invisibile dell’ordinamento
Se chiami df.repartitionByRange(8, "Total"), Spark fa lo step di range-partitioning di un sort globale senza lo step di sort per partizione. Risultato: ogni partizione contiene un range contiguo di valori Total, ma le righe dentro una partizione non sono ordinate. Combinala con sortWithinPartitions e hai ricostruito manualmente ciò che fa orderBy:
# Equivalent to orderBy("Total"), in two explicit steps
manual_sort = (orders
.repartitionByRange(8, "Total")
.sortWithinPartitions("Total"))
Quando ti scomoderesti? Quasi mai. L’optimizer già gestisce orderBy correttamente. Il motivo per conoscere repartitionByRange è quando scrivi Parquet range-partizionato o tabelle bucketed e vuoi controllo esplicito su come i dati sono disposti tra i file. Per il sort di tutti i giorni, attieniti a orderBy.
asc_nulls_last, desc_nulls_first e amici
I NULL hanno bisogno di una posizione. I default di Spark: NULL prima sull’ascending, ultimi sul descending. È l’interpretazione standard di SQL e la maggior parte delle volte è quello che vuoi. Quando non lo è:
df.orderBy(col("MaybeNull").asc_nulls_last())
df.orderBy(col("MaybeNull").desc_nulls_first())
Le quattro combinazioni coprono ogni caso ragionevole. Sii esplicito quando la correttezza conta; i default vanno bene per il lavoro esplorativo.
Esegui questo sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc
spark = (SparkSession.builder
.appName("SortingAtScale")
.master("local[*]")
.getOrCreate())
orders = spark.createDataFrame(
[
(1001, 1, 59.00, "NL"), (1002, 1, 29.00, "NL"),
(1003, 2, 149.00, "IT"), (1004, 2, 89.50, "IT"),
(1005, 3, 199.00, "DE"), (1006, 4, 42.42, "RO"),
(1007, 1, 12.00, "NL"), (1008, 2, 75.00, "IT"),
],
"OrderId INT, CustomerId INT, Total DOUBLE, Country STRING",
)
# 1. Plain orderBy -- note the Exchange rangepartitioning in the plan
orders.orderBy(col("Total").desc()).explain()
orders.orderBy(col("Total").desc()).show()
# 2. orderBy + limit -- top-K, no global shuffle
orders.orderBy(col("Total").desc()).limit(3).explain()
orders.orderBy(col("Total").desc()).limit(3).show()
# 3. sortWithinPartitions -- no Exchange in the plan
orders.sortWithinPartitions("Total").explain()
# 4. Multi-key with tiebreaker
orders.orderBy("Country", col("Total").desc(), "OrderId").show()
# 5. The pointless-pre-aggregation anti-pattern
(orders
.groupBy("Country")
.agg(F.sum("Total").alias("revenue"))
.orderBy(col("revenue").desc())
.show())
Eseguili una a una. L’abitudine cruciale: digita .explain() prima di .show() su qualsiasi cosa che coinvolga ordinamento. Cerca la parola Exchange. Se c’è, stai pagando uno shuffle. Decidi se davvero ti serve.
Prossima lezione: gli operatori di pulizia di tutti i giorni. Rinominare, droppare, fare cast delle colonne. Metà di qualsiasi ETL reale è rimettere a posto un nome di colonna o sistemare una stringa-che-dovrebbe-essere-int. Lo faremo per bene. Poi nella lezione 25 apriamo il cofano sullo shuffle in sé e spieghiamo esattamente cosa sta volando attraverso la rete.