Python, de la zero Lecția 54 / 60

Proiect ML: o problema de clasificare, end to end

De la CSV brut la model deployat: lectiile Modulului 9 facute palpabile.

Un model care obține 0.92 AUC într-un Jupyter notebook și nu părăsește niciodată notebook-ul valorează zero. Un model care obține 0.85 AUC și stă în spatele unui endpoint HTTP pe care sisteme reale îl pot apela valorează bani reali. Distanța dintre cele două e unde se blochează majoritatea data scientists, și e unde engineering-ul din machine learning engineering chiar își face apariția.

Lecția asta e concluzia practică a Modulului 9. Vom lua o problemă reală de clasificare binară - customer churn - de la CSV brut la serviciu de predicție deployat. Setul de date e IBM Telco Customer Churn, disponibil gratuit pe Kaggle și în multe repo-uri oglindite. Aproximativ 7.000 de clienți, vreo 20 de feature-uri, o coloană binară Churn. Aceeași formă de problemă pe care ai vedea-o în detecția de fraudă, default-ul de credit, predicția de conversie sau orice întrebare de tipul „va face acest user X”.

Fiecare pas păstrează deliberat codul direct. Scopul nu e să maximizezi scorul de pe leaderboard, ci să demonstrezi workflow-ul de la cap la coadă cu cod pe care ai fi efectiv dispus să-l livrezi.

Pasul 1: încărcare și explorare

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

df = pd.read_csv("telco_churn.csv")
print(df.shape)
print(df.dtypes)
print(df["Churn"].value_counts(normalize=True))

Primele întrebări: care e distribuția țintei? Ce coloane sunt numerice vs categorice? Valori lipsă?

print(df.isna().sum())

# TotalCharges e citit ca object - capcana frecventa, are string-uri goale pentru clientii noi
df["TotalCharges"] = pd.to_numeric(df["TotalCharges"], errors="coerce")
print(df["TotalCharges"].isna().sum())  # ~11 randuri
df = df.dropna(subset=["TotalCharges"])

df["Churn"] = (df["Churn"] == "Yes").astype(int)
df = df.drop(columns=["customerID"])

Rata de churn e aproximativ 27%, dezechilibrul de clase e ușor, dar real. Bun de știut pentru stratificare mai târziu. Sanity check vizual rapid:

fig, axes = plt.subplots(1, 2, figsize=(10, 4))
df["tenure"].hist(by=df["Churn"], bins=30, ax=axes)
axes[0].set_title("tenure | Churn=0")
axes[1].set_title("tenure | Churn=1")
plt.tight_layout()
plt.show()

Clienții care fac churn au tenure mult mai scurt. Prior util, ne spune că un arbore va face probabil split pe tenure devreme.

Pasul 2: feature engineering

Majoritatea coloanelor sunt categorice cu două sau trei niveluri. Câteva feature-uri derivate pe care le sugerează cunoștințele de domeniu:

df["AvgChargePerMonth"] = df["TotalCharges"] / df["tenure"].replace(0, 1)
df["IsLongTermContract"] = (df["Contract"] != "Month-to-month").astype(int)
df["NumServices"] = (
    (df["PhoneService"] == "Yes").astype(int)
    + (df["MultipleLines"] == "Yes").astype(int)
    + (df["InternetService"] != "No").astype(int)
    + (df["OnlineSecurity"] == "Yes").astype(int)
    + (df["OnlineBackup"] == "Yes").astype(int)
    + (df["DeviceProtection"] == "Yes").astype(int)
    + (df["TechSupport"] == "Yes").astype(int)
    + (df["StreamingTV"] == "Yes").astype(int)
    + (df["StreamingMovies"] == "Yes").astype(int)
)

Acestea sunt ipoteze de domeniu: clienții care plătesc mult pe lună față de istoricul lor ar putea face churn, tipul de contract contează, profunzimea totală a serviciilor contează. Lăsăm modelul să decidă dacă sunt utile.

Pentru pasul de modelare vrem o separare curată între categorice și numerice ca să putem construi un column transformer:

y = df["Churn"]
X = df.drop(columns=["Churn"])

categorical = X.select_dtypes(include="object").columns.tolist()
numeric = X.select_dtypes(exclude="object").columns.tolist()
print(f"{len(categorical)} categorical, {len(numeric)} numeric")

Pasul 3: split stratificat

Split în trei: train, validation (pentru early stopping în timpul tuning-ului), test (atins o singură dată, la final).

from sklearn.model_selection import train_test_split

X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=0.15, stratify=y, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.15 / 0.85, stratify=y_temp, random_state=42
)
print(X_train.shape, X_val.shape, X_test.shape)

