PySpark, dalle fondamenta Lezione 18 / 60

Rinominare, droppare, fare cast: gli operatori di pulizia di tutti i giorni

withColumnRenamed, drop, cast e le operazioni piccole-ma-frequenti che costituiscono metà di qualsiasi ETL reale.

I post sui blog di Spark e i talk alle conferenze mostrano sempre i pezzi affascinanti: window function, broadcast join, AQE, query in meno di un secondo su tabelle da petabyte. I job ETL reali sono per l’80% qualcos’altro: la colonna a monte si chiama cust_id e a valle customer_id, il file è pieno di stringhe che dovrebbero essere int, qualcuno ha aggiunto una colonna di debug tre release fa che nessuno rimuove.

Questa lezione è la cassetta degli attrezzi per quel pezzo centrale e noioso. withColumnRenamed, toDF, drop, cast, selectExpr. Operatori piccolissimi, usati decine di volte per pipeline, con uno o due tranelli ciascuno che mordono quando scali.

Setup

Un piccolo DataFrame disordinato. Fai finta che venga da un CSV con default permissivi: ogni colonna letta come stringa, nomi presi dritto dall’header dell’export legacy:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, DateType

spark = (SparkSession.builder
         .appName("CleanupOps")
         .master("local[*]")
         .getOrCreate())

raw = spark.createDataFrame(
    [
        ("1001", "1", "59.00",  "NL", "2026-03-05", "debug-x"),
        ("1002", "1", "29.00",  "NL", "2026-03-18", "debug-y"),
        ("1003", "2", "149.00", "IT", "2026-02-15", "debug-z"),
        ("1004", "2", "89.50",  "IT", "2026-03-22", "debug-q"),
        ("1005", "3", "abc",    "DE", "2026-03-10", "debug-q"),    # bad number
        ("1006", "4", "42.42",  "RO", "not-a-date", "debug-q"),    # bad date
    ],
    "ord_id STRING, cust_id STRING, total_str STRING, ctry STRING, ord_dt STRING, scratch STRING",
)

raw.printSchema()
# root
#  |-- ord_id: string (nullable = true)
#  |-- cust_id: string (nullable = true)
#  |-- total_str: string (nullable = true)
#  |-- ctry: string (nullable = true)
#  |-- ord_dt: string (nullable = true)
#  |-- scratch: string (nullable = true)

Sei colonne, tutte stringhe, nomi corti e strani, una colonna scratch lasciata da una vecchia sessione di debugging di qualcuno, e due righe che non si castano in modo pulito. Realistico.

Rinominare una colonna alla volta: withColumnRenamed

Il cavallo da soma del rename singolo:

renamed = raw.withColumnRenamed("ord_id", "OrderId")
renamed.printSchema()
# root
#  |-- OrderId: string (nullable = true)
#  |-- cust_id: string (nullable = true)
#  ...

Due parametri: vecchio nome, nuovo nome. Restituisce un nuovo DataFrame (in Spark è tutto immutabile: raw non cambia). Se il vecchio nome non esiste, withColumnRenamed è silenziosamente un no-op: niente errore, niente warning. Scrivi or_id invece di ord_id e la tua pipeline gira benissimo ma la colonna a valle si chiama ancora ord_id. Stai attento. Aggiungi un assert "OrderId" in df.columns difensivo dopo un rename se è load-bearing.

Per rinominare più colonne, concatena le chiamate:

renamed = (raw
    .withColumnRenamed("ord_id",   "OrderId")
    .withColumnRenamed("cust_id",  "CustomerId")
    .withColumnRenamed("total_str","Total")
    .withColumnRenamed("ctry",     "Country")
    .withColumnRenamed("ord_dt",   "OrderDate"))

Leggibile, esplicito, difficile da sbagliare. Per cinque colonne va bene. Per 50 diventa noioso: c’è una scorciatoia.

La scorciatoia toDF(*new_names)

toDF restituisce un nuovo DataFrame con gli stessi dati e i nomi di colonna che gli passi, posizionalmente:

