The Spark blog posts and conference talks always feature the glamorous bits: window functions, broadcast joins, AQE, sub-second query times on petabyte tables. Real ETL jobs are 80% something else: the column is called cust_id upstream and customer_id downstream, the file is full of strings that should be ints, and somebody added a debug column three releases ago that nobody has removed.
This lesson is the toolbox for that boring middle. withColumnRenamed, toDF, drop, cast, selectExpr. Tiny operators, used dozens of times per pipeline, with one or two traps each that bite when you scale.
Setup
A messy little DataFrame. Pretend it came from a CSV with permissive defaults — every column read in as string, names taken straight from the legacy export header:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, DateType
spark = (SparkSession.builder
.appName("CleanupOps")
.master("local[*]")
.getOrCreate())
raw = spark.createDataFrame(
[
("1001", "1", "59.00", "NL", "2026-03-05", "debug-x"),
("1002", "1", "29.00", "NL", "2026-03-18", "debug-y"),
("1003", "2", "149.00", "IT", "2026-02-15", "debug-z"),
("1004", "2", "89.50", "IT", "2026-03-22", "debug-q"),
("1005", "3", "abc", "DE", "2026-03-10", "debug-q"), # bad number
("1006", "4", "42.42", "RO", "not-a-date", "debug-q"), # bad date
],
"ord_id STRING, cust_id STRING, total_str STRING, ctry STRING, ord_dt STRING, scratch STRING",
)
raw.printSchema()
# root
# |-- ord_id: string (nullable = true)
# |-- cust_id: string (nullable = true)
# |-- total_str: string (nullable = true)
# |-- ctry: string (nullable = true)
# |-- ord_dt: string (nullable = true)
# |-- scratch: string (nullable = true)
Six columns, all string, weird short names, a leftover scratch column from someone’s old debugging session, and two rows that won’t cast cleanly. Realistic.
Renaming one column at a time: withColumnRenamed
The single-rename workhorse:
renamed = raw.withColumnRenamed("ord_id", "OrderId")
renamed.printSchema()
# root
# |-- OrderId: string (nullable = true)
# |-- cust_id: string (nullable = true)
# ...
Two parameters: old name, new name. It returns a new DataFrame (DataFrames are immutable, so raw is unchanged). If the old name doesn’t exist, withColumnRenamed is silently a no-op: no error, no warning. Misspell ord_id as or_id and your pipeline runs fine, but the column is still named ord_id downstream. Watch for that. If the rename is load-bearing, add a defensive assert "OrderId" in renamed.columns right after it.
To rename several columns, chain calls:
renamed = (raw
.withColumnRenamed("ord_id", "OrderId")
.withColumnRenamed("cust_id", "CustomerId")
.withColumnRenamed("total_str","Total")
.withColumnRenamed("ctry", "Country")
.withColumnRenamed("ord_dt", "OrderDate"))
Readable, explicit, hard to mess up. For five columns that’s fine. For 50 it gets tedious — there’s a shortcut.
The toDF(*new_names) shortcut
toDF returns a new DataFrame with the same data and the column names you pass in, positionally:
renamed = raw.toDF(
"OrderId", "CustomerId", "Total", "Country", "OrderDate", "Scratch"
)
One call, all renamed. Beautiful — and dangerous. toDF doesn’t know your old names. If a future version of the upstream export reorders the columns, your OrderId column now contains what used to be cust_id values. Silent corruption.
Only use toDF when you’re confident about the column order — for example, immediately after read.csv(..., header=False) where you control the schema, or in test fixtures. For renames against an external source, prefer the explicit withColumnRenamed chain. The verbosity is worth it.
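If toDF is still the most convenient option, one way to limit the risk is to pin the expected incoming header before renaming. The following is a minimal sketch, not a PySpark feature: to_df_checked is a hypothetical helper name, and it relies only on df.columns and df.toDF.

```python
def to_df_checked(df, expected_old, new_names):
    """Rename positionally with toDF, but only if the incoming column
    order is exactly the one we expect.

    Hypothetical helper for illustration; not part of PySpark.
    """
    actual = list(df.columns)
    expected = list(expected_old)
    if actual != expected:
        # Reordered, added, or removed upstream columns all end up here.
        raise ValueError(
            f"Unexpected input columns: expected {expected}, got {actual}"
        )
    if len(new_names) != len(actual):
        raise ValueError(
            f"Got {len(new_names)} new names for {len(actual)} columns"
        )
    return df.toDF(*new_names)
```

With the lesson's raw DataFrame this would be called as to_df_checked(raw, ["ord_id", "cust_id", "total_str", "ctry", "ord_dt", "scratch"], ["OrderId", "CustomerId", "Total", "Country", "OrderDate", "Scratch"]). The comparison is deliberately exact and case-sensitive, so a reordered export fails the job instead of quietly shifting values into the wrong column.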
A safer middle ground when you have many renames is a programmatic loop:
rename_map = {
"ord_id": "OrderId",
"cust_id": "CustomerId",
"total_str": "Total",
"ctry": "Country",
"ord_dt": "OrderDate",
}
renamed = raw
for old, new in rename_map.items():
    renamed = renamed.withColumnRenamed(old, new)
Explicit, ordered, easy to read in a code review. This is what I write when there are more than three renames.
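Because withColumnRenamed ignores names that don't exist, a loop over a dict inherits the same silent no-op. One possible guard is to validate the map against df.columns before applying it. This is a sketch under assumptions: rename_strict is a hypothetical helper, not a PySpark API, and its collision rule is deliberately conservative (it also rejects swaps such as a to b and b to a, which sequential renames cannot do safely).

```python
def rename_strict(df, rename_map):
    """Apply withColumnRenamed for every entry in rename_map, but refuse
    to run if a source column is missing or a target name would collide.

    Hypothetical helper for illustration; not part of PySpark.
    """
    columns = list(df.columns)

    # A typo such as "or_id" would otherwise be a silent no-op.
    missing = [old for old in rename_map if old not in columns]
    if missing:
        raise KeyError(
            f"Columns to rename not found: {missing}; available: {columns}"
        )

    # Reject targets that already exist or that appear twice in the map.
    targets = list(rename_map.values())
    clashes = sorted(
        {
            new
            for old, new in rename_map.items()
            if (new in columns and new != old) or targets.count(new) > 1
        }
    )
    if clashes:
        raise ValueError(f"Target names would collide: {clashes}")

    for old, new in rename_map.items():
        df = df.withColumnRenamed(old, new)
    return df
```

Usage would be renamed = rename_strict(raw, rename_map). PySpark 3.4 and later also provide DataFrame.withColumnsRenamed, which takes the whole dict in one call; it is documented as a no-op for names that are not in the schema, so the same up-front check still applies there.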
Dropping columns: drop
cleaned = renamed.drop("Scratch")
Drops the column. Multi-column drop in one call:
# Assigned to a separate name so that cleaned keeps Country and OrderDate for the casts below
slim = renamed.drop("Scratch", "Country", "OrderDate")
Two important properties:
drop is idempotent. Calling drop("not_a_column") is a no-op, no error. That’s actually nice: your cleanup code keeps working when the upstream export removes a column you were dropping anyway.
drop is the safe alternative to select when you mostly want everything except one or two columns. Writing df.select("a", "b", "c", ..., "z") to remove one column is brittle; df.drop("y") is bulletproof.
That said, when you want to be exact about your output schema — every column listed, no surprises — explicit select is better. It’s the difference between “remove the columns I don’t want” and “keep exactly the columns I do want.” The second is more defensive.
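Either way, it can help to compare the columns you actually have against the columns you expect, since drop lets new upstream columns pass through and select quietly discards them. A minimal pure-Python sketch follows; schema_diff is a hypothetical helper that works on plain lists such as df.columns, and the internal_flag column is only an illustrative extra.

```python
def schema_diff(actual_columns, expected_columns):
    """Compare two column lists and report what is missing or unexpected.

    Hypothetical helper for illustration; pass df.columns as the first argument.
    """
    actual = list(actual_columns)
    expected = list(expected_columns)
    return {
        "missing": [c for c in expected if c not in actual],
        "unexpected": [c for c in actual if c not in expected],
    }


expected = ["OrderId", "CustomerId", "Total", "Country", "OrderDate"]

# Simulates a DataFrame where drop("Scratch") ran but upstream added a new column.
after_drop = ["OrderId", "CustomerId", "Total", "Country", "OrderDate", "internal_flag"]

print(schema_diff(after_drop, expected))
# {'missing': [], 'unexpected': ['internal_flag']}
```

Whether an unexpected column should fail the job or just be logged is a policy choice; the point of the sketch is only to make the difference visible.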
Casting: col(...).cast(...) and the silent-NULL trap
Renaming and dropping don’t change types. To turn a string into an int you cast:
typed = (cleaned
.withColumn("OrderId", col("OrderId").cast("int"))
.withColumn("CustomerId", col("CustomerId").cast("int"))
.withColumn("Total", col("Total").cast("double"))
.withColumn("OrderDate", col("OrderDate").cast("date")))
typed.printSchema()
# root
# |-- OrderId: integer (nullable = true)
# |-- CustomerId: integer (nullable = true)
# |-- Total: double (nullable = true)
# |-- Country: string (nullable = true)
# |-- OrderDate: date (nullable = true)
You can pass either a string ("int", "double", "date", "timestamp", "string", "boolean", "long", "decimal(10,2)") or an actual type object (IntegerType(), DoubleType(), etc.). String form is shorter, type-object form gets editor autocomplete. I use strings.
Now the trap. Look at row 5 (Total = "abc") and row 6 (OrderDate = "not-a-date"):
typed.show()
# +-------+----------+------+-------+----------+
# |OrderId|CustomerId| Total|Country| OrderDate|
# +-------+----------+------+-------+----------+
# | 1001| 1| 59.0| NL|2026-03-05|
# | 1002| 1| 29.0| NL|2026-03-18|
# | 1003| 2| 149.0| IT|2026-02-15|
# | 1004| 2| 89.5| IT|2026-03-22|
# | 1005| 3| NULL| DE|2026-03-10| ← "abc" became NULL
# | 1006| 4| 42.42| RO| NULL| ← "not-a-date" became NULL
# +-------+----------+------+-------+----------+
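Bad casts return NULL silently. No exception, no log message. Your job runs to completion and, say, 0.4% of your numbers vanish. This is the single most common ETL bug I see in code reviews: somebody trusted a cast on user-generated data and lost a slice of revenue. (This is the behavior with ANSI mode off, spark.sql.ansi.enabled=false, which is the default through Spark 3.x. Spark 4.0 turns ANSI mode on by default, and there the same cast raises an error instead of returning NULL.)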
Defensive patterns:
# Option 1: count the casualties before continuing
bad_total = (cleaned
.where(col("Total").isNotNull() & col("Total").cast("double").isNull())
.count())
print(f"Rows with un-castable Total: {bad_total}")
if bad_total > 0:
    raise ValueError(f"{bad_total} rows had non-numeric Total")
# Option 2: keep the original string column for inspection
typed = cleaned.withColumn("Total_d", col("Total").cast("double"))
# Now Total still has "abc" and Total_d has the cast result;
# filter on Total.isNotNull() & Total_d.isNull() to report the bad rows.
Option 1 is the right move for production: fail loud when data quality drops, don’t silently lose rows. Adjust the threshold to taste — sometimes 0.01% bad rows is acceptable, sometimes zero is.
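When several columns are cast at once, Option 1 can be generalized into one aggregation plus a threshold check. The sketch below is an assumption-laden illustration, not a standard API: count_bad_casts and over_threshold are hypothetical names, and the counting relies on the non-ANSI cast behavior described above, where a failed cast yields NULL rather than an error.

```python
def count_bad_casts(df, casts):
    """Count, per column, the non-NULL values that become NULL when cast.

    `casts` maps column name -> target type string, e.g. {"Total": "double"}.
    Assumes non-ANSI cast semantics (a failed cast returns NULL).
    Hypothetical helper for illustration; not part of PySpark.
    """
    # Imported lazily so the pure-Python threshold check below
    # can be used and tested without a Spark installation.
    from pyspark.sql import functions as F

    aggs = [
        F.sum(
            F.when(df[name].isNotNull() & df[name].cast(dtype).isNull(), 1)
            .otherwise(0)
        ).alias(name)
        for name, dtype in casts.items()
    ]
    row = df.agg(*aggs).first()
    # sum() over zero rows yields NULL, so fall back to 0.
    return {name: (row[name] or 0) for name in casts}


def over_threshold(bad_counts, total_rows, max_bad_fraction=0.0):
    """Return only the columns whose bad-cast fraction exceeds the limit."""
    if total_rows == 0:
        return {}
    return {
        name: bad
        for name, bad in bad_counts.items()
        if bad / total_rows > max_bad_fraction
    }


# Pure-Python part, fed with the counts the six-row sample produces
# (one bad Total, one bad OrderDate):
print(over_threshold({"Total": 1, "OrderDate": 1}, total_rows=6, max_bad_fraction=0.01))
# {'Total': 1, 'OrderDate': 1}
```

In a job this might look like offenders = over_threshold(count_bad_casts(cleaned, {"Total": "double"}), cleaned.count(), 0.0001), followed by a raise if offenders is non-empty. Running all the counts in a single agg keeps it to one pass over the data rather than one job per column.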
Renaming and selecting in one pass
If you’re already going to project columns, you can rename in the same select:
projected = cleaned.select(
col("OrderId"),
col("CustomerId").alias("CustId"), # rename via alias
col("Total").cast("double").alias("Amount"),
col("Country").alias("CountryCode"),
)
alias is the rename-while-selecting trick. Nice when you’re doing column surgery anyway and don’t want a separate withColumnRenamed chain to clutter the code.
selectExpr: the SQL-string shortcut
For people who think in SQL, selectExpr lets you write SQL fragments:
shortcut = raw.selectExpr(
"ord_id AS OrderId",
"CAST(cust_id AS INT) AS CustomerId",
"CAST(total_str AS DOUBLE) AS Total",
"ctry AS Country",
"CAST(ord_dt AS DATE) AS OrderDate",
)
One call, every rename and cast in one place. The strings are real Spark SQL, so you get all of SQL’s expression syntax — CASE WHEN, COALESCE, function calls, the lot. Convenient. The downside: you lose Python autocomplete and your linter can’t catch typos in the strings. I use selectExpr when the transformations are clearly SQL-shaped and select with col() when they’re clearly Python-shaped.
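Since linters can't see inside the SQL strings, one option is to generate them from a small declarative spec so that each column name is written exactly once. This is a sketch only; build_select_exprs and the triple layout (source, target, type or None) are hypothetical choices made for illustration.

```python
def _quote(name):
    """Backtick-quote an identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def build_select_exprs(spec):
    """Turn (source, target, type_or_None) triples into selectExpr strings.

    Hypothetical helper for illustration; not part of PySpark.
    """
    exprs = []
    for source, target, dtype in spec:
        expr = _quote(source)
        if dtype is not None:
            expr = f"CAST({expr} AS {dtype})"
        exprs.append(f"{expr} AS {_quote(target)}")
    return exprs


spec = [
    ("ord_id", "OrderId", None),
    ("cust_id", "CustomerId", "INT"),
    ("total_str", "Total", "DOUBLE"),
    ("ctry", "Country", None),
    ("ord_dt", "OrderDate", "DATE"),
]

for line in build_select_exprs(spec):
    print(line)
# `ord_id` AS `OrderId`
# CAST(`cust_id` AS INT) AS `CustomerId`
# CAST(`total_str` AS DOUBLE) AS `Total`
# `ctry` AS `Country`
# CAST(`ord_dt` AS DATE) AS `OrderDate`
```

The result would be passed along as raw.selectExpr(*build_select_exprs(spec)). The spec is plain data, so it can be unit-tested or diffed in code review without starting a Spark session.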
The full ETL pattern
Here’s what all this actually looks like as one job — the input/cleanup/output sandwich that’s the bones of any real PySpark pipeline:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
spark = (SparkSession.builder
.appName("OrdersETL")
.master("local[*]")
.getOrCreate())
# 1. Read — schema-on-read, everything string
raw = (spark.read
.option("header", True)
.csv("./data/orders_raw.csv"))
# 2. Rename to canonical column names
renamed = (raw
.withColumnRenamed("ord_id", "OrderId")
.withColumnRenamed("cust_id", "CustomerId")
.withColumnRenamed("total_str", "Total")
.withColumnRenamed("ctry", "Country")
.withColumnRenamed("ord_dt", "OrderDate"))
# 3. Cast to real types
typed = (renamed
.withColumn("OrderId", col("OrderId").cast("int"))
.withColumn("CustomerId", col("CustomerId").cast("int"))
.withColumn("Total", col("Total").cast("double"))
.withColumn("OrderDate", col("OrderDate").cast("date")))
# 4. Data quality check: fail loud if any key column is NULL after the casts
bad = typed.where(
    col("OrderId").isNull() |
    col("Total").isNull() |
    col("OrderDate").isNull()
).count()
if bad > 0:
    raise ValueError(f"{bad} rows have a NULL OrderId, Total or OrderDate after cast")
# 5. Drop scratch columns and select the final shape
final = typed.drop("scratch", "internal_flag").select(
"OrderId", "CustomerId", "Total", "Country", "OrderDate"
)
# 6. Write Parquet, partitioned by country for downstream queries
(final
.write
.mode("overwrite")
.partitionBy("Country")
.parquet("./out/orders_clean"))
Six steps. Read, rename, cast, validate, project, write. That structure repeats across thousands of pipelines with different column names, different sources, different output formats — but the bones are the same.
Common errors and how to read them
The two error messages you’ll see most often when working with these operators:
AnalysisException: cannot resolve 'col_name' given input columns: [...]
You typo’d a column name, or you’re referencing a column that was renamed earlier in the pipeline. The error helpfully prints the actual columns in the brackets. Read them carefully: usually you’ll spot your typo or realize you’re looking at the post-rename schema instead of the pre-rename one. Newer Spark releases word the same failure as [UNRESOLVED_COLUMN.WITH_SUGGESTION] and list suggested column names, but you read it the same way.
Silent NULLs after cast.
Not technically an error, which is the whole point of the trap. The pipeline finishes, the row count is the same, the values are wrong. The fix is the validation in step 4 of the full ETL pattern above. Always count NULLs after a cast on important columns, especially the ones that come from external systems.
Run this on your own machine
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
spark = (SparkSession.builder.appName("Cleanup").master("local[*]").getOrCreate())
raw = spark.createDataFrame(
[
("1001", "1", "59.00", "NL", "2026-03-05", "debug-x"),
("1002", "1", "29.00", "NL", "2026-03-18", "debug-y"),
("1003", "2", "abc", "DE", "2026-03-10", "debug-z"),
],
"ord_id STRING, cust_id STRING, total_str STRING, ctry STRING, ord_dt STRING, scratch STRING",
)
# 1. Chained renames
v1 = (raw
.withColumnRenamed("ord_id", "OrderId")
.withColumnRenamed("cust_id", "CustomerId")
.withColumnRenamed("total_str", "Total")
.withColumnRenamed("ctry", "Country")
.withColumnRenamed("ord_dt", "OrderDate"))
# 2. Drop the scratch column
v2 = v1.drop("scratch")
# 3. Cast types — note row 3 with "abc" becomes NULL
v3 = (v2
.withColumn("OrderId", col("OrderId").cast("int"))
.withColumn("CustomerId", col("CustomerId").cast("int"))
.withColumn("Total", col("Total").cast("double"))
.withColumn("OrderDate", col("OrderDate").cast("date")))
v3.show()
# 4. Same pipeline via selectExpr
v_alt = raw.selectExpr(
"CAST(ord_id AS INT) AS OrderId",
"CAST(cust_id AS INT) AS CustomerId",
"CAST(total_str AS DOUBLE) AS Total",
"ctry AS Country",
"CAST(ord_dt AS DATE) AS OrderDate",
)
v_alt.show()
# 5. Count rows with cast casualties
bad = v3.where(col("Total").isNull()).count()
print(f"Bad Total rows: {bad}")
Notice how row 3 shows up with Total = NULL and no error was raised anywhere. That’s the silent-NULL trap in action. Build the habit of always counting NULLs after a cast on a column that matters: three lines of defensive code today save a five-hour data-loss investigation in two months.
That closes Module 3. You can read data, project columns, filter rows, build expressions, aggregate, sort, and clean up. That’s a full DataFrame-fundamentals toolkit; from here you can write a real PySpark job and ship it. Module 4 starts next lesson with joins — inner, left, semi, anti, broadcast, and the failure modes that make a join the most common reason a Spark job runs out of memory.