PySpark, de la zero Lecția 18 / 60

Redenumire, drop, cast: operatorii uzuali de cleanup

withColumnRenamed, drop, cast si operatiile mici-dar-frecvente care formeaza jumatate din orice ETL real.

Articolele de blog Spark și prezentările de la conferințe arată mereu părțile glamuroase: window functions, broadcast joins, AQE, query-uri sub-secundă pe tabele de petabyte. Job-urile reale de ETL sunt 80% altceva: coloana se cheamă cust_id upstream și customer_id downstream, fișierul e plin de string-uri care ar trebui să fie int-uri, cineva a adăugat o coloană de debug acum trei release-uri pe care nimeni n-o scoate.

Lecția asta e trusa pentru mijlocul ăla plictisitor. withColumnRenamed, toDF, drop, cast, selectExpr. Operatori mici, folosiți de zeci de ori per pipeline, fiecare cu una sau două capcane care mușcă la scară.

Setup

Un DataFrame mic și murdar. Pretindem că a venit dintr-un CSV cu default-uri permisive: fiecare coloană citită ca string, numele luate direct din header-ul de export legacy:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, DateType

spark = (SparkSession.builder
         .appName("CleanupOps")
         .master("local[*]")
         .getOrCreate())

raw = spark.createDataFrame(
    [
        ("1001", "1", "59.00",  "NL", "2026-03-05", "debug-x"),
        ("1002", "1", "29.00",  "NL", "2026-03-18", "debug-y"),
        ("1003", "2", "149.00", "IT", "2026-02-15", "debug-z"),
        ("1004", "2", "89.50",  "IT", "2026-03-22", "debug-q"),
        ("1005", "3", "abc",    "DE", "2026-03-10", "debug-q"),    # bad number
        ("1006", "4", "42.42",  "RO", "not-a-date", "debug-q"),    # bad date
    ],
    "ord_id STRING, cust_id STRING, total_str STRING, ctry STRING, ord_dt STRING, scratch STRING",
)

raw.printSchema()
# root
#  |-- ord_id: string (nullable = true)
#  |-- cust_id: string (nullable = true)
#  |-- total_str: string (nullable = true)
#  |-- ctry: string (nullable = true)
#  |-- ord_dt: string (nullable = true)
#  |-- scratch: string (nullable = true)

Șase coloane, toate string, nume scurte ciudate, o coloană scratch rămasă din vreo veche sesiune de debugging și două rânduri care nu se vor casta curat. Realist.

Redenumire câte o coloană odată: withColumnRenamed

Calul de bătaie pentru o singură redenumire:

renamed = raw.withColumnRenamed("ord_id", "OrderId")
renamed.printSchema()
# root
#  |-- OrderId: string (nullable = true)
#  |-- cust_id: string (nullable = true)
#  ...

Două parametri: numele vechi, numele nou. Întoarce un DataFrame nou (totul în Spark e imutabil: raw rămâne neatins). Dacă numele vechi nu există, withColumnRenamed e tăcut un no-op: fără eroare, fără warning. Scrii greșit ord_id ca or_id și pipeline-ul rulează bine, dar coloana e în continuare ord_id downstream. Atenție la asta. Adaugă un assert "OrderId" in df.columns defensiv după o redenumire dacă e load-bearing.

Ca să redenumești mai multe coloane, înlănțuie apelurile:

renamed = (raw
    .withColumnRenamed("ord_id",   "OrderId")
    .withColumnRenamed("cust_id",  "CustomerId")
    .withColumnRenamed("total_str","Total")
    .withColumnRenamed("ctry",     "Country")
    .withColumnRenamed("ord_dt",   "OrderDate"))

Lizibil, explicit, greu de stricat. Pentru cinci coloane e ok. Pentru 50 devine plictisitor: există o scurtătură.

Scurtătura toDF(*new_names)

toDF întoarce un DataFrame nou cu aceleași date și numele de coloane pe care le pasezi, pozițional:

renamed = raw.toDF(
    "OrderId", "CustomerId", "Total", "Country", "OrderDate", "Scratch"
)

