PySpark, de la zero Lecția 37 / 60

PySpark SQL: cand SQL bate sintaxa DataFrame

Inregistrarea de temp views, apelarea spark.sql() si cazurile in care string-ul SQL e sincer mai curat decat lantul DataFrame.

Modulul 7 începe aici, iar tema este optimizatorul: Catalyst. Înainte să-i deschidem capota, există o bucată din API pe care majoritatea cursurilor de PySpark o tratează superficial fiindcă pare un drum lateral: interfața SQL. Orice DataFrame poate fi transformat într-un tabel SQL cu o singură linie, interogat cu un string și transformat înapoi într-un DataFrame. Nu e un motor separat, nu e mai lent, nu e legacy. E același pipeline Catalyst care intră printr-o altă ușă.

Motivul pentru care contează în acest modul este că, odată ce poți scrie aceeași interogare în două feluri (DataFrame și SQL), începi să observi când fiecare e unealta potrivită. Iar acea decizie influențează direct cât de citibile sunt pipeline-urile tale peste șase luni, când cineva (probabil tu) le debughează la 11 PM.

Două uși, un singur motor

Ia orice DataFrame. Înregistrează-l ca temporary view. Interoghează-l cu SQL.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PySparkSQL")
         .master("local[*]")
         .getOrCreate())

orders = spark.createDataFrame(
    [
        (1, "IT", 59.0,  "2024-03-15"),
        (2, "IT", 29.0,  "2024-03-15"),
        (3, "NL", 149.0, "2024-04-02"),
        (4, "NL", 89.5,  "2024-04-02"),
        (5, "DE", 12.0,  "2025-01-08"),
        (6, "DE", 240.0, "2025-01-09"),
    ],
    "order_id INT, country STRING, total DOUBLE, dt STRING",
)

orders.createOrReplaceTempView("orders")

by_country_sql = spark.sql("""
    SELECT country, SUM(total) AS revenue
    FROM orders
    GROUP BY country
    ORDER BY revenue DESC
""")

by_country_df = (orders
                 .groupBy("country")
                 .agg(F.sum("total").alias("revenue"))
                 .orderBy(F.col("revenue").desc()))

Atât by_country_sql cât și by_country_df sunt DataFrames. Trec prin același parser Catalyst, aceleași rescrieri de logical plan, același planner fizic, aceeași generare de cod Tungsten. Rulează .explain() pe fiecare și vei vedea planuri identice, fiindcă sunt planuri identice. String-ul SQL e doar un front end alternativ.

Acesta e cel mai important fapt al lecției: nu există nicio diferență de performanță între cele două API-uri. Niciuna. Nu mai alege unul în detrimentul celuilalt „pentru performanță”. Alege în funcție de ce se citește mai bine.

Dacă vrei să vezi echivalența cu ochii tăi, rulează .explain(mode="extended") pe ambele. Secțiunile == Parsed Logical Plan == vor diferi puțin: string-ul SQL pornește ca un UnresolvedRelation nerezolvat, apelul DataFrame pornește deja rezolvat, dar până ajungi la == Optimized Logical Plan == și == Physical Plan ==, cele două output-uri sunt identice până la ID-urile operatorilor (care sunt cosmetice de session-counter). Nu e o coincidență; e arhitectura. Metodele de construcție DataFrame construiesc un arbore de logical plan, la fel face și parserul SQL. După aceea, fiecare regulă de optimizare, fiecare estimare de cost, fiecare pas de generare de cod se întâmplă pe o singură reprezentare partajată.

Când SQL e clar mai bun

Există câteva cazuri în care string-ul SQL va fi, de fiecare dată, mai plăcut decât lanțul DataFrame echivalent.

Interogări multi-CTE. Spark SQL suportă sintaxa CTE completă: WITH a AS (...), b AS (...), c AS (...) SELECT ..., iar dacă transformarea ta e natural trei sau patru etape numite, exact așa o descrie SQL.

spark.sql("""
    WITH daily_revenue AS (
        SELECT dt, country, SUM(total) AS revenue
        FROM orders
        GROUP BY dt, country
    ),
    country_totals AS (
        SELECT country, SUM(revenue) AS country_revenue
        FROM daily_revenue
        GROUP BY country
    ),
    flagged AS (
        SELECT d.*, c.country_revenue,
               d.revenue / c.country_revenue AS share
        FROM daily_revenue d
        JOIN country_totals c USING (country)
    )
    SELECT * FROM flagged WHERE share > 0.1
""").show()

Versiunea DataFrame e un lanț de trei variabile intermediare, fiecare urmată de .alias("daily_revenue") și re-unită mai târziu. Funcționează, dar fiecare cititor trebuie să reconstruiască mental ceea ce e, în esență, scoping de bloc WITH.

