Spark - repartition () vs coalesce ()


254

Według Learning Spark

Pamiętaj, że podział danych na partycje jest dość kosztowną operacją. Spark ma również zoptymalizowaną wersję repartition()wywołania, coalesce()która pozwala uniknąć przenoszenia danych, ale tylko wtedy, gdy zmniejsza się liczbę partycji RDD.

Jedną różnicą, którą dostaję, jest to, że wraz repartition()z liczbą partycji można zwiększać / zmniejszać, ale z coalesce()liczbą partycji można tylko zmniejszać.

Jeśli partycje są rozproszone na wielu komputerach i coalesce()są uruchomione, w jaki sposób można uniknąć przenoszenia danych?

Odpowiedzi:


354

Pozwala to uniknąć pełnego losowania. Jeśli wiadomo, że liczba maleje, moduł wykonujący może bezpiecznie przechowywać dane na minimalnej liczbie partycji, przenosząc dane tylko z dodatkowych węzłów na węzły, które trzymaliśmy.

Więc pójdzie coś takiego:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

Następnie coalesceprzejdź do 2 partycji:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

Zauważ, że Węzeł 1 i Węzeł 3 nie wymagały przeniesienia oryginalnych danych.


115
Dzięki za odpowiedzi. Dokumentacja powinna lepiej powiedzieć minimize data movementzamiast avoiding data movement.
Praveen Sripati

12
Czy jest jakikolwiek przypadek, kiedy repartitionnależy użyć zamiast coalesce?
Niemand,

21
@Niemand Myślę, że obecna dokumentacja obejmuje to całkiem dobrze: github.com/apache/spark/blob/... Należy pamiętać, że wszystko, repartitionco trzeba , to wywołać coalescez shuffleparametrem ustawionym na true. Jeśli to pomoże, to daj mi znać.
Justin Pihony,

2
Czy można zmniejszyć liczbę istniejących plików partycji? Nie mam hdfs, ale mam problem z wieloma plikami.

2
podział będzie statystycznie wolniejszy, ponieważ nie wie, że się kurczy ... chociaż może mogliby to zoptymalizować. Wewnętrznie po prostu nazywa połączenie z shuffle = trueflagą
Justin Pihony

172

Odpowiedź Justina jest niesamowita i ta odpowiedź jest głębsza.

repartitionAlgorytm robi pełną shuffle i tworzy nowe partycje z danymi, które jest rozprowadzane równomiernie. Utwórzmy ramkę danych o liczbach od 1 do 12.

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf zawiera 4 partycje na moim komputerze.

numbersDf.rdd.partitions.size // => 4

Oto jak dane są dzielone na partycje:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

Zróbmy dokładnie losowanie z tą repartitionmetodą i uzyskaj te dane w dwóch węzłach.

val numbersDfR = numbersDf.repartition(2)

Oto sposób numbersDfRpartycjonowania danych na moim komputerze:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

repartitionMetoda sprawia, że nowe partycje i równomiernie rozprowadza dane w nowych partycji (rozkład danych jest więcej, nawet w przypadku większych zbiorów danych).

Różnica między coalesceirepartition

coalescewykorzystuje istniejące partycje, aby zminimalizować ilość danych, które są przetasowane. repartitiontworzy nowe partycje i wykonuje pełne losowanie. coalesceskutkuje partycjami o różnych ilościach danych (czasami partycjami, które mają wiele różnych rozmiarów) i repartitionskutkuje mniej więcej jednakowymi rozmiarami partycji.

Jest coalescealbo repartitionszybciej?

coalescemoże działać szybciej niż repartition, ale partycje o nierównej wielkości są zazwyczaj wolniejsze w pracy niż partycje o równej wielkości. Zwykle będziesz musiał ponownie podzielić zestawy danych po przefiltrowaniu dużego zestawu danych. Odkryłem, że repartitionogólnie jest szybszy, ponieważ Spark jest zbudowany do pracy z partycjami równej wielkości.

Uwaga: Z ciekawością zauważyłem, że podział może zwiększyć rozmiar danych na dysku . Pamiętaj, aby uruchamiać testy, gdy używasz partycji / łączenia na dużych zestawach danych.

Przeczytaj ten post na blogu, jeśli chcesz uzyskać więcej informacji.

Kiedy w praktyce użyjesz łączenia i podziału


8
Świetna odpowiedź @Powers, ale czy dane w partycji A i B nie są wypaczone? Jak jest równomiernie rozprowadzany?
anwartheravian