Un singur apel, toate redenumite. Frumos, și periculos. toDF nu știe numele tale vechi. Dacă o versiune viitoare a export-ului upstream reordonează coloanele, coloana OrderId conține acum ce era înainte valorile cust_id. Corupție tăcută.

Folosește toDF doar când ești încrezător în ordinea coloanelor: de exemplu imediat după read.csv(..., header=False) unde controlezi schema, sau în fixtures de test. Pentru redenumiri pe o sursă externă, preferă lanțul explicit withColumnRenamed. Verbozitatea merită.

Un middle ground mai sigur, când ai multe redenumiri, e un loop programatic:

rename_map = {
    "ord_id":    "OrderId",
    "cust_id":   "CustomerId",
    "total_str": "Total",
    "ctry":      "Country",
    "ord_dt":    "OrderDate",
}

renamed = raw
for old, new in rename_map.items():
    renamed = renamed.withColumnRenamed(old, new)

Explicit, ordonat, ușor de citit la code review. Asta scriu eu când sunt mai mult de trei redenumiri.

Eliminarea coloanelor: drop

cleaned = renamed.drop("Scratch")

Scoate coloana. Drop pe mai multe coloane într-un singur apel:

cleaned = renamed.drop("Scratch", "Country", "OrderDate")

Două proprietăți importante:

  1. drop e idempotent. Apelul drop("not_a_column") e un no-op, fără eroare. Asta e de fapt drăguț: codul tău de cleanup continuă să funcționeze când export-ul upstream scoate o coloană pe care oricum o eliminai.
  2. drop e alternativa sigură la select când vrei aproape totul în afara unei sau două coloane. Să scrii df.select("a", "b", "c", ..., "z") ca să scoți o coloană e fragil; df.drop("y") e antiglonț.

Acestea fiind zise, când vrei să fii exact legat de schema ta de output, fiecare coloană listată, fără surprize, select explicit e mai bun. E diferența între „scoate coloanele pe care nu le vreau” și „păstrează exact coloanele pe care le vreau”. A doua e mai defensivă.

Casting: col(...).cast(...) și capcana NULL-ului tăcut

Redenumirea și drop-ul nu schimbă tipurile. Ca să transformi un string într-un int faci cast:

typed = (cleaned
    .withColumn("OrderId",     col("OrderId").cast("int"))
    .withColumn("CustomerId",  col("CustomerId").cast("int"))
    .withColumn("Total",       col("Total").cast("double"))
    .withColumn("OrderDate",   col("OrderDate").cast("date")))

typed.printSchema()
# root
#  |-- OrderId: integer (nullable = true)
#  |-- CustomerId: integer (nullable = true)
#  |-- Total: double (nullable = true)
#  |-- Country: string (nullable = true)
#  |-- OrderDate: date (nullable = true)

Poți pasa fie un string ("int", "double", "date", "timestamp", "string", "boolean", "long", "decimal(10,2)"), fie un obiect tip propriu-zis (IntegerType(), DoubleType(), etc.). Forma cu string e mai scurtă, forma cu obiect tip primește autocomplete în editor. Eu folosesc string-uri.

Acum capcana. Uită-te la rândul 5 (Total = "abc") și rândul 6 (OrderDate = "not-a-date"):

typed.show()
# +-------+----------+------+-------+----------+
# |OrderId|CustomerId| Total|Country| OrderDate|
# +-------+----------+------+-------+----------+
# |   1001|         1|  59.0|     NL|2026-03-05|
# |   1002|         1|  29.0|     NL|2026-03-18|
# |   1003|         2| 149.0|     IT|2026-02-15|
# |   1004|         2|  89.5|     IT|2026-03-22|
# |   1005|         3|  NULL|     DE|2026-03-10|   ← "abc" became NULL
# |   1006|         4| 42.42|     RO|      NULL|   ← "not-a-date" became NULL
# +-------+----------+------+-------+----------+