renamed = raw.toDF(
    "OrderId", "CustomerId", "Total", "Country", "OrderDate", "Scratch"
)

Una chiamata, tutto rinominato. Bellissimo, e pericoloso. toDF non conosce i tuoi vecchi nomi. Se una versione futura dell’export a monte riordina le colonne, la tua colonna OrderId ora contiene ciò che prima era cust_id. Corruzione silenziosa.

Usa toDF solo quando sei sicuro dell’ordine delle colonne, ad esempio subito dopo read.csv(..., header=False) dove controlli tu lo schema, o nelle fixture di test. Per i rename contro una sorgente esterna, preferisci la catena esplicita di withColumnRenamed. La verbosità vale la pena.

Una via di mezzo più sicura quando hai molti rename è un loop programmatico:

rename_map = {
    "ord_id":    "OrderId",
    "cust_id":   "CustomerId",
    "total_str": "Total",
    "ctry":      "Country",
    "ord_dt":    "OrderDate",
}

renamed = raw
for old, new in rename_map.items():
    renamed = renamed.withColumnRenamed(old, new)

Esplicito, ordinato, facile da leggere in una code review. È quello che scrivo io quando ci sono più di tre rename.

Droppare colonne: drop

cleaned = renamed.drop("Scratch")

Droppa la colonna. Drop multi-colonna in un’unica chiamata:

cleaned = renamed.drop("Scratch", "Country", "OrderDate")

Due proprietà importanti:

  1. drop è idempotente. Chiamare drop("not_a_column") è un no-op, niente errore. In realtà è utile: il tuo codice di pulizia continua a funzionare quando l’export a monte rimuove una colonna che stavi già droppando.
  2. drop è l’alternativa sicura a select quando vuoi praticamente tutto tranne una o due colonne. Scrivere df.select("a", "b", "c", ..., "z") per rimuovere una colonna è fragile; df.drop("y") è a prova di proiettile.

Detto questo, quando vuoi essere esatto sul tuo schema di output (ogni colonna elencata, niente sorprese), uno select esplicito è meglio. È la differenza tra “rimuovi le colonne che non voglio” e “tieni esattamente le colonne che voglio”. Il secondo è più difensivo.

Cast: col(...).cast(...) e il tranello del NULL silenzioso

Rinominare e droppare non cambiano i tipi. Per trasformare una stringa in int fai cast:

typed = (cleaned
    .withColumn("OrderId",     col("OrderId").cast("int"))
    .withColumn("CustomerId",  col("CustomerId").cast("int"))
    .withColumn("Total",       col("Total").cast("double"))
    .withColumn("OrderDate",   col("OrderDate").cast("date")))

typed.printSchema()
# root
#  |-- OrderId: integer (nullable = true)
#  |-- CustomerId: integer (nullable = true)
#  |-- Total: double (nullable = true)
#  |-- Country: string (nullable = true)
#  |-- OrderDate: date (nullable = true)

Puoi passare o una stringa ("int", "double", "date", "timestamp", "string", "boolean", "long", "decimal(10,2)") o un vero oggetto type (IntegerType(), DoubleType(), ecc.). La forma stringa è più corta, la forma type-object dà l’autocomplete dell’editor. Io uso le stringhe.

Ora il tranello. Guarda la riga 5 (Total = "abc") e la riga 6 (OrderDate = "not-a-date"):

typed.show()
# +-------+----------+------+-------+----------+
# |OrderId|CustomerId| Total|Country| OrderDate|
# +-------+----------+------+-------+----------+
# |   1001|         1|  59.0|     NL|2026-03-05|
# |   1002|         1|  29.0|     NL|2026-03-18|
# |   1003|         2| 149.0|     IT|2026-02-15|
# |   1004|         2|  89.5|     IT|2026-03-22|
# |   1005|         3|  NULL|     DE|2026-03-10|   <- "abc" became NULL
# |   1006|         4| 42.42|     RO|      NULL|   <- "not-a-date" became NULL
# +-------+----------+------+-------+----------+

