PySpark, from the ground up Lesson 8 / 60

Your first SparkSession

The entry point to any PySpark job. What a SparkSession is, the configurations that matter, and what `local[*]` actually means.

Every PySpark program, whether it’s a notebook, a batch job or a streaming pipeline, starts with the same two lines.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyJob").getOrCreate()

That spark object is the door into everything Spark can do. DataFrames, SQL, streaming, ML, the catalogue, configuration: all of it hangs off the SparkSession. If you understand what those two lines actually configure, you’ve understood the foundation of every Spark job you’ll ever write.

This lesson takes that incantation apart, piece by piece.

What a SparkSession actually is

Before Spark 2.0, the API was spread across three separate objects: SparkContext (cluster connection, RDDs), SQLContext (DataFrames and SQL), and HiveContext (Hive-flavoured SQL, a subclass of SQLContext). Any non-trivial job meant building a SparkContext first and then wrapping it in the right SQL context, and that boilerplate was painful.

Spark 2.0 unified them into SparkSession. Internally, the SparkSession still owns a SparkContext (you can reach it via spark.sparkContext), but you almost never need to touch it directly.

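There is at most one active SparkContext per JVM, and getOrCreate() treats the session wrapping it as a singleton: it returns the existing one if it exists, or builds a new one if it doesn’t. In a notebook this matters: if you run the cell twice, you get the same session both times, still running with the master, app name and memory settings from the first call. Later builder calls can’t change those startup settings; they are ignored, with at most a warning in the logs. Only runtime SQL configurations such as spark.sql.shuffle.partitions get applied to the existing session. We’ll see how to deal with that.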

The standard incantation, expanded

Here’s the full builder pattern with every parameter we care about:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("RuneholdSalesETL")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.driver.memory", "4g")
         .getOrCreate())

Three things to know about the builder:

  1. It’s a fluent API — every .appName(), .master(), .config() returns the builder again, so you chain them. The wrapping parentheses let you split across lines without ugly backslashes.
  2. The order doesn’t matter. appName before master is identical to master before appName. (The one exception: if you set the same key twice, the last call wins.)
  3. getOrCreate() actually constructs the session and returns it. Until you call it, you have a builder, not a session.

Now let’s go through each piece.

appName — naming the job

.appName("RuneholdSalesETL")

This name shows up:

  • In the Spark UI title bar (localhost:4040).
  • In the cluster manager (YARN ResourceManager, Kubernetes pod name, Spark master web UI) when you submit to a real cluster.
  • In log lines and event logs.
  • In Databricks/EMR job listings.

Pick a name that’s specific. MyApp, Test, Spark are useless when you’re staring at five concurrent jobs in production. Use something like runehold-orders-daily-etl or customer-churn-feature-build. It costs nothing and saves you the next time something goes wrong at 3am.

master — where the executors live

.master("local[*]")

The master URL tells Spark where to run the executor processes. This is the single most important configuration choice.

The values you’ll actually use:

URL | Meaning
--- | ---
local | One thread, in the driver JVM. Useful for tiny smoke tests; no parallelism at all.
local[N] | N threads, where N is an integer. local[4] = four task threads.
local[*] | One thread per CPU core on the machine. Best for development.
local[*, 3] | Same as above, but a task may fail up to 3 times before the job is aborted. (The plain local modes give up on the first failure.)
spark://host:7077 | Standalone Spark cluster master.
yarn | Hadoop YARN cluster; connection details are picked up from HADOOP_CONF_DIR or YARN_CONF_DIR.
k8s://https://api.cluster.local:6443 | Kubernetes cluster (the URL of its API server).
mesos://host:5050 | Mesos. (Deprecated; you won’t see new deployments.)
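To make the local-mode rows concrete, here is a hypothetical helper (describe_local_master is not part of PySpark; Spark parses master URLs itself) that turns a local master URL into its (threads, max task failures) pair. The regular expressions are written to match the URL forms in the table, and os.cpu_count() only approximates the core count the JVM reports.

```python
import os
import re
from typing import Tuple

# Hypothetical helper: mirrors the local-mode rows of the table above.
LOCAL_N = re.compile(r"local\[(\d+|\*)\]")
LOCAL_N_FAILURES = re.compile(r"local\[(\d+|\*)\s*,\s*(\d+)\]")


def _threads(spec: str) -> int:
    # "*" means one thread per core; os.cpu_count() approximates the JVM's view
    return (os.cpu_count() or 1) if spec == "*" else int(spec)


def describe_local_master(url: str) -> Tuple[int, int]:
    """Return (task_threads, max_task_failures) for a local-mode master URL."""
    if url == "local":
        return 1, 1
    match = LOCAL_N.fullmatch(url)
    if match:
        return _threads(match.group(1)), 1   # no retries unless you ask for them
    match = LOCAL_N_FAILURES.fullmatch(url)
    if match:
        return _threads(match.group(1)), int(match.group(2))
    raise ValueError(f"not a local-mode master URL: {url!r}")


print(describe_local_master("local"))        # (1, 1)
print(describe_local_master("local[4]"))     # (4, 1)
print(describe_local_master("local[4, 3]"))  # (4, 3)
```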

In all local modes there is only one JVM: the driver also plays the executor, running tasks on its N threads. There’s no network. No serialisation across the wire. It’s one process pretending to be a cluster. This is great for development (fast startup, easy debugging, no cluster needed), but it means your “it works on my laptop” results don’t always translate to a real cluster, where shuffles cross the network and serialisation matters.

For everything in this course, local[*] is what you want.

A note: don’t set master in code if you’ll deploy to a cluster. When you spark-submit to YARN or Kubernetes, the master goes on the command line (--master yarn, or whatever). If your code also calls .master("local[*]"), the hardcoded value wins, because settings made in code take precedence over spark-submit flags, and your “production” job quietly runs in local mode on whichever machine hosts the driver (the gateway machine, in client mode). The pattern most teams use:

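# Don't hardcode master in production code
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("RuneholdSalesETL")
         # No .master(): spark-submit --master, spark.master in spark-defaults.conf
         # or the MASTER env var decides; with none of those set you get local[*]
         .getOrCreate())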

For local development, override on the command line: spark-submit --master "local[*]" my_job.py (the quotes stop your shell from treating [*] as a glob). Or set the MASTER environment variable. We’ll come back to this in Module 7.

The configs that matter on day one

.config(key, value) sets any of the hundreds of spark.* properties. There’s a full list at spark.apache.org/docs/latest/configuration.html. Most of them you’ll never touch. A handful you’ll set in every job.

spark.sql.shuffle.partitions

Default: 200. The number of partitions Spark uses for shuffle operations in DataFrame and SQL code: joins, aggregations, distincts, anything that has to redistribute data across executors. (RDD operations aren’t governed by it.)

200 is the value Databricks engineers picked years ago because it works “okay” on a typical 50-node production cluster. On your laptop with 8 cores, 200 partitions of a 10MB DataFrame means each partition has 50KB and Spark spends more time scheduling tasks than doing work. Your df.groupBy(...).count() takes 30 seconds for no good reason.

Rule of thumb for local dev: set it to your core count, or 2× your core count.

.config("spark.sql.shuffle.partitions", "8")

In production, the right value is roughly total_data_in_GB * 2 or cores * 3, depending on your job shape. We’ll cover tuning in Module 9.
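The arithmetic behind these rules of thumb, as a small sketch. The helper names are hypothetical, the function just restates the text’s heuristics, and the 120 GB / 64 cores figures are made-up inputs for illustration, not measurements.

```python
import math
import os


def avg_partition_kb(data_mb: float, partitions: int) -> float:
    """Average shuffle partition size, using 1 MB = 1000 KB as the text does."""
    return data_mb * 1000 / partitions


def local_shuffle_partitions(per_core: int = 1) -> int:
    """Laptop rule of thumb: core count (per_core=1) or 2x core count (per_core=2)."""
    return (os.cpu_count() or 1) * per_core


def cluster_shuffle_partitions(total_data_gb: float, total_cores: int) -> dict:
    """The two production rules of thumb side by side; which fits depends on the job."""
    return {
        "by_data": max(1, math.ceil(total_data_gb * 2)),
        "by_cores": total_cores * 3,
    }


print(avg_partition_kb(10, 200))             # 50.0 (KB per partition)
print(cluster_shuffle_partitions(120, 64))   # {'by_data': 240, 'by_cores': 192}
```

On a laptop you would then pass something like str(local_shuffle_partitions(2)) as the value of spark.sql.shuffle.partitions.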

spark.sql.adaptive.enabled

Default: true since Spark 3.2. Enables Adaptive Query Execution (AQE), which lets Spark re-plan a query at runtime using the actual statistics of shuffle stages that have already finished. AQE will coalesce tiny shuffle partitions, switch from sort-merge to broadcast joins when one side turns out to be small, and split skewed partitions in sort-merge joins automatically.

For almost every workload it is free performance. Leave it on. Set it explicitly on Spark 3.0 and 3.1, where it defaults to false, and for clarity:

.config("spark.sql.adaptive.enabled", "true")

spark.driver.memory

Default: 1g. The heap size of the driver JVM. The driver holds the query plan, the schema catalogue, broadcast variables, and the results of .collect()/.toPandas().

If you ever see java.lang.OutOfMemoryError: Java heap space and you weren’t doing a giant collect(), the driver is probably too small. On a dev laptop with 16GB of RAM, give it 4g:

.config("spark.driver.memory", "4g")

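This config has to be set before the driver JVM starts. When you launch with plain python or from a fresh notebook kernel, PySpark starts the JVM inside getOrCreate(), so setting it on the builder works. Under spark-submit the driver JVM is already running by the time your code executes, so pass --driver-memory 4g on the command line instead. If you try spark.conf.set("spark.driver.memory", "4g") after getOrCreate(), Spark 3.x rejects it with an AnalysisException (“Cannot modify the value of a Spark config”), and Spark 2.x silently ignored it. Either way, the JVM keeps running with 1GB.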

spark.executor.memory

Default: 1g. The heap size of each executor JVM. In local mode it has no effect: tasks run inside the driver JVM, so spark.driver.memory is the setting that counts. In real cluster mode it’s the most important tuning knob you have.

For local dev you can leave it alone, or set it explicitly to document what you’d want on a cluster:

.config("spark.executor.memory", "2g")

Other configs you’ll see in the wild

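  • spark.serializer: often set to org.apache.spark.serializer.KryoSerializer for performance. The default Java serialiser is slower and bulkier; the gain is mostly in RDD code, since DataFrame operations already use Spark’s own binary row format internally.
  • spark.sql.session.timeZone: defaults to the JVM timezone. Set it explicitly to UTC to avoid silent timezone-conversion bugs.
  • spark.sql.legacy.timeParserPolicy: CORRECTED uses the modern (Spark 3) datetime parser, LEGACY restores Spark 2.x behaviour, and EXCEPTION (the default throughout Spark 3.x) raises an error on inputs the two parsers would read differently. Pick one and document the choice.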

Reading and changing configs at runtime

Once the session is up, you can read any config:

print(spark.conf.get("spark.sql.shuffle.partitions"))
# 8

print(spark.conf.get("spark.app.name"))
# RuneholdSalesETL

You can change runtime configs (anything that doesn’t require a JVM restart):

spark.conf.set("spark.sql.shuffle.partitions", 16)

But not JVM-level configs like spark.driver.memory. Those need to be set at builder time.

To list everything:

for k, v in spark.sparkContext.getConf().getAll():
    print(f"{k} = {v}")

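You’ll see a couple of dozen entries: the ones you set, plus the ones Spark filled in at startup (driver host and port, app ID, and so on). Settings still at their built-in defaults don’t appear, and neither do runtime changes made with spark.conf.set(); spark.conf.get() is the way to read the session’s current value for those.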

The Spark UI

The single most useful debugging tool in Spark.

print(spark.sparkContext.uiWebUrl)
# http://192.168.1.10:4040

While the SparkSession is alive, point your browser at that URL. You get:

  • Jobs: every action you’ve run, with stages and tasks, runtime, and a DAG.
  • Stages: task-level breakdown; this is where to look for skew.
  • Storage: what you’ve cached and how much memory it’s using.
  • SQL / DataFrame: the query plan, both logical and physical, with stage timings.
  • Environment: the Spark properties the application started with, plus JVM and system settings.
  • Executors: memory, GC, task counts per executor.

If port 4040 is taken (for example, by another Spark application already running on the machine), Spark tries 4041, then 4042, and so on. The actual port is in spark.sparkContext.uiWebUrl.

The UI dies the moment you call spark.stop(). To keep history around after a job finishes, configure the Spark History Server — but that’s a Module 8 topic.

A complete annotated template

Here’s what I actually paste at the top of every script and notebook:

from pyspark.sql import SparkSession

# Build the session. Order of .config() calls doesn't matter; getOrCreate() is what triggers JVM startup.
spark = (SparkSession.builder
         # Job identification — visible in Spark UI and cluster manager
         .appName("runehold-orders-daily-etl")

         # Where tasks run. local[*] = one thread per core. Don't set in prod.
         .master("local[*]")

         # Shuffle parallelism. Default 200 is wrong for laptops.
         .config("spark.sql.shuffle.partitions", "8")

         # Driver heap. Set BEFORE JVM start. 4g is plenty for dev.
         .config("spark.driver.memory", "4g")

         # AQE: free performance on Spark 3.2+
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.adaptive.coalescePartitions.enabled", "true")

         # Avoid silent timezone bugs
         .config("spark.sql.session.timeZone", "UTC")

         # Faster serialisation
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

         .getOrCreate())

# Keep logging at WARN from here on (the startup lines have already been printed)
spark.sparkContext.setLogLevel("WARN")

# Sanity check
print(f"Spark version: {spark.version}")
print(f"Spark UI:      {spark.sparkContext.uiWebUrl}")
print(f"App ID:        {spark.sparkContext.applicationId}")

# Quick smoke DataFrame
df = spark.range(0, 1_000_000).selectExpr("id", "id % 7 AS modulo")
df.groupBy("modulo").count().orderBy("modulo").show()

spark.stop()

Run it. You should see Spark’s version, a UI URL, an application ID, and a 7-row table of counts. The whole script takes about 5 seconds on a modern laptop.

The “session already exists” trap

In a notebook, if you run a cell that builds a SparkSession with local[4], then change it to local[*] and re-run, the second call hits getOrCreate(), finds the existing session, and returns it. Your new master setting is ignored. So is any other startup setting, such as spark.driver.memory; only runtime SQL configs like spark.sql.shuffle.partitions are applied to the existing session.

Two ways out:

from pyspark.sql import SparkSession

# Option 1: stop the existing session first
spark.stop()
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Option 2: no handle on the old session? Fetch the active one (Spark 3.0+)
active = SparkSession.getActiveSession()
if active is not None:
    active.stop()
spark = SparkSession.builder.master("local[*]").getOrCreate()

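Most of the time, restarting the Python kernel is simpler and less error-prone. It is also the only way to change JVM-level settings like spark.driver.memory: spark.stop() shuts down the SparkContext, but the JVM that PySpark launched keeps running with its original heap.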

What we have now

You can build a SparkSession from scratch, set the configurations that actually matter, read them back, find the UI, and avoid the singleton trap. Every lesson from here on starts with the assumption that you have a spark variable to work with.

Next lesson: actually putting data into Spark. CSV, JSON, Parquet — three formats with three different default behaviours, and the schema-on-read tradeoff that decides whether your job takes 10 seconds or 10 minutes.