Window functions cu mai multe frame-uri. Sintaxa SQL OVER (PARTITION BY ... ORDER BY ... ROWS BETWEEN ...) este, sincer, exact pentru ce au fost concepute window functions. Le vom acoperi cum trebuie în lecția următoare, dar o interogare care folosește trei window-uri diferite în același SELECT e semnificativ mai îngrijită în SQL decât în DataFrame API, unde trebuie să declari fiecare specificație Window separat și s-o folosești în .over(...).

Cod care în mare parte este SQL. Când transformarea e „interogarea asta în warehouse, apoi salvează”, iar la mijloc nu există control flow Python, scrierea ei ca lanțuri DataFrame înseamnă doar a traduce SQL în Python și a-i cere următorului cititor să-l traducă înapoi. Sari peste pasul ăsta.

Când DataFrames sunt clar mai bune

Reversul: dacă te trezești construind un string SQL cu f-string-uri și apeluri .join(), lucrezi împotriva curentului.

Generare programatică. O listă de coloane de agregat, un dict de redenumiri, un pipeline condus de configurare: acestea sunt acasă în Python, nenorocite într-un string SQL.

metrics = ["revenue", "qty", "discount", "tax"]
agg_exprs = [F.sum(c).alias(f"total_{c}") for c in metrics]

(orders
 .groupBy("country")
 .agg(*agg_exprs)
 .show())

Încercarea de a asambla asta drept "SELECT country, " + ", ".join(...) + " FROM ..." funcționează, dar invită bug-uri de injection și se citește ca un template engine.

Integrare strânsă cu control flow. Dacă pasul următor depinde de o verificare împotriva unui count, sau iterezi peste o listă de surse, sau trebuie să faci branch în funcție de schemă, ești în Python. Rămâi în Python.

df = spark.read.parquet(path)

if "country" in df.columns:
    df = df.withColumn("country", F.upper("country"))

df.groupBy("country").count().show()

Poți exprima logică condițională în SQL, dar se văd cusăturile.

Referințe de coloane type-safe. F.col("revenue") + F.col("tax") supraviețuiește unei redenumiri în IDE și trece prin lint. String-ul "revenue + tax" nu. Pe un codebase de lungă durată, stilul cu referință la coloană își merită prețul.

Expresii de coloane reutilizabile. Un predicat complex folosit în trei interogări e o singură variabilă în lumea DataFrame:

is_active_paying = (F.col("status") == "active") & (F.col("plan") != "free")

active_orders   = orders.filter(is_active_paying)
active_users    = users.filter(is_active_paying)
active_sessions = sessions.filter(is_active_paying)

Echivalentul SQL e predicatul copy-pasted în trei clauze WHERE, cu tot riscul de drift pe care îl implică asta. Catalyst nu te va salva de a tasta același lucru de trei ori puțin greșit.

Amestecul celor două

Nu trebuie să te angajezi. Cele mai ergonomice pipeline-uri pe care le-am scris iau un DataFrame, îl înregistrează, rulează o bucată de SQL fiindcă acea parte e sincer mai curată și predau rezultatul înapoi sintaxei DataFrame pentru bucățile pe care Python le face mai bine:

raw = spark.read.parquet("s3://bucket/orders/")
raw.createOrReplaceTempView("raw_orders")

cleaned = spark.sql("""
    WITH dedup AS (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY ingested_at DESC) AS rn
        FROM raw_orders
    )
    SELECT order_id, country, total, dt
    FROM dedup
    WHERE rn = 1
""")

# Inapoi la DataFrame pentru imbogatire programatica
for col in ["country", "dt"]:
    cleaned = cleaned.withColumn(col, F.trim(col))

cleaned.write.mode("overwrite").parquet("s3://bucket/orders_cleaned/")

Folosește-o pe oricare exprimă pasul cel mai clar.

Inspectarea a ceea ce e înregistrat

spark.catalog e mânerul tău pentru ce views și tabele cunoaște Spark chiar acum.

spark.catalog.listTables()
# [Table(name='orders', database=None, description=None, tableType='TEMPORARY', ...)]

spark.catalog.listColumns("orders")
# [Column(name='order_id', description=None, dataType='int', nullable=True, ...), ...]

spark.catalog.dropTempView("orders")

Util în notebook-uri când ai înregistrat trei views și ai uitat care e care, și util în scripturi ca o verificare de bun simț înainte ca o interogare să ruleze.

Temp views, global temp views și managed tables

Trei niveluri de „permanent”. Alege-l pe cel care se potrivește cu durata de viață pe care o vrei efectiv.

Temp view. df.createOrReplaceTempView("orders"). Trăiește pentru SparkSession. Când sesiunea se termină, view-ul dispare. Limitat la sesiunea care l-a înregistrat: o altă SparkSession în aceeași aplicație nu-l poate vedea. Aceasta e valoarea implicită și răspunsul corect în 95% din cazuri.