Ponadto, jaki jest najlepszy sposób na uzyskanie rozmiaru partycji bez błędu OOM. Używam, rdd.glom().map(len).collect()ale daje dużo błędów OOM.
anwartheravian

8
@anwartheravian - Partycja A i partycja B mają różne rozmiary, ponieważ repartitionalgorytm nie rozdziela danych w równym stopniu dla bardzo małych zestawów danych. Kiedyś repartitionorganizowałem 5 milionów rekordów na 13 partycjach, a każdy plik miał od 89,3 MB do 89,6 MB - to całkiem równe!
Powers

1
@Power ten wygląd lepiej odpowiedź ze szczegółami.
Zielony,

1
To wyjaśnia różnicę znacznie lepiej. Dzięki!
Abhi

22

Należy tutaj dodać, że podstawową zasadą Spark RDD jest niezmienność. Podział lub łączenie utworzy nowy RDD. Podstawowy RDD będzie nadal istniał z pierwotną liczbą partycji. Jeśli przypadek użycia wymaga utrzymania RDD w pamięci podręcznej, to samo należy zrobić dla nowo utworzonego RDD.

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

niezłe! jest to krytyczne i przynajmniej dla tego doświadczonego programisty scala, nie oczywiste - tj. ani podział, ani koalescencja próby modyfikacji danych, tylko sposób ich dystrybucji między węzłami
doug

1
@Harikrishnan, więc jeśli dobrze zrozumiałem inne odpowiedzi, to zgodnie z nimi w przypadku koalesce Spark wykorzystuje istniejące partycje, ale ponieważ RDD jest niezmienne, czy możesz opisać, w jaki sposób Coalesce korzysta z istniejących partycji? Zgodnie z moim zrozumieniem pomyślałem, że Spark łączy nowe partycje z istniejącymi partycjami.
Explorer

Ale jeśli „stary” RDD nie jest już używany, jak wiadomo na wykresie wykonania, zostanie usunięty z pamięci, jeśli nie zostanie utrwalony, prawda?
Markus

15

repartition - zaleca się korzystanie z niego podczas zwiększania liczby partycji, ponieważ wiąże się to z tasowaniem wszystkich danych.

coalesce- zaleca się korzystanie z niego przy jednoczesnym zmniejszeniu liczby partycji. Na przykład, jeśli masz 3 partycje i chcesz je zmniejszyć do 2, coalesceprzeniesie dane trzeciej partycji do partycji 1 i 2. Partycja 1 i 2 pozostaną w tym samym kontenerze. Z drugiej strony repartitionbędzie tasować dane we wszystkich partycjach, dlatego użycie sieci między programami wykonawczymi będzie wysokie i wpłynie to na wydajność.

coalescedziała lepiej niż repartitionprzy zmniejszeniu liczby partycji.


Przydatne wyjaśnienie.
Narendra Maru,

11

Z kodu i dokumentacji kodu wynika, że coalesce(n)jest on taki sam jak coalesce(n, shuffle = false)i repartition(n)jest taki sam jakcoalesce(n, shuffle = true)

Tak więc, zarówno coalescei repartitionmogą być wykorzystane do zwiększenia liczby partycji

Za pomocą shuffle = truemożesz faktycznie połączyć się z większą liczbą partycji. Jest to przydatne, jeśli masz małą liczbę partycji, powiedzmy 100, potencjalnie z kilkoma nienormalnie dużymi partycjami.

Inną ważną uwagą do podkreślenia jest to, że jeśli drastycznie zmniejszysz liczbę partycji, powinieneś rozważyć użycie wersji losowejcoalesce (tak samo jak repartitionw tym przypadku). Umożliwi to wykonywanie obliczeń równolegle na partycjach nadrzędnych (wiele zadań).

Jeśli jednak wykonujesz drastyczne połączenie, na przykład numPartitions = 1, może to spowodować, że twoje obliczenia będą odbywać się na mniejszej liczbie węzłów niż chcesz (np. Jeden węzeł w przypadku numPartitions = 1). Aby tego uniknąć, możesz przejść shuffle = true. Spowoduje to dodanie kroku losowania, ale oznacza, że ​​bieżące górne partycje będą wykonywane równolegle (niezależnie od bieżącego partycjonowania).

Proszę również odnieść się do powiązanej odpowiedzi tutaj


10

Wszystkie odpowiedzi dodają świetnej wiedzy do tego często zadawanego pytania.

Zgodnie z tradycją osi czasu tego pytania, oto moje 2 centy.

Przekonałem się, że podział jest szybszy niż łączenie , w bardzo szczególnym przypadku.