I cast falliti restituiscono NULL silenziosamente. Niente eccezione, niente messaggio di log. Il tuo job arriva a completamento e lo 0,4% dei tuoi numeri svanisce. È il singolo bug ETL più comune che vedo nelle code review: qualcuno si è fidato di un cast su dati generati dagli utenti e ha perso una fetta di fatturato.

Pattern difensivi:

# Option 1: count the casualties before continuing
bad_total = (cleaned
    .where(col("Total").isNotNull() & col("Total").cast("double").isNull())
    .count())
print(f"Rows with un-castable Total: {bad_total}")
if bad_total > 0:
    raise ValueError(f"{bad_total} rows had non-numeric Total")

# Option 2: keep the original string column for inspection
typed = cleaned.withColumn("Total_d", col("Total").cast("double"))
# Now Total still has "abc" and Total_d has the cast result;
# you can join the two for reporting bad rows.

L’opzione 1 è la mossa giusta in produzione: fallisci forte quando la qualità dei dati cala, non perdere righe in silenzio. Aggiusta la soglia a piacere: a volte lo 0,01% di righe cattive è accettabile, a volte zero lo è.

Rinominare e selezionare in un’unica passata

Se stai già per proiettare le colonne, puoi rinominare nello stesso select:

projected = cleaned.select(
    col("OrderId"),
    col("CustomerId").alias("CustId"),     # rename via alias
    col("Total").cast("double").alias("Amount"),
    col("Country").alias("CountryCode"),
)

alias è il trucco rinomina-mentre-selezioni. Bello quando stai già facendo chirurgia sulle colonne e non vuoi che una catena separata di withColumnRenamed ti sporchi il codice.

selectExpr: la scorciatoia con stringa SQL

Per chi pensa in SQL, selectExpr ti permette di scrivere frammenti SQL:

shortcut = raw.selectExpr(
    "ord_id     AS OrderId",
    "CAST(cust_id AS INT) AS CustomerId",
    "CAST(total_str AS DOUBLE) AS Total",
    "ctry       AS Country",
    "CAST(ord_dt AS DATE) AS OrderDate",
)

Una chiamata, ogni rename e cast in un solo posto. Le stringhe sono Spark SQL vero, quindi hai tutta la sintassi di espressione di SQL: CASE WHEN, COALESCE, chiamate a funzione, tutto. Comodo. Lo svantaggio: perdi l’autocomplete di Python e il linter non può beccare i typo nelle stringhe. Uso selectExpr quando le trasformazioni hanno chiaramente forma SQL e select con col() quando hanno chiaramente forma Python.

Il pattern ETL completo

Ecco come si presenta tutto questo come un singolo job: il sandwich input/cleanup/output che è lo scheletro di qualsiasi pipeline PySpark reale:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("OrdersETL")
         .master("local[*]")
         .getOrCreate())

# 1. Read -- schema-on-read, everything string
raw = (spark.read
       .option("header", True)
       .csv("./data/orders_raw.csv"))

# 2. Rename to canonical column names
renamed = (raw
    .withColumnRenamed("ord_id",    "OrderId")
    .withColumnRenamed("cust_id",   "CustomerId")
    .withColumnRenamed("total_str", "Total")
    .withColumnRenamed("ctry",      "Country")
    .withColumnRenamed("ord_dt",    "OrderDate"))

# 3. Cast to real types
typed = (renamed
    .withColumn("OrderId",    col("OrderId").cast("int"))
    .withColumn("CustomerId", col("CustomerId").cast("int"))
    .withColumn("Total",      col("Total").cast("double"))
    .withColumn("OrderDate",  col("OrderDate").cast("date")))

# 4. Data quality check -- fail loud if too many NULLs appeared
bad = typed.where(
    col("OrderId").isNull() |
    col("Total").isNull() |
    col("OrderDate").isNull()
).count()
if bad > 0:
    print(f"WARNING: {bad} rows lost data in cast")

# 5. Drop scratch columns and select the final shape
final = typed.drop("scratch", "internal_flag").select(
    "OrderId", "CustomerId", "Total", "Country", "OrderDate"
)

