PySpark, de la zero Lecția 2 / 60

Ideea MapReduce si de ce a contat

Lucrarea Google din 2004, modelul care a facut procesarea distribuita abordabila si de ce toata lumea a trecut mai departe intr-un deceniu.

Nu poți înțelege cu adevărat Spark fără să înțelegi lucrul pe care a fost construit să-l înlocuiască. Așa că înainte să ajungem la Spark însuși în lecția 3, trebuie să petrecem o lecție pe MapReduce, modelul de programare care a definit un întreg deceniu de procesare distribuită a datelor, și sistemul care, mai mult decât orice altceva, a făcut posibil ca un programator de aplicații obișnuit să facă muncă pe o mie de mașini fără să devină mai întâi expert în sisteme distribuite.

Dacă ai început să lucrezi cu date după, să zicem, 2018, s-ar putea să fi sărit complet peste MapReduce, în același mod în care cineva care învață JavaScript astăzi poate evita în mare parte jQuery. E ok, dar lasă o lacună. Cea mai mare parte a vocabularului datelor distribuite, map, reduce, shuffle, partition, job, stage, vine direct din MapReduce. Spark a moștenit modelul pe de-a-ntregul și apoi a optimizat părțile care dureau cel mai tare. Să știi ce a fost MapReduce, ce a făcut bine și ce a făcut catastrofal de prost îți spune cea mai mare parte din ce trebuie să știi despre de ce arată Spark așa cum arată.

Lucrarea din 2004

Povestea începe în decembrie 2004, la cel de-al șaselea simpozion USENIX privind Operating Systems Design and Implementation, unde doi ingineri Google pe nume Jeffrey Dean și Sanjay Ghemawat au prezentat o lucrare intitulată „MapReduce: Simplified Data Processing on Large Clusters”. E una dintre cele mai citate lucrări de sisteme din istoria informaticii, iar dacă n-ai citit-o niciodată, merită o seară: are unsprezece pagini, e scrisă pe înțelesul tuturor și e înșelător de simplă.

Contextul e important. Google în 2003 rula un întreg crawl web, un index de căutare, un sistem de reclame și o duzină de alte servicii interne pe ceea ce era deja o flotă la scară planetară de servere ieftine de uz general. Inginerii lor scriau tot timpul programe distribuite custom: ca să parseze log-uri, să construiască indici inversați, să numere lucruri, să sorteze lucruri, să dedupliceze lucruri. Fiecare dintre acele programe trebuia să rezolve independent aceleași probleme grele: cum împarți input-ul pe mașini, cum gestionezi inevitabilele eșecuri, cum coordonezi worker-ii, cum reasamblezi răspunsul. Fiecare program reinventa același schelet, prost.

Intuiția lui Dean și Ghemawat a fost că aproape toate aceste programe, deși arătau superficial diferit, se potriveau cu aceeași formă de bază. Iei un input mare, transformi fiecare înregistrare în una sau mai multe înregistrări intermediare (pasul map), grupezi toate înregistrările intermediare după o cheie (pasul shuffle, pe care lucrarea abia îl numește), iar apoi combini înregistrările pentru fiecare cheie într-un răspuns final (pasul reduce). Dacă le-ai putea da programatorilor un API în care scriu doar funcția de map și funcția de reduce, iar framework-ul gestionează restul, împărțirea, planificarea, shuffle-ul, toleranța la eșec, retry-urile, atunci un număr vast de programe distribuite ar deveni dintr-odată triviale de scris.

Asta e toată ideea. Map. Shuffle. Reduce.

Modelul în cuvinte simple

Un job MapReduce are trei faze logice.

Faza de map rulează în paralel pe mulți worker-i. Fiecare worker citește o bucată din input, de obicei un bloc din sistemul de fișiere distribuit, și rulează o funcție map furnizată de utilizator pe fiecare înregistrare. Funcția de map ia o înregistrare de input și emite zero, una sau mai multe perechi (cheie, valoare). Atât. Fără coordonare cu alți worker-i, fără stare partajată, fără variabile globale. Fiecare task de map e jenant de paralel și complet izolat.

Faza de shuffle e partea la care nu se gândește nimeni până nu o debug-ează la 2 dimineața. După ce sunt produse toate output-urile de map, framework-ul le grupează după cheie. Toate perechile (cheie, valoare) cu aceeași cheie, indiferent ce mapper le-a produs, sunt trimise la același reducer. Asta necesită mutarea datelor prin rețea, care e partea scumpă. În implementarea originală Google, output-urile mapper-ilor sunt scrise pe disc local, apoi trase de reducers prin rețea. Shuffle-ul e locul unde dispare cea mai mare parte din timpul real al unui job MapReduce.

