PySpark, from the ground up Lesson 20 / 60

Transformations vs actions: the dichotomy and the catalog

Every PySpark operation is either a transformation or an action. Knowing which is which is half of debugging.

Last lesson set up the rule: transformations describe work, actions trigger it. This lesson is the catalog. It’s worth memorizing, because every time you debug a slow Spark job you’ll be asking the same question: which line in this 80-line script was the action that triggered all of this?

Sometimes that’s obvious — the .write.parquet(...) at the bottom. Sometimes it isn’t — somebody put a .count() six lines above for “logging” and now your pipeline runs twice.

The definition, more precisely

A transformation is a method that returns a new DataFrame and adds nothing to the world other than a node in the logical plan. It’s a pure function call against a DataFrame description. No data moves. No executors light up. The driver gets a new DataFrame reference and you keep going.

An action is a method that returns something other than a DataFrame, or causes a side effect (typically a write to disk or a callback executed against rows). The action is the moment Spark says “okay, time to actually do all that work you described.” The driver compiles the logical plan into a physical plan, breaks it into stages, sends tasks to executors, collects results, and returns them to your code.
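The split can be mimicked in a few lines of plain Python. This is only a toy analogy, not how Spark is implemented: ToyFrame, with_column, and read_orders are made-up names for illustration. Transformations return a new object with one more plan node; the single action walks the plan and returns a plain value.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Tuple


@dataclass(frozen=True)
class ToyFrame:
    """Toy stand-in for a DataFrame: a data source plus pending plan steps."""
    source: Callable[[], Iterable[dict]]
    plan: Tuple[Tuple[str, Callable], ...] = ()

    # Transformations: return a new ToyFrame with one more plan node. No data is read.
    def where(self, predicate):
        return ToyFrame(self.source, self.plan + (("filter", predicate),))

    def with_column(self, name, fn):
        add = lambda row: {**row, name: fn(row)}
        return ToyFrame(self.source, self.plan + (("map", add),))

    # Action: reads the source, applies every plan step, returns a plain int.
    def count(self):
        rows = self.source()
        for kind, fn in self.plan:
            rows = filter(fn, rows) if kind == "filter" else map(fn, rows)
        return sum(1 for _ in rows)


reads = []  # grows by one entry every time the source is actually read


def read_orders():
    reads.append("read")
    return [{"Country": "IT", "Total": 59.0},
            {"Country": "NL", "Total": 149.0},
            {"Country": "IT", "Total": 29.0}]


orders = ToyFrame(read_orders)
it_orders = (orders
             .where(lambda r: r["Country"] == "IT")
             .with_column("WithVat", lambda r: r["Total"] * 1.22))

print(len(it_orders.plan), len(reads))   # 2 0 -> two plan nodes, source untouched
print(it_orders.count(), len(reads))     # 2 1 -> the action read the source once
```

Real Spark adds an optimizer, a physical plan, and distributed execution on top, but the contract is the same: building the chain costs nothing, and the action pays for all of it.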

The “is this an action?” rule of thumb that’s right 95% of the time:

Does this method return something other than a DataFrame? If yes, it’s an action.

The other 5% are awkward edge cases (cache, persist, checkpoint, createOrReplaceTempView), which we’ll handle separately in “The awkward middle category” below.

Catalog: common transformations

Every method here returns a DataFrame and triggers nothing. Listed roughly by frequency of use:

  • select(*cols) — project columns. Possibly the single most-used transformation.
  • selectExpr(*expr_strings) — same, with SQL-string expressions instead of col() objects.
  • where(cond) / filter(cond) — keep rows where the condition is true. Same operator, two names.
  • withColumn(name, expr) — add or replace a column.
  • withColumnRenamed(old, new) — rename a column. Covered last lesson.
  • drop(*cols) — remove columns.
  • join(other, on, how) — combine two DataFrames on key columns.
  • groupBy(*cols).agg(...) — group rows and aggregate. Note: groupBy alone returns a GroupedData, not a DataFrame; the .agg(...) is what brings you back to DataFrame territory. Both calls together are a single transformation.
  • orderBy(*cols) / sort(*cols) — sort rows. Same operator, two names.
  • distinct() — remove duplicate rows across all columns.
  • dropDuplicates(*cols) — remove duplicates considering only the listed columns (or all if none given).
  • union(other) / unionAll(other) — append rows from another DataFrame, by position.
  • unionByName(other, allowMissingColumns=False) — append rows by column name. Safer than union when schemas might differ.
  • intersect(other) — keep only rows present in both DataFrames.
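  • exceptAll(other) / subtract(other) — remove rows present in another DataFrame. Not quite the same operator: subtract also de-duplicates the result (SQL EXCEPT DISTINCT), while exceptAll preserves duplicates (SQL EXCEPT ALL).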
  • repartition(n, *cols) — redistribute data into n partitions, optionally hashed by columns.
  • coalesce(n) — reduce partition count without a full shuffle.
  • sample(withReplacement=None, fraction=None, seed=None) — random subset of rows. PySpark also accepts the short forms df.sample(0.1) and df.sample(fraction=0.1, seed=42).
  • limit(n) — keep at most n rows. Lazy! It does not return rows; it returns a DataFrame whose plan ends with a LIMIT n. To get the rows out you still need an action.
  • na.fill(...), na.drop(...), na.replace(...) — null handling.
  • withWatermark(col, threshold) — for streaming.

Every one of these returns a DataFrame. None runs anything by itself.

Catalog: common actions

Each of these returns something other than a DataFrame, or writes to disk:

  • show(n=20, truncate=True) — print rows to stdout. Returns None.
  • count() — number of rows. Returns an int.
  • collect() — pull all rows to the driver as a list of Row objects. Returns list[Row]. Dangerous: brings the entire DataFrame into driver memory.
  • take(n) — return the first n rows as a list. Returns list[Row].
  • first() — return the first row. Returns a Row.
  • head(n) — same as take(n): returns a list[Row], even when n == 1. Called with no argument, head() returns a single Row, like first().
  • tail(n) — last n rows. Returns list[Row]. Available in Spark 3+.
  • toPandas() — convert to a Pandas DataFrame. Like collect(), this materializes everything on the driver.
  • toLocalIterator() — iterate over rows on the driver, one partition at a time. Slower than collect() but bounded memory.
  • foreach(func) / foreachPartition(func) — apply a side-effecting function to each row or partition. Used for writing to external sinks not covered by write.*.
  • write.format(...).save(...) — and the shortcuts: write.parquet(...), write.csv(...), write.json(...), write.orc(...), write.saveAsTable(...), write.insertInto(...). The DataFrameWriter family. Every one of these is an action — the moment you call .save(), .parquet(), etc., Spark runs the pipeline and writes the results.
  • describe() and summary() — compute summary statistics. Both return DataFrames, but each call internally triggers a full pass to compute statistics, so they behave like actions in terms of cost. (Technically transformations that contain hidden actions inside their construction. A pedantic edge case.)

The pattern: if it gives you a number, a list, a print to stdout, or a file on disk, it’s an action.

Reading a real pipeline

Here’s a pipeline. Find the action.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder
         .appName("OrdersETL")
         .master("local[*]")
         .getOrCreate())

orders = spark.read.option("header", True).csv("./data/orders.csv")

customers = spark.read.option("header", True).csv("./data/customers.csv")

joined = orders.join(customers, on="CustomerId", how="left")

filtered = joined.where(col("Country") == "IT")

withVat = filtered.withColumn(
    "TotalWithVat",
    col("Total").cast("double") * 1.22
)

agg = (withVat
    .groupBy("CustomerId", "Country")
    .agg(F.sum("TotalWithVat").alias("LifetimeValue"),
         F.count("*").alias("Orders")))

ranked = agg.orderBy(F.desc("LifetimeValue"))

(ranked
 .write
 .mode("overwrite")
 .parquet("./out/customer_ltv"))

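Every line from the read calls down to ranked = ... is a transformation. The whole chain is just plan-building. The read calls don’t scan the data; at most they touch the files to work out the schema. (With header=True and no explicit schema, Spark peeks at the first line of each CSV to get the column names, which can show up as a tiny job in the UI. Passing an explicit schema avoids even that.)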

The write.parquet(...) is the action. That’s the line where Spark actually goes to disk, reads both CSVs, performs the join, filter, withColumn, groupBy, aggregation, and sort, and writes the Parquet output. One job, triggered by one action.

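If you opened the Spark UI right after running this script, all the real work listed would trace back to that one action. In the simplest case that is a single job whose page shows several stages (one per shuffle boundary; more on that next lesson) and the line of your code that submitted it: the write.parquet. Depending on your Spark version and settings, you may also see tiny extra jobs from the CSV readers peeking at header lines, and adaptive query execution can split the one action into several jobs.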

When there are several actions, count the jobs

df = (spark.read.parquet("./big_table")
      .where(col("Country") == "IT")
      .withColumn("Year", F.year("OrderDate")))

print("Row count:", df.count())                 # job 1
df.show(5)                                       # job 2
df.write.parquet("./out/it_orders")             # job 3

Three actions, three jobs. The Parquet read happens three times unless you .cache() df first. Each job re-walks the plan from scratch. This is the “I called .count() and waited three minutes” problem from last lesson, multiplied by three.

If you’re writing production code, count the actions in your pipeline. Each one is a full execution. Two actions on the same intermediate DataFrame is two full pipelines unless you cache.

The awkward middle category

A few methods don’t fit cleanly. They’re worth knowing because they show up in real code and confuse people.

cache() and persist()

These are technically transformations. They return a DataFrame and don’t run anything by themselves. What they do is set a flag on the DataFrame: “next time an action runs against this DataFrame, after computing, keep the result around so subsequent actions can reuse it.” For DataFrames, cache() is simply persist() with the default storage level, which uses memory and spills to disk when memory runs out; persist(level) lets you choose a different level.

# other_big_table is assumed to be another DataFrame defined earlier
expensive = (spark.read.parquet("./big_table")
             .where(col("Country") == "IT")
             .join(other_big_table, "CustomerId"))

expensive.cache()        # transformation: marks for caching, runs nothing

expensive.count()        # action: runs the pipeline, materializes the cache
expensive.show()         # action: reads from cache, fast
expensive.write...       # action: reads from cache, fast

The .cache() call alone does no work. The next action runs the pipeline and stores the result. Subsequent actions skip re-computation.

You’ll sometimes see code that does df.cache().count() — the count() is a deliberate “warm up the cache” action that forces materialization. Common idiom in performance-tuned pipelines. We’ll cover the full caching playbook in lesson 23.

checkpoint()

Cuts the lineage by physically saving the DataFrame to the checkpoint directory (set beforehand with spark.sparkContext.setCheckpointDir(...)) and starting a new lineage from that snapshot. Returns a DataFrame, so it looks like a transformation, but with the default eager=True it runs a job immediately to write the checkpoint. Treat it like an action for cost purposes.

createOrReplaceTempView(name) and createGlobalTempView(name)

These return None, so by the rule of thumb they look like actions. They aren’t. They register the DataFrame’s plan under a name in the SQL catalog so you can reference it from spark.sql(...). Nothing executes. The view is just another node in the query graph.

printSchema(), columns, dtypes, schema

All metadata. None of them touches data. Free. (Don’t be fooled by printSchema() being a method call with parentheses — it’s reading from the logical plan, not the data.)

explain()

Prints the query plan without running it. Free. Use it liberally.

How the Spark UI tells the story

Open the Spark UI (default http://localhost:4040 for a local session). The Jobs tab lists every action that ran, with the line of code that triggered it and the stages it broke into. The UI is your primary debugging tool from here on. Two habits worth building:

  1. After running any pipeline, glance at the Jobs tab. If it lists jobs you can’t trace back to an action you intended, you’ve got a hidden action somewhere; look for stray .count() or .show() calls.
  2. Click into the slow stage. If one task is 10x longer than the others, you have skew. If the input size is 100 GB but you expected 10 MB, your filter didn’t push down.

A few practical consequences

Action placement matters for memory. collect() and toPandas() pull everything to the driver. On a 50 GB DataFrame this will OOM your driver and crash the session. Use take(n) to peek, show(n) to inspect, and only collect() when you’ve already filtered down to small result sets.

Repeated actions repeat all work. Three actions on the same intermediate DataFrame is three full pipelines unless you cache.

limit(n).show() is cheap. limit is a transformation; show runs the action with the limit in the plan. For a simple scan-and-filter plan, Spark won’t read the whole input: it reads partitions until it has n rows and stops. (A sort or aggregation upstream of the limit still forces a full pass.) Good for prototyping against big tables.

count() after a filter is not free. It still walks the entire input file. If you only need to know “are there any rows?”, df.limit(1).take(1) is cheaper.

The two failure modes I’ve seen most often: the “logger that costs $300 a day” (somebody adds print(f"rows: {df.count()}") halfway through a 2 TB pipeline for visibility, doubling its cost), and the “tests pass, prod fails” (a .collect() works fine on 6 in-memory rows in tests and OOMs the driver on 80 GB in production). Both are catalog-knowledge problems. If you know .count() is an action and .collect() materializes on the driver, you don’t write either.

Run this on your own machine

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

spark = (SparkSession.builder.appName("Catalog").master("local[*]").getOrCreate())

orders = spark.createDataFrame(
    [(1, "IT", 59.0), (2, "IT", 29.0), (3, "NL", 149.0),
     (4, "IT", 89.5), (5, "DE", 14.0), (6, "IT", 42.42)],
    "OrderId INT, Country STRING, Total DOUBLE",
)

# Pure transformation chain — instant
chain = (orders
    .where(col("Country") == "IT")
    .withColumn("WithVat", col("Total") * 1.22)
    .groupBy("Country")
    .agg(F.sum("WithVat").alias("Total"),
         F.count("*").alias("N")))

# 1. Inspect the plan without running anything
chain.explain()

# 2. Action: show() runs the whole chain
chain.show()

# 3. Multiple actions => repeated work
print("count:", chain.count())     # full re-execution
print("count:", chain.count())     # full re-execution again

# 4. Cache, then multiple actions => one execution + cache reads
chain.cache()
chain.count()      # materializes the cache
chain.count()      # cache hit, fast
chain.show()       # cache hit, fast

# 5. Open http://localhost:4040 and look at the Jobs tab.
# You should see at least one job per action, and only one full pipeline
# execution after the cache warmup.

Open the UI alongside this and watch each action add to the job list. Then experiment: comment out the .cache() line and re-run; count the jobs again. Jobs only ever appear when an action runs, though a single action can show up as more than one job (for example, with adaptive query execution enabled).

Next lesson: the DAG. What Spark actually builds when an action triggers, why a job has multiple stages, where the boundaries are, and how the lineage graph survives executor failures.