stratify=y păstrează rata de churn consistentă în split-uri. Cu clase dezechilibrate asta nu e opțional, fără el poți nimeri un set de test cu o rată pozitivă vizibil diferită și evaluarea ta devine zgomotoasă.

Pasul 4: baseline cu logistic regression

Baseline-ul ritual. Construiește un Pipeline ca preprocesarea și modelul să fie cuplate:

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, classification_report

preprocess = ColumnTransformer([
    ("num", StandardScaler(), numeric),
    ("cat", OneHotEncoder(handle_unknown="ignore", drop="if_binary"), categorical),
])

baseline = Pipeline([
    ("pre", preprocess),
    ("clf", LogisticRegression(max_iter=1000, class_weight="balanced")),
])

baseline.fit(X_train, y_train)
val_proba = baseline.predict_proba(X_val)[:, 1]
print(f"Baseline AUC = {roc_auc_score(y_val, val_proba):.3f}")

Observă class_weight="balanced", util pentru dezechilibrul ușor, deși pentru rată pozitivă de 27% poți să-l lași și implicit. drop="if_binary" pe OneHotEncoder scapă de coloana redundantă pe feature-urile yes/no.

AUC tipic pe acest set de date: în jur de 0.84. E numărul pe care orice alt model trebuie să-l bată.

Pasul 5: modele mai bune cu setări implicite sensibile

from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

xgb = Pipeline([
    ("pre", preprocess),
    ("clf", XGBClassifier(
        n_estimators=500,
        max_depth=6,
        learning_rate=0.05,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
        eval_metric="logloss",
    )),
])
xgb.fit(X_train, y_train)
print(f"XGB AUC = {roc_auc_score(y_val, xgb.predict_proba(X_val)[:, 1]):.3f}")

lgbm = Pipeline([
    ("pre", preprocess),
    ("clf", LGBMClassifier(
        n_estimators=500,
        learning_rate=0.05,
        num_leaves=31,
        random_state=42,
    )),
])
lgbm.fit(X_train, y_train)
print(f"LGBM AUC = {roc_auc_score(y_val, lgbm.predict_proba(X_val)[:, 1]):.3f}")

Pe Telco, ambele ansambluri scot de obicei 0.85-0.86, doar un fir de păr peste baseline-ul liniar. Asta e, apropo, unul dintre pattern-urile de care a avertizat Lecția 52: pe o problemă dominată de semnale aditive, modelele liniare regularizate țin pasul.

Pentru pedagogie, să ne prefacem că diferența contează și să tunăm.

Pasul 6: tuning cu Optuna

import optuna
from sklearn.model_selection import StratifiedKFold, cross_val_score

def objective(trial):
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 200, 1000),
        "max_depth": trial.suggest_int("max_depth", 3, 10),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
        "subsample": trial.suggest_float("subsample", 0.6, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
        "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
        "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
    }
    pipe = Pipeline([
        ("pre", preprocess),
        ("clf", XGBClassifier(**params, random_state=42, eval_metric="logloss")),
    ])
    cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
    scores = cross_val_score(pipe, X_train, y_train, scoring="roc_auc", cv=cv, n_jobs=1)
    return scores.mean()

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=40, show_progress_bar=True)

print("Best AUC (CV):", study.best_value)
print("Best params:", study.best_params)

best_xgb = Pipeline([
    ("pre", preprocess),
    ("clf", XGBClassifier(**study.best_params, random_state=42, eval_metric="logloss")),
])
best_xgb.fit(X_train, y_train)
val_auc = roc_auc_score(y_val, best_xgb.predict_proba(X_val)[:, 1])
print(f"Tuned XGB val AUC = {val_auc:.3f}")

Patruzeci de trial-uri sunt suficiente ca să vezi îmbunătățiri semnificative. Pe acest set de date ai putea câștiga încă 0.005-0.01 AUC. Merită? Depinde de business. Ideea e workflow-ul.

Pasul 7: evaluează onest

Acum setul de test, atins pentru prima oară:

from sklearn.metrics import (
    confusion_matrix, classification_report,
    precision_recall_curve, roc_curve, auc,
)

test_proba = best_xgb.predict_proba(X_test)[:, 1]
test_pred = (test_proba > 0.5).astype(int)

print(f"Test AUC = {roc_auc_score(y_test, test_proba):.3f}")
print(classification_report(y_test, test_pred))
print(confusion_matrix(y_test, test_pred))

AUC singur ascunde detalii operaționale importante. Confusion matrix-ul și precision/recall pe clasă îți spun ce fel de greșeli face modelul. Pentru churn, false negatives (am spus „nu va face churn”, clientul a făcut churn) sunt de obicei mai scumpe decât false positives (am spus „va face churn”, clientul nu a făcut, am risipit un cupon de retenție).

