Oryginalną odpowiedź dotyczącą kodu można znaleźć poniżej.
Przede wszystkim musisz rozróżnić różne typy API, z których każdy ma własne względy wydajności.
RDD API
(czyste struktury Pythona z orkiestracją opartą na JVM)
Jest to komponent, na który najbardziej wpłynie wydajność kodu Pythona i szczegóły implementacji PySpark. Chociaż wydajność Pythona raczej nie będzie stanowić problemu, jest przynajmniej kilka czynników, które należy wziąć pod uwagę:
- Narzut komunikacji JVM. Praktycznie wszystkie dane przychodzące i wychodzące z modułu wykonawczego Pythona muszą być przekazywane przez gniazdo i proces roboczy JVM. Chociaż jest to stosunkowo wydajna komunikacja lokalna, nadal nie jest bezpłatna.
Wykonywacze oparte na procesach (Python) a programy wykonawcze oparte na wątkach (pojedyncza maszyna JVM, wiele wątków) (Scala). Każdy moduł wykonawczy Pythona działa we własnym procesie. Jako efekt uboczny zapewnia silniejszą izolację niż jej odpowiednik JVM i pewną kontrolę nad cyklem życia modułu wykonawczego, ale potencjalnie znacznie większe zużycie pamięci:
- ślad pamięci interpretera
- ślad załadowanych bibliotek
- mniej wydajne nadawanie (każdy proces wymaga własnej kopii transmisji)
Wydajność samego kodu Pythona. Ogólnie rzecz biorąc, Scala jest szybsza niż Python, ale będzie się różnić w zależności od zadania. Ponadto masz wiele opcji, w tym JIT, takie jak Numba , rozszerzenia C ( Cython ) lub specjalistyczne biblioteki, takie jak Theano . Wreszcie, jeśli nie używasz ML / MLlib (lub po prostu stosu NumPy) , rozważ użycie PyPy jako alternatywnego interpretera. Zobacz SPARK-3094 .
- Konfiguracja PySpark zapewnia
spark.python.worker.reuse
opcję, której można użyć do wyboru między rozwidlaniem procesu Pythona dla każdego zadania a ponownym wykorzystaniem istniejącego procesu. Ta ostatnia opcja wydaje się być przydatna do uniknięcia kosztownego zbierania śmieci (jest to bardziej wrażenie niż wynik systematycznych testów), podczas gdy pierwsza (domyślna) jest optymalna w przypadku drogich transmisji i importu.
- Zliczanie odwołań, używane jako pierwsza metoda wyrzucania elementów bezużytecznych w CPythonie, działa całkiem dobrze z typowymi obciążeniami Spark (przetwarzanie strumieniowe, brak cykli referencyjnych) i zmniejsza ryzyko długich przerw w GC.
MLlib
(mieszane wykonanie Pythona i JVM)
Podstawowe zagadnienia są prawie takie same jak wcześniej, z kilkoma dodatkowymi kwestiami. Podczas gdy podstawowe struktury używane w MLlib są zwykłymi obiektami RDD Pythona, wszystkie algorytmy są wykonywane bezpośrednio przy użyciu Scali.
Oznacza to dodatkowy koszt konwersji obiektów Pythona na obiekty Scala i odwrotnie, zwiększone zużycie pamięci i kilka dodatkowych ograniczeń, które omówimy później.
W chwili obecnej (Spark 2.x) interfejs API oparty na RDD jest w trybie konserwacji i ma zostać usunięty w Spark 3.0 .
DataFrame API i Spark ML
(Wykonanie JVM z kodem Python ograniczone do sterownika)
To prawdopodobnie najlepszy wybór do standardowych zadań przetwarzania danych. Ponieważ kod Pythona jest głównie ograniczony do wysokopoziomowych operacji logicznych na sterowniku, nie powinno być różnicy w wydajności między Pythonem i Scalą.
Jedynym wyjątkiem jest użycie wierszy UDF Python, które są znacznie mniej wydajne niż ich odpowiedniki w Scali. Chociaż istnieje pewna szansa na ulepszenia (nastąpił znaczny rozwój w Spark 2.0.0), największym ograniczeniem jest pełne przejście w obie strony między wewnętrzną reprezentacją (JVM) a interpreterem Pythona. Jeśli to możliwe, powinieneś preferować kompozycję wbudowanych wyrażeń ( przykład . Zachowanie UDF w Pythonie zostało ulepszone w Spark 2.0.0, ale nadal jest nieoptymalne w porównaniu z wykonaniem natywnym.
Może to ulec poprawie w przyszłości i znacznie się poprawiło wraz z wprowadzeniem wektoryzowanych UDF (SPARK-21190 i dalsze rozszerzenia) , które wykorzystują Arrow Streaming do wydajnej wymiany danych z zerową deserializacją. W przypadku większości aplikacji można po prostu zignorować dodatkowe koszty.
Upewnij się również, że unikasz niepotrzebnego przesyłania danych między DataFrames
a RDDs
. Wymaga to kosztownej serializacji i deserializacji, nie wspominając o transferze danych do iz interpretera Pythona.
Warto zauważyć, że wywołania Py4J mają dość duże opóźnienia. Obejmuje to proste połączenia, takie jak:
from pyspark.sql.functions import col
col("foo")
Zwykle nie powinno to mieć znaczenia (narzut jest stały i nie zależy od ilości danych), ale w przypadku miękkich aplikacji czasu rzeczywistego można rozważyć buforowanie / ponowne użycie opakowań Java.
GraphX i Spark DataSets
Na razie (Spark 1.6 2.1) żaden z nich nie zapewnia PySpark API, więc można powiedzieć, że PySpark jest nieskończenie gorszy niż Scala.
GraphX
W praktyce rozwój GraphX zatrzymał się prawie całkowicie, a projekt jest obecnie w trybie konserwacji, a powiązane zgłoszenia JIRA są zamknięte, ponieważ nie zostaną naprawione . Biblioteka GraphFrames udostępnia alternatywną bibliotekę przetwarzania wykresów z powiązaniami języka Python.
Zestaw danych
Mówiąc subiektywnie, Datasets
w Pythonie nie ma zbyt wiele miejsca na statyczne wpisywanie danych, a nawet jeśli obecna implementacja Scali jest zbyt uproszczona i nie zapewnia takich samych korzyści wydajnościowych jak DataFrame
.
Streaming
Z tego, co do tej pory widziałem, zdecydowanie polecam używanie Scali zamiast Pythona. Może się to zmienić w przyszłości, jeśli PySpark otrzyma wsparcie dla strumieni ustrukturyzowanych, ale obecnie API Scala wydaje się być znacznie bardziej niezawodne, wszechstronne i wydajne. Moje doświadczenie jest dość ograniczone.
Strukturalne przesyłanie strumieniowe w Spark 2.x wydaje się zmniejszać przepaść między językami, ale na razie jest jeszcze w początkowej fazie. Niemniej jednak API oparte na RDD jest już określane jako „starsze przesyłanie strumieniowe” w dokumentacji Databricks (data dostępu 2017-03-03), więc rozsądne jest oczekiwanie dalszych wysiłków na rzecz unifikacji.
Zagadnienia dotyczące braku wydajności
Parzystość funkcji
Nie wszystkie funkcje platformy Spark są udostępniane za pośrednictwem interfejsu PySpark API. Sprawdź, czy potrzebne części są już zaimplementowane i spróbuj zrozumieć możliwe ograniczenia.
Jest to szczególnie ważne, gdy używasz MLlib i podobnych kontekstów mieszanych (zobacz Wywoływanie funkcji Java / Scala z zadania ). Aby być uczciwym, niektóre części API PySpark, na przykład mllib.linalg
, zapewniają bardziej wszechstronny zestaw metod niż Scala.
Projekt API
PySpark API ściśle odzwierciedla jego odpowiednik w Scali i jako taki nie jest dokładnie Pythonic. Oznacza to, że mapowanie między językami jest dość łatwe, ale jednocześnie kod Pythona może być znacznie trudniejszy do zrozumienia.
Złożona architektura
Przepływ danych w PySpark jest stosunkowo złożony w porównaniu z czystym wykonaniem JVM. O wiele trudniej jest uzasadnić programy PySpark lub debugować. Co więcej, przynajmniej podstawowa znajomość Scali i JVM w ogóle jest koniecznością.
Spark 2.x i nowszy
Ciągłe przejście w kierunku Dataset
API, z zamrożonym API RDD, przynosi zarówno możliwości, jak i wyzwania dla użytkowników Pythona. Podczas gdy wysokopoziomowe części API są znacznie łatwiejsze do ujawnienia w Pythonie, bardziej zaawansowane funkcje są prawie niemożliwe do bezpośredniego użycia .
Ponadto natywne funkcje Pythona nadal są obywatelami drugiej kategorii w świecie SQL. Miejmy nadzieję, że poprawi się to w przyszłości dzięki serializacji Apache Arrow ( obecne wysiłki dotyczą danych,collection
ale serde UDF jest celem długoterminowym ).
W przypadku projektów silnie zależnych od bazy kodu Pythona, alternatywne alternatywy w czystym Pythonie (takie jak Dask lub Ray ) mogą być interesującą alternatywą.
Nie musi to być jedno kontra drugie
Interfejs API Spark DataFrame (SQL, Dataset) zapewnia elegancki sposób integracji kodu Scala / Java w aplikacji PySpark. Można użyć DataFrames
do udostępnienia danych natywnemu kodowi maszyny JVM i odczytania wyników. Wyjaśniłem niektóre opcje gdzie indziej , a działający przykład pętli Python-Scala znajdziesz w artykule Jak używać klasy Scala w Pyspark .
Można go dodatkowo rozszerzyć, wprowadzając typy zdefiniowane przez użytkownika (zobacz Jak zdefiniować schemat dla typu niestandardowego w Spark SQL? ).
Co jest nie tak z kodem podanym w pytaniu
(Zastrzeżenie: punkt widzenia Pythonisty. Najprawdopodobniej przegapiłem kilka sztuczek Scala)
Po pierwsze, w Twoim kodzie jest jedna część, która w ogóle nie ma sensu. Jeśli masz już (key, value)
pary utworzone za pomocą zipWithIndex
lub enumerate
jaki jest sens w tworzeniu łańcucha tylko po to, aby go potem podzielić? flatMap
nie działa rekurencyjnie, więc możesz po prostu utworzyć krotki i pominąć map
jakiekolwiek śledzenie .
Inną częścią, która wydaje mi się problematyczna, jest reduceByKey
. Ogólnie rzecz biorąc, reduceByKey
jest przydatne, jeśli zastosowanie funkcji agregującej może zmniejszyć ilość danych, które muszą być przetasowane. Ponieważ po prostu łączysz łańcuchy, nie ma tu nic do zyskania. Ignorując rzeczy niskiego poziomu, takie jak liczba odniesień, ilość danych, które musisz przesłać, jest dokładnie taka sama jak w przypadku groupByKey
.
Normalnie nie rozwodziłbym się nad tym, ale o ile wiem, jest to wąskie gardło w twoim kodzie Scala. Łączenie ciągów znaków w JVM jest dość kosztowną operacją (zobacz na przykład: Czy konkatenacja ciągów w scali jest tak kosztowna, jak w Javie? ). Oznacza to, że coś takiego, _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
co jest odpowiednikiem tego input4.reduceByKey(valsConcat)
w kodzie, nie jest dobrym pomysłem.
Jeśli chcesz tego uniknąć groupByKey
, możesz spróbować użyć aggregateByKey
z StringBuilder
. Coś podobnego do tego powinno załatwić sprawę:
rdd.aggregateByKey(new StringBuilder)(
(acc, e) => {
if(!acc.isEmpty) acc.append(",").append(e)
else acc.append(e)
},
(acc1, acc2) => {
if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
else acc1.append(",").addString(acc2)
}
)
ale wątpię, czy jest to warte całego zamieszania.
Mając na uwadze powyższe, przepisałem Twój kod w następujący sposób:
Scala :
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
Python :
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
for (i, x) in enumerate(vals):
yield (i, x)
input = (sc
.textFile('train.csv', minPartitions=6)
.mapPartitionsWithIndex(drop_first_line))
pairs = input.flatMap(separate_cols)
result = (pairs
.groupByKey()
.map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
result.saveAsTextFile("pythonout")
Wyniki
W local[6]
trybie (procesor Intel (R) Xeon (R) E3-1245 V2 @ 3,40 GHz) z 4 GB pamięci na moduł wykonawczy (n = 3):
- Scala - średnia: 250,00s, odch.stod .: 12.49
- Python - średnia: 246,66s, stdev: 1,15
Jestem prawie pewien, że większość tego czasu spędza na tasowaniu, serializowaniu, deserializacji i innych zadaniach drugorzędnych. Dla zabawy, oto naiwny jednowątkowy kod w Pythonie, który wykonuje to samo zadanie na tej maszynie w mniej niż minutę:
def go():
with open("train.csv") as fr:
lines = [
line.replace('true', '1').replace('false', '0').split(",")
for line in fr]
return zip(*lines[1:])