Faza de reduce rulează și ea în paralel pe mulți worker-i. Fiecare reducer e responsabil pentru un subset de chei. Pentru fiecare cheie, primește lista completă de valori pe care orice mapper le-a emis pentru acea cheie și rulează o funcție reduce furnizată de utilizator pe acea listă pentru a produce output-ul final. Output-ul e scris înapoi în sistemul de fișiere distribuit.

Acesta e tot modelul. Două funcții, un shuffle, trei faze, totul paralel. Framework-ul decide câți mapper-i și reducers să ruleze, unde să-i ruleze, ce să facă atunci când unul dintre ei moare și cum să asambleze fișierele finale.

Un exemplu practic: word count

Fiecare tutorial MapReduce din 2004 încoace a folosit același exemplu, numărarea cât de des apare fiecare cuvânt într-un corpus mare, fiindcă captează modelul perfect, fără detalii străine. Hai să-l parcurgem la fel.

Imaginează-ți că input-ul e un director plin cu fișiere text în HDFS, totalizând să zicem 2 TB. Vrei, pentru fiecare cuvânt care apare oriunde în corpus, numărul de ori în care apare în toate fișierele.

Pseudocodul e cu adevărat de vreo patru linii, ceea ce e tot ideea.

map(filename, line):
    for word in line.split():
        emit(word, 1)

reduce(word, counts):
    emit(word, sum(counts))

Atât. Framework-ul citește input-ul, dă fiecare linie a fiecărui fișier unui mapper, iar mapper-ul emite (word, 1) pentru fiecare cuvânt pe care îl vede. Framework-ul face shuffle pe toate acele tupluri astfel încât toate perechile (the, 1) ajung la același reducer, toate perechile (spark, 1) ajung la alt reducer, și așa mai departe. Fiecare reducer își vede cheia atribuită plus un iterabil cu toate cifrele 1 care au fost emise pentru acea cheie, le adună și scrie totalul.

Dacă corpusul tău are 10 miliarde de cuvinte și 1 milion de cuvinte distincte, MapReduce va rula bucuros asta pe o mie de mașini, va recupera din câteva eșecuri de worker pe parcurs și îți va da înapoi un singur tabel deduplicat cu numărători de cuvinte. Programatorul a scris opt linii de pseudocod. Framework-ul a făcut restul.

Asta e descoperirea. Nu viteza, MapReduce-ul timpuriu nu era rapid în niciun sens absolut, ci abstracția. Procesarea distribuită a trecut de la „scrie un sistem RPC custom, un partitioner custom, un strat de retry custom și roagă-te” la „scrie o funcție de map și o funcție de reduce”. Dintr-odată, sute de ingineri Google care n-aveau nicio treabă cu scrierea de cod pentru sisteme distribuite scriau cod pentru sisteme distribuite, și majoritatea funcționa.

Hadoop: transformarea lucrării în open source

Lucrarea Google descria un sistem intern Google. Implementarea n-a fost lansată. Dar doi ingineri de la Yahoo, Doug Cutting și Mike Cafarella, lucrau deja la un crawler web open-source numit Nutch și se loveau de exact aceleași probleme de scalare cu care se lovise Google cu câțiva ani mai devreme. Au citit lucrarea. Au citit lucrarea înrudită despre Google File System din 2003. Iar în 2006 au extras părțile relevante din Nutch într-un proiect nou numit după elefantul de pluș al fiului lui Cutting: Hadoop.

Hadoop a fost, în esență, o clonă open-source a celor două sisteme fondatoare ale Google. Hadoop Distributed File System (HDFS) era analogul GFS: un mod de a stoca fișiere enorme pe multe mașini cu replicare pentru toleranță la eșec. Hadoop MapReduce era analogul MapReduce: un framework Java pentru a scrie funcții de map și reduce și a le rula pe un cluster.

Combinația, HDFS pentru stocare, MapReduce pentru calcul, a devenit platforma open-source implicită pentru big data timp de aproximativ un deceniu. Cloudera, Hortonworks, MapR și o duzină de distribuții mai mici și-au construit afaceri din împachetarea și suportul ei. Pe la 2012, fiecare companie din Fortune 500 care avea vreo pretenție de a fi „data-driven” avea un cluster Hadoop pe undeva într-un subsol, adesea aproape neutilizat, aproape întotdeauna operat de exact un consultant îngrozit pe nume Steve.