Cast-urile proaste întorc NULL tăcut. Fără excepție, fără mesaj de log. Job-ul tău rulează până la final și 0,4% din numere dispar. Ăsta e cel mai frecvent bug de ETL pe care îl văd la code review: cineva a avut încredere într-un cast pe date generate de utilizator și a pierdut o felie de venit.

Pattern-uri defensive:

# Option 1: count the casualties before continuing
bad_total = (cleaned
    .where(col("Total").isNotNull() & col("Total").cast("double").isNull())
    .count())
print(f"Rows with un-castable Total: {bad_total}")
if bad_total > 0:
    raise ValueError(f"{bad_total} rows had non-numeric Total")

# Option 2: keep the original string column for inspection
typed = cleaned.withColumn("Total_d", col("Total").cast("double"))
# Now Total still has "abc" and Total_d has the cast result;
# you can join the two for reporting bad rows.

Opțiunea 1 e mișcarea corectă pentru producție: pică zgomotos când scade calitatea datelor, nu pierde tăcut rânduri. Ajustează pragul după gust: uneori 0,01% rânduri proaste e acceptabil, uneori zero e regula.

Redenumire și selecție într-o singură trecere

Dacă oricum te pregătești să proiectezi coloanele, poți redenumi în același select:

projected = cleaned.select(
    col("OrderId"),
    col("CustomerId").alias("CustId"),     # rename via alias
    col("Total").cast("double").alias("Amount"),
    col("Country").alias("CountryCode"),
)

alias e trucul de redenumire-la-selecție. Drăguț când oricum faci chirurgie pe coloane și nu vrei un lanț separat de withColumnRenamed care să umple codul.

selectExpr: scurtătura cu string-uri SQL

Pentru cei care gândesc în SQL, selectExpr îți permite să scrii fragmente SQL:

shortcut = raw.selectExpr(
    "ord_id     AS OrderId",
    "CAST(cust_id AS INT) AS CustomerId",
    "CAST(total_str AS DOUBLE) AS Total",
    "ctry       AS Country",
    "CAST(ord_dt AS DATE) AS OrderDate",
)

Un singur apel, fiecare redenumire și cast într-un singur loc. String-urile sunt Spark SQL real, deci primești toată sintaxa de expresii SQL: CASE WHEN, COALESCE, apeluri de funcții, tot tacâmul. Convenabil. Dezavantajul: pierzi autocomplete-ul Python și linter-ul tău nu poate prinde typo-uri în string-uri. Folosesc selectExpr când transformările sunt clar de formă SQL și select cu col() când sunt clar de formă Python.

Pattern-ul ETL complet

Iată cum arată toate astea ca un singur job: sandwich-ul input/cleanup/output care e scheletul oricărui pipeline PySpark real:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("OrdersETL")
         .master("local[*]")
         .getOrCreate())

# 1. Read — schema-on-read, everything string
raw = (spark.read
       .option("header", True)
       .csv("./data/orders_raw.csv"))

# 2. Rename to canonical column names
renamed = (raw
    .withColumnRenamed("ord_id",    "OrderId")
    .withColumnRenamed("cust_id",   "CustomerId")
    .withColumnRenamed("total_str", "Total")
    .withColumnRenamed("ctry",      "Country")
    .withColumnRenamed("ord_dt",    "OrderDate"))

# 3. Cast to real types
typed = (renamed
    .withColumn("OrderId",    col("OrderId").cast("int"))
    .withColumn("CustomerId", col("CustomerId").cast("int"))
    .withColumn("Total",      col("Total").cast("double"))
    .withColumn("OrderDate",  col("OrderDate").cast("date")))

# 4. Data quality check — fail loud if too many NULLs appeared
bad = typed.where(
    col("OrderId").isNull() |
    col("Total").isNull() |
    col("OrderDate").isNull()
).count()
if bad > 0:
    print(f"WARNING: {bad} rows lost data in cast")

# 5. Drop scratch columns and select the final shape
final = typed.drop("scratch", "internal_flag").select(
    "OrderId", "CustomerId", "Total", "Country", "OrderDate"
)