# 6. Write Parquet, partitioned by country for downstream queries
(final
 .write
 .mode("overwrite")
 .partitionBy("Country")
 .parquet("./out/orders_clean"))

Sei step. Read, rename, cast, validate, project, write. Quella struttura si ripete in migliaia di pipeline con nomi di colonne diversi, sorgenti diverse, formati di output diversi, ma lo scheletro è lo stesso.

Errori comuni e come leggerli

I due messaggi di errore che vedrai più spesso lavorando con questi operatori:

AnalysisException: cannot resolve 'col_namè given input columns: [...]

Hai sbagliato un nome di colonna, o stai referenziando una colonna che è stata rinominata prima nella pipeline. L’errore stampa utilmente le colonne effettive tra parentesi quadre. Leggile con cura: di solito beccherai il tuo typo o ti renderai conto che stai guardando lo schema post-rename invece di quello pre-rename.

NULL silenziosi dopo un cast.

Tecnicamente non è un errore: è tutto il senso del tranello. La pipeline finisce, il conteggio righe è lo stesso, i valori sono sbagliati. La fix è lo step di validazione nel pattern 4 sopra. Conta sempre i NULL dopo un cast su colonne importanti, soprattutto quelle che vengono da sistemi esterni.

Esegui questo sulla tua macchina

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder.appName("Cleanup").master("local[*]").getOrCreate())

raw = spark.createDataFrame(
    [
        ("1001", "1", "59.00",  "NL", "2026-03-05", "debug-x"),
        ("1002", "1", "29.00",  "NL", "2026-03-18", "debug-y"),
        ("1003", "2", "abc",    "DE", "2026-03-10", "debug-z"),
    ],
    "ord_id STRING, cust_id STRING, total_str STRING, ctry STRING, ord_dt STRING, scratch STRING",
)

# 1. Chained renames
v1 = (raw
    .withColumnRenamed("ord_id",    "OrderId")
    .withColumnRenamed("cust_id",   "CustomerId")
    .withColumnRenamed("total_str", "Total")
    .withColumnRenamed("ctry",      "Country")
    .withColumnRenamed("ord_dt",    "OrderDate"))

# 2. Drop the scratch column
v2 = v1.drop("scratch")

# 3. Cast types -- note row 3 with "abc" becomes NULL
v3 = (v2
    .withColumn("OrderId",    col("OrderId").cast("int"))
    .withColumn("CustomerId", col("CustomerId").cast("int"))
    .withColumn("Total",      col("Total").cast("double"))
    .withColumn("OrderDate",  col("OrderDate").cast("date")))
v3.show()

# 4. Same pipeline via selectExpr
v_alt = raw.selectExpr(
    "CAST(ord_id AS INT) AS OrderId",
    "CAST(cust_id AS INT) AS CustomerId",
    "CAST(total_str AS DOUBLE) AS Total",
    "ctry AS Country",
    "CAST(ord_dt AS DATE) AS OrderDate",
)
v_alt.show()

# 5. Count rows with cast casualties
bad = v3.where(col("Total").isNull()).count()
print(f"Bad Total rows: {bad}")

Nota come la riga 3 si presenta con Total = NULL e nessun errore è stato sollevato da nessuna parte. È il tranello del NULL silenzioso in azione. Costruisciti l’abitudine di contare sempre i NULL dopo un cast su una colonna che conta: tre righe di codice difensivo oggi ti risparmiano un’investigazione di cinque ore per perdita dati tra due mesi.

Così chiude il Modulo 3. Sai leggere dati, proiettare colonne, filtrare righe, costruire espressioni, aggregare, ordinare e ripulire. È un kit completo dei fondamentali del DataFrame; da qui puoi scrivere un job PySpark reale e spedirlo. Il Modulo 4 parte con la prossima lezione con le join: inner, left, semi, anti, broadcast, e i modi di fallimento che fanno della join il motivo più comune per cui un job Spark va out of memory.

Cerca