Ecosistemul Hadoop a crescut enorm. Hive (SQL peste MapReduce). Pig (un limbaj de scripting peste MapReduce). HBase (o bază de date NoSQL peste HDFS). Oozie (un scheduler de workflow pentru înlănțuirea job-urilor MapReduce). YARN (un resource manager pentru rularea mai multor framework-uri pe același cluster, introdus în Hadoop 2.0 în 2012). Cele mai multe dintre acestea sunt încă prezente în 2026 într-o formă sau alta, deși de obicei în roluri mult reduse. Hive în particular supraviețuiește ca strat de metadate, Hive Metastore, mult după ce aproape nimeni nu mai rulează query-uri Hive prin MapReduce.

De ce a trecut toată lumea mai departe

Pe la 2014, fisurile din MapReduce erau imposibil de ignorat. Modelul era elegant, dar implementarea originală avea trei proprietăți care făceau utilizarea lui din ce în ce mai dureroasă.

Totul are două stage-uri. Map apoi reduce. Acela e tot API-ul. Munca reală cu date aproape niciodată nu arată ca exact un map și un reduce. Un pipeline tipic e: citește input, filtrează, parsează, fă join cu un tabel de lookup, grupează, agregă, sortează, fă join din nou, scrie output. În MapReduce, fiecare dintre acele operații devine propriul ei job, cu propriul pas de map, propriul pas de reduce și propria scriere în HDFS între ele. Un pipeline care ar trebui să fie un query de cinci pași devine un lanț de șapte job-uri MapReduce separate orchestrate de Oozie, fiecare un program Java de sine stătător cu o sută de linii de boilerplate.

Starea intermediară lovește discul între fiecare job. Acesta e ucigașul. Output-ul unui job MapReduce e mereu scris în HDFS, în trei exemplare (factorul de replicare implicit de 3), iar apoi următorul job din lanț îl citește înapoi. Dacă pipeline-ul tău are șase stage-uri, ai scris datele pe disc de șase ori și le-ai citit înapoi de cinci ori. Pe discurile rotative din 2010, cu lățimea de bandă a rețelei din 2010, așa funcționau lucrurile, iar oamenii acceptau asta. Pe la 2014, cu SSD-uri și rețea de 10 gigabit, era vizibil absurd.

Algoritmii iterativi sunt catastrofal de lenți. Învățarea automată, algoritmii pe grafuri și orice altceva care rulează aceeași logică pe aceleași date de mai multe ori, k-means clustering, PageRank, regresie logistică, fac toate asta. Fiecare iterație e un job MapReduce întreg, ceea ce înseamnă că fiecare iterație scrie întregul dataset în HDFS și îl citește înapoi. Un job de k-means clustering care ar trebui să dureze 30 de secunde durează 30 de minute, aproape totul fiind I/O de disc pentru date care nici măcar nu s-au schimbat între iterații. Acesta a fost punctul de durere specific care a motivat Spark.

Au fost și alte plângeri: API-ul Java era prolix, modurile de eșec erau obscure, fișierele de configurare erau XML interminabil, overhead-ul de pornire JVM făcea job-urile mici disproporționat de lente, dar problema cea mai profundă era arhitecturală. MapReduce era un model proiectat în 2003 în jurul presupunerilor de hardware din 2003, iar până în 2014 acele presupuneri nu mai erau valabile.

Soluția, după cum probabil ghicești deja, era de a construi un sistem care să țină starea intermediară în memorie între operații, în loc să o scrie pe disc de fiecare dată. Să-i lase pe programatori să înlănțuie câte transformări vor într-un singur job logic, motorul decizând câte stage-uri fizice sunt cu adevărat necesare. Să facă iterația ieftină prin caching-ul seturilor de lucru în RAM. Să nu se mai prefacă că fiecare calcul e exact un map plus un reduce.

Acel sistem a fost Spark, și e subiectul lecției 3.

Pentru lecturi suplimentare: lucrarea originală MapReduce a lui Dean si Ghemawat e cu adevărat una dintre cele mai citibile lucrări de sisteme publicate vreodată și merită citită chiar dacă nu vei scrie niciodată un job MapReduce în viața ta. Documentația proiectului Apache Hadoop e încă întreținută, în principal pentru părțile (HDFS, YARN, Metastore) care au supraviețuit tranziției. Ambele sunt context util pentru restul cursului ăsta, chiar dacă nu vom scrie niciun cod MapReduce real.

Caută