# 6. Write Parquet, partitioned by country for downstream queries
(final
 .write
 .mode("overwrite")
 .partitionBy("Country")
 .parquet("./out/orders_clean"))

Șase pași. Citește, redenumește, cast, validează, proiectează, scrie. Structura se repetă pe mii de pipeline-uri, cu nume de coloane diferite, surse diferite, formate de output diferite, dar scheletul e același.

Erori frecvente și cum să le citești

Cele două mesaje de eroare pe care le vei vedea cel mai des când lucrezi cu acești operatori:

AnalysisException: cannot resolve 'col_name' given input columns: [...]

Ai făcut typo la un nume de coloană sau referențiezi o coloană care a fost redenumită mai devreme în pipeline. Eroarea afișează util coloanele reale între paranteze. Citește-le cu atenție: de obicei vei prinde typo-ul sau vei realiza că te uiți la schema post-rename în loc de cea pre-rename.

NULL-uri tăcute după cast.

Tehnic, nu e o eroare: ăsta e tot rostul capcanei. Pipeline-ul termină, row count-ul e același, valorile sunt greșite. Soluția e pasul de validare din pattern-ul 4 de mai sus. Numără mereu NULL-urile după un cast pe coloane importante, mai ales pe cele care vin din sisteme externe.

Rulează asta pe propria mașină

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder.appName("Cleanup").master("local[*]").getOrCreate())

raw = spark.createDataFrame(
    [
        ("1001", "1", "59.00",  "NL", "2026-03-05", "debug-x"),
        ("1002", "1", "29.00",  "NL", "2026-03-18", "debug-y"),
        ("1003", "2", "abc",    "DE", "2026-03-10", "debug-z"),
    ],
    "ord_id STRING, cust_id STRING, total_str STRING, ctry STRING, ord_dt STRING, scratch STRING",
)

# 1. Chained renames
v1 = (raw
    .withColumnRenamed("ord_id",    "OrderId")
    .withColumnRenamed("cust_id",   "CustomerId")
    .withColumnRenamed("total_str", "Total")
    .withColumnRenamed("ctry",      "Country")
    .withColumnRenamed("ord_dt",    "OrderDate"))

# 2. Drop the scratch column
v2 = v1.drop("scratch")

# 3. Cast types — note row 3 with "abc" becomes NULL
v3 = (v2
    .withColumn("OrderId",    col("OrderId").cast("int"))
    .withColumn("CustomerId", col("CustomerId").cast("int"))
    .withColumn("Total",      col("Total").cast("double"))
    .withColumn("OrderDate",  col("OrderDate").cast("date")))
v3.show()

# 4. Same pipeline via selectExpr
v_alt = raw.selectExpr(
    "CAST(ord_id AS INT) AS OrderId",
    "CAST(cust_id AS INT) AS CustomerId",
    "CAST(total_str AS DOUBLE) AS Total",
    "ctry AS Country",
    "CAST(ord_dt AS DATE) AS OrderDate",
)
v_alt.show()

# 5. Count rows with cast casualties
bad = v3.where(col("Total").isNull()).count()
print(f"Bad Total rows: {bad}")

Observă cum rândul 3 apare cu Total = NULL și nicio eroare nu a fost ridicată nicăieri. Aceea e capcana NULL-ului tăcut în acțiune. Construiește-ți obiceiul să numeri mereu NULL-urile după un cast pe o coloană care contează: trei linii de cod defensiv azi te scapă de o investigație de cinci ore pe pierdere de date peste două luni.

Asta închide Modulul 3. Poți citi date, proiecta coloane, filtra rânduri, construi expresii, agrega, sorta și curăța. Aia e o trusă completă de DataFrame-fundamentals; de aici poți scrie un job PySpark real și să-l livrezi. Modulul 4 începe în lecția următoare cu joins: inner, left, semi, anti, broadcast, plus modurile de eșec care fac dintr-un join cel mai frecvent motiv pentru care un job Spark rămâne fără memorie.

Caută