Quasi ogni programma PySpark è costruito a partire da due verbi: prendi qualche colonna, tieni qualche riga. Tutto il resto, join, aggregazioni, window, UDF, è decorazione che ci sta sopra. Quindi prima di arrivare a qualunque di quelle decorazioni, dovremmo essere molto a nostro agio con i due verbi base.
In PySpark questi sono select (prendi colonne) e filter/where (tieni righe). Non sono esotici; se hai scritto SQL, il modello mentale si trasferisce in modo pulito. La parte interessante è come ti riferisci a una colonna, perché PySpark ti dà quattro modi diversi di farlo, e solo tre di questi sopravvivono al contatto con dati del mondo reale.
select: proiettare colonne
select accetta un numero qualsiasi di riferimenti a colonne e restituisce un nuovo DataFrame con solo quelle colonne:
df.select("order_id", "amount").show(5)
Puoi passare stringhe, oggetti colonna, o espressioni. Si mescolano liberamente:
from pyspark.sql.functions import col, upper
df.select(
"order_id",
upper(col("country")).alias("country_code"),
(col("amount") * 1.22).alias("amount_with_vat"),
).show(5)
Lo schema di output è esattamente le colonne che hai chiesto, nell’ordine in cui le hai chieste. Nessuna sorpresa.
select è una trasformazione, non un’azione: non fa girare niente finché non lo inneschi (show, collect, write). Tutto quello che costruisce è un nodo del piano lazy che dice “quando qualcuno alla fine mi esegue, proietta queste colonne”.
where e filter: tenere righe
where e filter sono alias. Stesso metodo, due nomi. Entrambi prendono un’espressione di colonna booleana e tengono solo le righe dove l’espressione è vera:
df.filter(col("amount") > 100).show(5)
df.where(col("amount") > 100).show(5) # identico
Scegline uno e attaccati a quello. Io tendo verso where perché si legge come SQL. Alcuni team preferiscono filter perché si legge come Python. Né l’uno né l’altro è sbagliato; la coerenza dentro una singola codebase conta più della scelta in sé.
Puoi anche passare un frammento di stringa SQL se vuoi, il che può essere comodo quando copi-incolli da un editor SQL:
df.where("amount > 100 AND country = 'IT'").show(5)
Questo funziona, passa attraverso lo stesso ottimizzatore Catalyst, e produce lo stesso piano della forma con espressione di colonna. Usalo quando è chiaramente più leggibile; altrimenti rimani con le espressioni di colonna, perché vengono controllate al parse time e l’autocomplete dell’IDE le riconosce.
Quattro modi di riferirsi a una colonna
Qui PySpark si fa interessante. Supponiamo che il tuo DataFrame abbia una colonna che si chiama amount. Puoi riferirti ad essa come:
"amount" # stringa nuda
df.amount # accesso ad attributo
df["amount"] # accesso a item
col("amount") # chiamata di funzione (da pyspark.sql.functions)
Tutti e quattro funzionano in select. Non sono equivalenti ovunque. Ecco il riassunto:
1. Stringa nuda "amount": funziona in select, groupBy, orderBy, drop, dropDuplicates. Non funziona dove Spark si aspetta un oggetto Column, per esempio non puoi scrivere df.where("amount" > 100), perché Python valuta "amount" > 100 come confronto stringa-vs-int prima che Spark veda alcunché. Quello è un errore Python, non Spark.
2. Accesso ad attributo df.amount: funziona la maggior parte delle volte, fallisce in tre situazioni. (a) Il nome della colonna ha uno spazio o una punteggiatura: df.amount paid è un errore di sintassi, df["amount paid"] funziona. (b) Il nome della colonna fa ombra a un metodo del DataFrame: df.count non si riferisce alla colonna count, si riferisce al metodo count(). Confondente e silenzioso. (c) Il nome della colonna ha capitalizzazione strana o inizia con una cifra.
3. Accesso a item df["amount"]: funziona sempre, su qualsiasi nome di colonna, per quanto maledetto. Questo è quello che afferro quando non sono sicuro. È leggermente più rumoroso dell’accesso ad attributo ma non mi sorprende mai.
4. La funzione col(): funziona sempre, e in modo unico, funziona senza un DataFrame nello scope. Questo conta in due contesti:
from pyspark.sql.functions import col, when, sum as _sum
# Dentro agg(): non c'è un df.x da scrivere, devi usare col()
df.groupBy("country").agg(_sum(col("amount")).alias("total"))
# Quando costruisci espressioni riusabili in una funzione
def positive_amount():
return col("amount") > 0
df.where(positive_amount())
Userai col() di continuo una volta che inizierai a scrivere funzioni che operano sulle colonne in modo generico. Abituatici.
La raccomandazione, se ne vuoi una: usa col() nelle espressioni, df["x"] quando devi essere inequivocabile su quale DataFrame, e stringhe nude solo negli argomenti di liste di colonne come select e groupBy. Evita df.x nel codice condiviso perché i modi di fallire sono silenziosi.
Operatori booleani: la trappola della precedenza
Le espressioni di colonna di PySpark fanno overload degli operatori bitwise di Python (&, |, ~) per la logica booleana. Non sono uguali a and, or, not. Questo è l’errore singolo più comune dei principianti PySpark.
# SBAGLIATO -- l'`and` di Python non sa cos'è una Column
df.where(col("amount") > 100 and col("country") == "IT")
# Solleva: ValueError: Cannot convert column into bool
# GIUSTO
df.where((col("amount") > 100) & (col("country") == "IT"))
E ti servono assolutamente le parentesi attorno a ogni confronto. Perché? Perché & e | hanno precedenza più alta di >, ==, <, eccetera, nella grammatica di Python & lega più stretto degli operatori di confronto. Senza parentesi, Python fa il parsing della tua espressione nell’ordine sbagliato e ottieni un errore confondente o, peggio, un filtro silenziosamente sbagliato.
# SBAGLIATO -- parsato come col("amount") > (100 & col("country")) == "IT"
df.where(col("amount") > 100 & col("country") == "IT")
# GIUSTO
df.where((col("amount") > 100) & (col("country") == "IT"))
Allena le dita a battere le parentesi automaticamente. Ogni confronto in un filtro composto si avvolge in parentesi. Nessuna eccezione.
~ è la negazione:
df.where(~(col("country") == "NL"))
# uguale a
df.where(col("country") != "NL")
Entrambe le forme funzionano. La forma ~ si compone bene quando stai negando un’espressione più lunga, come ~col("email").isNull().
selectExpr: quando SQL è più corto
C’è un metodo gemello, selectExpr, che prende frammenti di stringa SQL invece di espressioni Column. È semplicemente select con un parser SQL davanti:
df.selectExpr(
"order_id",
"amount * 1.22 AS amount_with_vat",
"CASE WHEN amount > 100 THEN 'big' ELSE 'small' END AS bucket",
)
Quando un’espressione è breve e ovviamente di forma SQL, selectExpr è più compatto della catena col(...).cast(...) equivalente. Quando l’espressione viene costruita programmaticamente, la forma con espressione Column è meglio perché la puoi comporre. Scegli a seconda della situazione; entrambi compilano nello stesso piano.
Matching di stringhe
Quattro pattern, in ordine dal più economico al più generale:
df.where(col("email").startswith("anne")) # prefisso
df.where(col("email").endswith("@gmail.com")) # suffisso
df.where(col("email").contains("anne")) # sottostringa
df.where(col("name").like("anne %")) # SQL LIKE -- wildcard % e _
df.where(col("name").rlike("^[Aa]nne")) # regex (sintassi Java)
startswith e endswith sono i più economici perché a volte possono usare partition pruning o salti basati sull’ordinamento. contains è una ricerca di sottostringa. like segue le regole di wildcard di SQL: % fa match con qualsiasi sequenza, _ con un carattere. rlike è regex completo, il sapore Java delle regex, che è al 95% lo stesso del modulo re di Python ma non identico. Attenzione all’escape \\ vs \ quando metti una regex in una stringa Python.
Filtri sui null
isNull() e isNotNull() sono metodi su una colonna. Si applicano le stesse regole di logica a tre valori di SQL: un confronto che coinvolge null restituisce null, non true o false, quindi where(col("x") == None) non fa quello che pensi.
df.where(col("email").isNull()).count() # righe senza email
df.where(col("email").isNotNull()).count() # righe con email
# SBAGLIATO -- confronto con None; non fa mai match
df.where(col("email") == None)
Stessa trappola, lingua diversa. SQL ha = NULL che restituisce null. PySpark ha == None che restituisce null. Stesso fix: usa i metodi dedicati per i null.
C’è anche eqNullSafe (a volte scritto come l’operatore <=> in Spark SQL), che tratta null come un valore confrontabile: null eqNullSafe null è true, null eqNullSafe 1 è false. Utile per condizioni di join in cui vuoi che i null facciano match con i null, raro in pratica, ma è l’unica volta in cui == non fa quello che vuoi e c’è un’alternativa pulita.
Concatenare filtri
Puoi concatenare filtri quanto vuoi. Ognuno è il proprio nodo del piano, ma Catalyst li collasserà in un singolo filtro fisico al momento dell’esecuzione:
df.filter(col("country") == "IT") \
.filter(col("amount") > 100) \
.filter(col("ts") >= "2026-01-01") \
.show()
La forma a catena e la forma composta ((country == "IT") & (amount > 100) & (ts >= "...")) producono piani fisici identici. Scegli quale si legge meglio. Tendo a concatenare quando ogni filtro è una “regola” separata col proprio significato (“clienti italiani”, “ordini di alto valore”, “questo trimestre”) e a usare la forma composta quando è una singola condizione logica con più parti.
Mettere tutto insieme
Esempio realistico: da un DataFrame orders, ottieni l’ID dell’ordine, l’ID del cliente, l’importo, e il codice paese, per gli ordini da Italia, Spagna, o Francia nel primo trimestre, dove l’importo è sopra 50 e l’email non è null.
from pyspark.sql.functions import col
result = (
orders
.where(col("country").isin("IT", "ES", "FR"))
.where(col("amount") > 50)
.where(col("email").isNotNull())
.where((col("ts") >= "2026-01-01") & (col("ts") < "2026-04-01"))
.select(
col("order_id"),
col("customer_id"),
col("amount").alias("amount_eur"),
col("country"),
)
)
result.show(20)
isin è l’equivalente dell’IN di SQL. Prende qualsiasi numero di valori letterali. Non provare a passare una lista Python con l’unpacking * a meno che non ti serva davvero: isin("IT", "ES", "FR") va bene; isin(*countries_list) funziona anche se countries_list è dinamico.
Nota che l’ordine delle operazioni non conta per la correttezza: Catalyst riordinerà i filtri durante la pianificazione per mettere prima i più economici e più selettivi, e per spingerli giù fino alla sorgente dati dove possibile (arriveremo al predicate pushdown in una lezione più avanti). Scrivi i filtri nell’ordine che è più chiaro da leggere. All’ottimizzatore non importa.
Un’altra nota pratica: select e filter sono entrambe trasformazioni narrow, il che significa che non fanno shuffle dei dati tra executor. Sono economici. Puoi sparpagliarli liberamente per la pipeline, e l’ottimizzatore collasserà quelli adiacenti in un singolo passo fisico. Questo è l’opposto di operazioni come groupBy e join, alle quali arriveremo nelle prossime lezioni: quelle fanno shuffle, e dove le metti conta tantissimo.
Fai girare questo sulla tua macchina
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("select-filter-demo").getOrCreate()
data = [
(1, 42, 59.00, "IT", "anne@example.com", "2026-01-03 10:32:00"),
(2, 42, 29.00, "IT", None, "2026-01-04 14:22:00"),
(3, 17, 149.00, "DE", "bob@example.com", "2026-02-05 09:15:00"),
(4, 17, 199.00, "ES", "bob@example.com", "2026-02-22 11:40:00"),
(5, 8, 42.42, "RO", None, "2026-03-28 08:12:00"),
(6, 99, 12.00, "IT", "claire@example.com", "2026-04-15 16:00:00"),
]
schema = "order_id INT, customer_id INT, amount DOUBLE, country STRING, email STRING, ts TIMESTAMP"
orders = spark.createDataFrame(data, schema)
# Q1: simple filter, simple select
orders.where(col("country") == "IT").select("order_id", "amount").show()
# Q2: compound filter -- note the parens around each comparison
orders.where(
(col("amount") > 50) & (col("country").isin("IT", "ES", "FR"))
).show()
# Q3: null-safe filter
orders.where(col("email").isNotNull()).select("order_id", "email").show()
# Q4: the four column-reference styles, all equivalent here
orders.select("amount").show(2)
orders.select(orders.amount).show(2)
orders.select(orders["amount"]).show(2)
orders.select(col("amount")).show(2)
# Q5: a string-pattern filter
orders.where(col("email").rlike("^[ab]")).show()
# Q6: deliberate trap -- try removing the parens around the comparisons
# orders.where(col("amount") > 50 & col("country") == "IT") # FAILS
orders.where((col("amount") > 50) & (col("country") == "IT")).show()
Fai girare ogni query. Prova a togliere le parentesi in Q6. Leggi l’errore. Quell’errore ti farà risparmiare un’ora la prossima volta che lo vedrai.
Prossima lezione: aggiungere colonne con withColumn, l’helper lit, e la trappola della concatenazione che trasforma una pipeline da 50 step in un piano di esecuzione aggrovigliato.
Riferimento: Apache Spark Python API (https://spark.apache.org/docs/latest/api/python/), recuperato il 2026-05-01.