Acea asimetrie argumentează pentru tuning-ul threshold-ului de clasificare departe de 0.5:

prec, rec, thresh = precision_recall_curve(y_test, test_proba)
fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(thresh, prec[:-1], label="precision")
ax.plot(thresh, rec[:-1], label="recall")
ax.set_xlabel("threshold")
ax.legend()
ax.grid(True)
plt.show()

Citești curba și alegi un threshold care se potrivește cu raportul tău de cost de business. Poate 0.35: threshold mai mic, mai multe predicții de churn, recall mai mare, precision mai mică. Acceptabil dacă un apel de retenție costă 5$ și un client salvat valorează 200$.

Pasul 8: interpretează cu SHAP

Stakeholderii vor întreba „de ce a marcat modelul acest client?” Valorile SHAP îți dau o descompunere pe rând.

import shap

# Scoate clasificatorul antrenat din pipeline
clf = best_xgb.named_steps["clf"]
X_train_transformed = best_xgb.named_steps["pre"].transform(X_train)
X_test_transformed = best_xgb.named_steps["pre"].transform(X_test)

# Recupereaza numele feature-urilor din ColumnTransformer
feat_names = best_xgb.named_steps["pre"].get_feature_names_out()

explainer = shap.TreeExplainer(clf)
shap_values = explainer.shap_values(X_test_transformed)

# Vedere globala
shap.summary_plot(shap_values, X_test_transformed, feature_names=feat_names)

# Explicatie pentru un singur client
i = 0
shap.force_plot(
    explainer.expected_value,
    shap_values[i],
    X_test_transformed[i],
    feature_names=feat_names,
    matplotlib=True,
)

Summary plot-ul clasează feature-urile după cât de mult mișcă predicțiile în medie. Force plot-ul pentru un singur client îți spune „probabilitatea de churn a acestui client e 0.74 fiindcă contract=month-to-month a împins-o cu +0.18, tenure=3 a împins-o cu +0.12, …” A doua viziune e ce pui în fața unei echipe de prevenție a churn-ului, le spune ce levier să tragă.

Pasul 9: salvează pipeline-ul

Întregul pipeline - preprocesare plus model - salvat ca un singur obiect. Asta e ideea unui Pipeline.

import joblib
import json
from datetime import date

# Reantreneaza pe train+val pentru modelul final de productie
X_full = pd.concat([X_train, X_val])
y_full = pd.concat([y_train, y_val])
final_model = Pipeline([
    ("pre", preprocess),
    ("clf", XGBClassifier(**study.best_params, random_state=42, eval_metric="logloss")),
])
final_model.fit(X_full, y_full)

joblib.dump(final_model, "churn_model_v1.joblib")

# Documentarea schemei - contractul pentru apelanti
schema = {
    "model_version": "v1",
    "trained_on": str(date.today()),
    "test_auc": round(float(roc_auc_score(y_test, test_proba)), 3),
    "input_columns": {
        col: str(X[col].dtype) for col in X.columns
    },
    "categorical_levels": {
        col: sorted(X[col].dropna().unique().tolist()) for col in categorical
    },
}
with open("churn_model_v1.schema.json", "w") as f:
    json.dump(schema, f, indent=2, default=str)

Fișierul de schemă e critic. Fără el, peste șase luni, nimeni nu-și va aminti ce coloane așteaptă acest model, în ce ordine sau ce valori sunt valide pentru categorice. Salvează contractul.

Pasul 10: endpoint de servire minimal

Un serviciu FastAPI care încarcă pipeline-ul și prezice pe un record JSON postat:

# server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Literal, Optional
import pandas as pd
import joblib

app = FastAPI(title="Churn Predictor v1")
model = joblib.load("churn_model_v1.joblib")

class CustomerFeatures(BaseModel):
    gender: Literal["Male", "Female"]
    SeniorCitizen: int
    Partner: Literal["Yes", "No"]
    Dependents: Literal["Yes", "No"]
    tenure: int
    PhoneService: Literal["Yes", "No"]
    MultipleLines: Literal["Yes", "No", "No phone service"]
    InternetService: Literal["DSL", "Fiber optic", "No"]
    OnlineSecurity: Literal["Yes", "No", "No internet service"]
    OnlineBackup: Literal["Yes", "No", "No internet service"]
    DeviceProtection: Literal["Yes", "No", "No internet service"]
    TechSupport: Literal["Yes", "No", "No internet service"]
    StreamingTV: Literal["Yes", "No", "No internet service"]
    StreamingMovies: Literal["Yes", "No", "No internet service"]
    Contract: Literal["Month-to-month", "One year", "Two year"]
    PaperlessBilling: Literal["Yes", "No"]
    PaymentMethod: str
    MonthlyCharges: float
    TotalCharges: float

