Spark, optymalnie dzieląc jeden RDD na dwa


10

Mam duży zestaw danych, który muszę podzielić na grupy zgodnie z określonymi parametrami. Chcę, aby zadanie przebiegło tak wydajnie, jak to możliwe. Mogę sobie wyobrazić dwa sposoby

Opcja 1 - Utwórz mapę z oryginalnego RDD i filtra

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Opcja 2 - Filtruj bezpośrednio oryginalny RDD

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

Metoda pięści musi powtarzać wszystkie rekordy oryginalnego zestawu danych 3 razy, przy czym drugi musi to zrobić tylko dwa razy, jednak w normalnych okolicznościach iskra robi pewne zakulisowe tworzenie wykresu, więc mogłem sobie wyobrazić, że są skutecznie zrobione w ten sam sposób. Moje pytania są następujące: a.) Czy jedna metoda jest bardziej wydajna od drugiej, czy też budowanie wykresu iskierkowego czyni je równoważnymi b.) Czy można to zrobić w jednym przebiegu


Znalazłem też siebie z bardzo podobnym problemem i tak naprawdę nie znalazłem rozwiązania. Ale to, co faktycznie się dzieje, nie jest jasne z tego kodu, ponieważ iskra ma „leniwą ocenę” i podobno jest w stanie wykonać tylko to, co naprawdę musi wykonać, a także połączyć mapy, filtry i wszystko, co można zrobić razem. Być może to, co opisujesz, może się zdarzyć w jednym przejściu. Jednak nie znam wystarczająco leniwych mechanizmów oceny. Właśnie zauważyłem .cache (). Może jest sposób na zrobienie tylko jednego .cache () i uzyskanie pełnych wyników?
user3780968

Odpowiedzi:


9

Przede wszystkim powiem ci, że nie jestem ekspertem od Spark; Używam go dość często w ciągu ostatnich kilku miesięcy i myślę, że teraz to rozumiem, ale mogę się mylić.

Odpowiadając na pytania:

a.) są równoważne, ale nie w taki sposób, w jaki je widzisz; Jeśli zastanawiasz się, Spark nie zoptymalizuje wykresu, ale customMapperw obu przypadkach nadal będzie wykonywany dwukrotnie; wynika to z faktu, że w przypadku iskry, rdd1i rdd2są to dwa całkowicie różne RDD, i zbuduje wykres transformacji oddolny, zaczynając od liści; więc opcja 1 przełoży się na:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Jak powiedziałeś, customMapperjest wykonywany dwa razy (ponadto rddInbędzie również czytany dwukrotnie, co oznacza, że ​​jeśli pochodzi z bazy danych, może być jeszcze wolniejszy).

b.) istnieje sposób, musisz po prostu przenieść się cache()w odpowiednie miejsce:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

Robiąc to, mówimy iskrze, że może przechowywać częściowe wyniki mappedRdd; użyje tych częściowych wyników zarówno dla, jak rdd1i dla rdd2. Z iskrowego punktu widzenia jest to równoważne z:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.