Global temp view. df.createGlobalTempView("orders") (sau createOrReplaceGlobalTempView). Trăiește pentru întreaga aplicație Spark: fiecare SparkSession din JVM îl poate vedea. Trăiește în baza de date specială global_temp, deci îl interoghezi ca SELECT * FROM global_temp.orders. Util când ai mai multe sesiuni care partajează o aplicație Spark și trebuie să pasezi un view între ele. Aproape nimeni nu lovește acest caz.

Managed table. df.write.saveAsTable("warehouse.orders"). Persistat în metastore (Hive metastore, Glue, Unity Catalog), iar fișierele subiacente scrise în directorul warehouse. Supraviețuiește restart-urilor. Supraviețuiește cluster-elor. Asta vrei pentru tabele pe care le vei interoga mâine, săptămâna viitoare, dintr-un alt job, dintr-o altă echipă. Modulul 6 a folosit asta pentru bucketing: același mecanism.

# Limitat la sesiune - dispare cand sesiunea se termina
df.createOrReplaceTempView("session_local")

# Limitat la aplicatie - supravietuieste intre sesiuni in aceasta aplicatie
df.createGlobalTempView("app_wide")
spark.sql("SELECT * FROM global_temp.app_wide").show()

# Persistat in metastore - supravietuieste cluster-ului
df.write.mode("overwrite").saveAsTable("analytics.orders")

Modelul mental: temp view = variabilă locală, global temp view = variabilă la nivel de modul, managed table = fișier pe disc.

Un mic exemplu lucrat de la cap la coadă

Hai să facem pattern-ul multi-CTE pe DataFrame-ul orders de mai devreme și să confirmăm că ambele API-uri produc același plan.

orders.createOrReplaceTempView("orders")

# Forma SQL
sql_result = spark.sql("""
    WITH country_revenue AS (
        SELECT country, SUM(total) AS revenue
        FROM orders
        GROUP BY country
    )
    SELECT country, revenue
    FROM country_revenue
    WHERE revenue > 50
    ORDER BY revenue DESC
""")

# Forma DataFrame
df_result = (orders
             .groupBy("country")
             .agg(F.sum("total").alias("revenue"))
             .filter(F.col("revenue") > 50)
             .orderBy(F.col("revenue").desc()))

sql_result.explain(mode="formatted")
df_result.explain(mode="formatted")

Cele două output-uri explain diferă doar prin numere de ID cosmetice. Același plan fizic, același număr de shuffle-uri, același timp de ceas pe perete. Alegerea între ele ține pur și simplu de cine va citi acest cod peste trei luni.

Rulează asta pe propriul tău calculator

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("PySparkSQLDemo")
         .master("local[*]")
         .getOrCreate())

orders = spark.createDataFrame(
    [(i, ["IT", "NL", "DE"][i % 3], float(i * 7 % 200), f"2024-{(i % 12) + 1:02d}-01")
     for i in range(1, 50)],
    "order_id INT, country STRING, total DOUBLE, dt STRING",
)

orders.createOrReplaceTempView("orders")

# Multi-CTE in SQL
spark.sql("""
    WITH monthly AS (
        SELECT country, SUBSTR(dt, 1, 7) AS month, SUM(total) AS revenue
        FROM orders
        GROUP BY country, SUBSTR(dt, 1, 7)
    ),
    ranked AS (
        SELECT *, RANK() OVER (PARTITION BY month ORDER BY revenue DESC) AS rk
        FROM monthly
    )
    SELECT * FROM ranked WHERE rk = 1 ORDER BY month
""").show()

# Aceeasi logica, partial DataFrame, partial SQL
df = (orders
      .withColumn("month", F.substring("dt", 1, 7))
      .groupBy("country", "month")
      .agg(F.sum("total").alias("revenue")))

df.createOrReplaceTempView("monthly")
spark.sql("""
    SELECT * FROM (
        SELECT *, RANK() OVER (PARTITION BY month ORDER BY revenue DESC) AS rk
        FROM monthly
    ) WHERE rk = 1
""").show()

# Inspecteaza catalogul
for t in spark.catalog.listTables():
    print(t.name, t.tableType)

Rulează amândouă. Răsfoiește amândouă. Pe oricare o găsești mai ușor de citit mâine dimineață e cea potrivită pentru echipa ta.

Acel RANK() OVER (PARTITION BY ... ORDER BY ...) pe care tocmai l-ai scris este o window function: subiectul lecției următoare. Window functions sunt a doua cea mai utilă unealtă din trusa SQL după GROUP BY, iar odată ce le ai, încetezi să mai apelezi la self-join-uri ca să calculezi pattern-uri de tip „rândul N versus rândul N-1”.


Referințe: ghidul de programare Apache Spark SQL (https://spark.apache.org/docs/latest/sql-programming-guide.html) și referința API Spark Catalog. Consultat 2026-05-01.

Caută