class Prediction(BaseModel):
    churn_probability: float
    will_churn: bool
    threshold: float = 0.35

@app.post("/predict", response_model=Prediction)
def predict(features: CustomerFeatures):
    df = pd.DataFrame([features.dict()])
    df["AvgChargePerMonth"] = df["TotalCharges"] / df["tenure"].replace(0, 1)
    df["IsLongTermContract"] = (df["Contract"] != "Month-to-month").astype(int)
    df["NumServices"] = (
        (df["PhoneService"] == "Yes").astype(int)
        + (df["MultipleLines"] == "Yes").astype(int)
        + (df["InternetService"] != "No").astype(int)
        + (df["OnlineSecurity"] == "Yes").astype(int)
        + (df["OnlineBackup"] == "Yes").astype(int)
        + (df["DeviceProtection"] == "Yes").astype(int)
        + (df["TechSupport"] == "Yes").astype(int)
        + (df["StreamingTV"] == "Yes").astype(int)
        + (df["StreamingMovies"] == "Yes").astype(int)
    )
    try:
        proba = float(model.predict_proba(df)[0, 1])
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    return Prediction(
        churn_probability=proba,
        will_churn=proba > 0.35,
    )

@app.get("/health")
def health():
    return {"status": "ok", "model_version": "v1"}

Rulează cu:

uv add fastapi uvicorn pydantic joblib pandas xgboost scikit-learn
uvicorn server:app --reload --port 8000

Testează:

curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"gender":"Female","SeniorCitizen":0,"Partner":"Yes","Dependents":"No","tenure":1,"PhoneService":"No","MultipleLines":"No phone service","InternetService":"DSL","OnlineSecurity":"No","OnlineBackup":"Yes","DeviceProtection":"No","TechSupport":"No","StreamingTV":"No","StreamingMovies":"No","Contract":"Month-to-month","PaperlessBilling":"Yes","PaymentMethod":"Electronic check","MonthlyCharges":29.85,"TotalCharges":29.85}'

Vei primi înapoi un JSON cu churn_probability și un boolean. Acela e un model deployabil.

Câteva lucruri pe care le-ai adăuga pentru producție și pe care le-am lăsat afară pentru lungime: validare de input față de schema salvată, structured logging al fiecărei predicții (input + output) pentru monitorizarea de drift, un endpoint Prometheus /metrics, tracing la nivel de request și aceeași funcție de feature engineering importată dintr-un modul partajat în loc să fie copiată între antrenare și servire. Ultimul e cel mai important. Train/serve skew, feature-urile tale de antrenare și de servire sunt calculate de cod diferit care nu se înțelege subtil, e cea mai comună cauză a „modelul a mers grozav la evaluare, de ce e prost în producție”. Factorizează mereu feature engineering-ul într-o funcție pe care atât notebook-ul de antrenare, cât și endpoint-ul de servire o importă.

Ce ai, totul împreună

E un proiect complet. Date brute încărcate, curățate, feature-uri construite, split cu stratificare, baseline antrenat, modele mai bune antrenate, tuning cu Optuna, evaluare pe un set de test păstrat deoparte, interpretat cu SHAP, pipeline și schemă salvate și un endpoint HTTP ridicat. Fiecare pas se potrivește cu o unealtă din Modulul 9.

Lucrul pe care nimeni nu-l spune data scientists juniori: modelarea în sine e poate 20% din muncă. Celelalte 80% sunt încărcarea, curățarea, split-ul, evaluarea, salvarea și servirea. Odată ce ai schela aia, poți schimba modelele aproape trivial. Schela e activul.

Ce urmează: deep learning

Asta încheie Modulul 9. Am stat ferm în lumea modelelor tabulare, liniar, arbore, ansamblu. Cel mai adânc model pe care l-am atins are încă butoane interpretabile clare. Modulul 10 trece la deep learning: PyTorch, antrenarea de rețele neuronale, transformere și părțile din ML modern care nu se potrivesc confortabil într-un apel model.fit(X, y). Toolchain diferit, stil diferit de debugging, moduri diferite de eșec. Dar multe dintre aceleași lecții de workflow din acest modul - disciplina train/test, baseline mai întâi, salvează pipeline-ul, ai grijă la skew - se transferă direct.

Ne vedem în Modulul 10.


Referințe: ghidul de utilizator scikit-learn, documentația Optuna (https://optuna.org/), documentația SHAP (https://shap.readthedocs.io/), Telco customer churn dataset (IBM, disponibil pe Kaggle). Consultat 2026-05-01.

Caută