W mojej aplikacji, gdy szacowana liczba plików jest niższa niż określony próg, podział na partycje działa szybciej.

Oto co mam na myśli

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

W powyższym fragmencie, jeśli moje pliki miały mniej niż 20, łączenie trwało wieczność, a podział był znacznie szybszy, więc powyższy kod.

Oczywiście ta liczba (20) będzie zależeć od liczby pracowników i ilości danych.

Mam nadzieję, że to pomaga.


6

Podział : Przetasuj dane w NOWĄ liczbę partycji.

Na przykład. Początkowa ramka danych jest podzielona na 200 partycji.

df.repartition(500): Dane zostaną przetasowane z 200 partycji do nowych 500 partycji.

Koalescencja : Przetasuj dane do istniejącej liczby partycji.

df.coalesce(5): Dane zostaną przetasowane z pozostałych 195 partycji do 5 istniejących partycji.


4

Chciałbym dodać do odpowiedzi Justina i Mocy, że -

repartitionzignoruje istniejące partycje i utworzy nowe. Możesz więc użyć go do naprawy przesunięcia danych. Możesz wymienić klucze partycji, aby zdefiniować dystrybucję. Skośność danych jest jednym z największych problemów w przestrzeni problemów „dużych zbiorów danych”.

coalescebędzie działać z istniejącymi partycjami i przetasować ich podzbiór. Nie może naprawić przesunięcia danych tak bardzo jak repartitionrobi. Dlatego nawet jeśli jest tańszy, może nie być tym, czego potrzebujesz.


3

Do wszystkich wspaniałych odpowiedzi, które chciałbym dodać, repartitionjest to jedna z najlepszych opcji skorzystania z równoległości danych. Chociaż coalescedaje tanią opcję zmniejszenia partycji i jest bardzo przydatny podczas zapisywania danych na HDFS lub innym zlewie, aby skorzystać z dużych zapisów.

Uznałem to za przydatne przy zapisywaniu danych w formacie parkietu, aby uzyskać pełną przewagę.


2

Dla kogoś, kto miał problemy z wygenerowaniem pojedynczego pliku csv z PySpark (AWS EMR) jako wyjściem i zapisaniem go na s3, pomógł podział partycji. Powodem jest to, że łączenie nie może wykonać pełnego przetasowania, ale podział może. Zasadniczo można zwiększać lub zmniejszać liczbę partycji za pomocą podziału, ale można tylko zmniejszyć liczbę partycji (ale nie 1) za pomocą koalescencji. Oto kod dla każdego, kto próbuje napisać csv z AWS EMR do s3:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')

0

W prosty sposób COALESCE: - służy tylko do zmniejszenia liczby partycji, bez przetasowania danych po prostu kompresuje partycje

ZMIANA: - służy zarówno do zwiększania, jak i zmniejszania liczby partycji, ale tasowanie ma miejsce

Przykład:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

Oba działają dobrze

Ale idziemy ogólnie o te dwie rzeczy, gdy potrzebujemy zobaczyć dane wyjściowe w jednym klastrze, idziemy z tym.


9
Nastąpi również przepływ danych z Coalese.
sun_dare

0

Ale także powinieneś się upewnić, że dane, które nadchodzą, powinny być wysoce skonfigurowane, jeśli masz do czynienia z dużymi danymi. Ponieważ wszystkie dane zostaną załadowane do tych węzłów, może to prowadzić do wyjątku pamięci. Chociaż naprawa jest kosztowna, wolę z niej korzystać. Ponieważ tasuje i równo rozprowadza dane.

Mądrze jest wybierać między łączeniem i dzieleniem.


0

repartitionAlgorytm robi pełną Shuffle danych i tworzy równe wielkości partycji danych.coalescełączy istniejące partycje, aby uniknąć pełnego losowania.

Program Coalesce działa dobrze w przypadku pobierania RDD z dużą liczbą partycji i łączenia partycji w pojedynczym węźle roboczym w celu uzyskania ostatecznej RDD z mniejszą liczbą partycji.

Repartitionprzetasuje dane w RDD, aby uzyskać ostateczną liczbę żądanych partycji. Partycjonowanie DataFrames wydaje się szczegółem implementacji niskiego poziomu, którym powinien zarządzać framework, ale nim nie jest. Filtrując duże ramki danych na mniejsze, prawie zawsze powinieneś podzielić dane na partycje. Prawdopodobnie będziesz często filtrował duże ramki danych na mniejsze, więc przyzwyczaj się do partycjonowania.

Przeczytaj ten post na blogu, jeśli chcesz uzyskać więcej informacji.

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.