Dlaczego Scala i frameworki takie jak Spark i Scalding mają zarówno reduce
i foldLeft
? Więc jaka jest różnica między reduce
i fold
?
Dlaczego Scala i frameworki takie jak Spark i Scalding mają zarówno reduce
i foldLeft
? Więc jaka jest różnica między reduce
i fold
?
Odpowiedzi:
Dużą różnicą, o której nie wspomniano w żadnej innej odpowiedzi dotyczącej przepełnienia stosu odnoszącej się wyraźnie do tego tematu, jest to, że reduce
należy nadać monoid przemienny , tj. Operację, która jest zarówno przemienna, jak i asocjacyjna. Oznacza to, że operacja może być zrównoleglona.
To rozróżnienie jest bardzo ważne w przypadku Big Data / MPP / przetwarzania rozproszonego i całego powodu, dla którego w reduce
ogóle istnieje. Zbiór może zostać posiekany i reduce
może działać na każdym kawałku, a następnie reduce
może operować na wynikach każdego kawałka - w rzeczywistości poziom fragmentacji nie musi zatrzymywać się o jeden poziom głęboko. Moglibyśmy też posiekać każdy kawałek. Dlatego sumowanie liczb całkowitych na liście to O (log N), jeśli dana jest nieskończona liczba procesorów.
Jeśli spojrzysz tylko na podpisy, nie ma powodu reduce
do istnienia, ponieważ możesz osiągnąć wszystko, co możesz, korzystając reduce
z pliku foldLeft
. Funkcjonalność foldLeft
jest większa niż funkcjonalność reduce
.
Ale nie możesz zrównoleglać a foldLeft
, więc jego czas wykonania jest zawsze O (N) (nawet jeśli karmisz w przemiennym monoidzie). Dzieje się tak, ponieważ zakłada się, że operacja nie jest przemiennym monoidem, a zatem skumulowana wartość zostanie obliczona przez serię sekwencyjnych agregacji.
foldLeft
nie zakłada przemienności ani skojarzenia. To asocjatywność daje możliwość dzielenia kolekcji, a przemienność sprawia, że kumulacja jest łatwa, ponieważ porządek nie jest ważny (więc nie ma znaczenia, w jakiej kolejności agregować każdy z wyników z każdego fragmentu). Ściśle mówiąc, przemienność nie jest konieczna do zrównoleglenia, na przykład algorytmów sortowania rozproszonego, po prostu ułatwia logikę, ponieważ nie musisz nadawać porcjom kolejności.
Jeśli spojrzysz na dokumentację Spark dotyczącą reduce
tego, konkretnie mówi „... przemienny i asocjacyjny operator binarny”
http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
Oto dowód, że reduce
NIE jest to tylko specjalny przypadekfoldLeft
scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par
scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds
scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds
Teraz jest trochę bliżej korzeni FP / matematycznych i trochę trudniej wyjaśnić. Reduce jest formalnie zdefiniowane jako część paradygmatu MapReduce, który zajmuje się kolekcjami bez porządku (multisets), Fold jest formalnie zdefiniowany w kategoriach rekursji (patrz katamorfizm), a tym samym przyjmuje strukturę / sekwencję dla kolekcji.
W fold
Scalding nie ma metody, ponieważ w (ścisłym) modelu programowania Map Reduce nie możemy zdefiniować, fold
ponieważ fragmenty nie mają kolejności i fold
wymagają tylko asocjatywności, a nie przemienności.
Mówiąc prościej, reduce
działa bez kolejności kumulacji, fold
wymaga kolejności kumulacji i to właśnie ta kolejność kumulacji wymaga wartości zerowej, a NIE istnienie wartości zerowej, która je odróżnia. Ściśle mówiąc, reduce
powinno działać na pustej kolekcji, ponieważ jej wartość zerową można wywnioskować, biorąc dowolną wartość, x
a następnie rozwiązując x op y = x
, ale to nie działa w przypadku operacji nieprzemiennej, ponieważ może istnieć odrębna wartość zerowa lewej i prawej strony (tj x op y != y op x
.). Oczywiście Scala nie zadaje sobie trudu, aby dowiedzieć się, jaka jest ta wartość zerowa, ponieważ wymagałoby to zrobienia jakiejś matematyki (która prawdopodobnie jest nieobliczalna), więc po prostu zgłasza wyjątek.
Wydaje się (jak to często bywa w etymologii), że to pierwotne znaczenie matematyczne zostało utracone, ponieważ jedyną oczywistą różnicą w programowaniu jest podpis. W rezultacie reduce
stał się synonimem fold
zamiast zachowywać jego oryginalne znaczenie z MapReduce. Teraz te terminy są często używane zamiennie i zachowują się tak samo w większości implementacji (ignorując puste kolekcje). Dziwność jest potęgowana przez osobliwości, jak w Spark, którymi teraz zajmiemy się.
Więc Spark nie mają fold
, ale kolejność, w jakiej wyniki cząstkowe (po jednym dla każdej partycji) są połączone (w momencie pisania) jest taka sama kolejność, w której zadania są zakończone - a więc zakaz deterministyczny. Dzięki @CafeFeed za wskazanie fold
zastosowań runJob
, które po przeczytaniu kodu zdałem sobie sprawę, że jest to niedeterministyczne. Dalsze zamieszanie jest spowodowane tym, że Spark ma treeReduce
ale nie treeFold
.
Istnieje różnica między, reduce
a fold
nawet po zastosowaniu do niepustych sekwencji. Ten pierwszy jest zdefiniowany jako część paradygmatu programowania MapReduce na kolekcjach o dowolnej kolejności ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) i należy założyć, że oprócz tego, że operatory są przemienne, asocjacyjne, aby dać deterministyczne wyniki. Ta ostatnia jest zdefiniowana w kategoriach katomorfizmów i wymaga, aby zbiory miały pojęcie sekwencji (lub były definiowane rekurencyjnie, jak listy połączone), a zatem nie wymagają operatorów przemiennych.
W praktyce ze względu na nie matematyczną naturę programowania reduce
i fold
tendencję do zachowywania się w ten sam sposób, albo poprawnie (jak w Scali), albo niepoprawnie (jak w Spark).
Moim zdaniem można by uniknąć zamieszania, gdyby użycie tego terminu fold
zostało całkowicie porzucone w Spark. Przynajmniej Spark ma notatkę w swojej dokumentacji:
Zachowuje się to nieco inaczej niż operacje zwijania implementowane dla kolekcji nierozproszonych w językach funkcjonalnych, takich jak Scala.
foldLeft
zawiera Left
w nazwie i dlaczego istnieje również metoda o nazwie fold
.
.par
, za (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)
każdym razem otrzymuję inne wyniki.
reallyFold
alfonsa choć, jak: rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f)
, to nie musiałby f dojazdy.
Jeśli się nie mylę, mimo że Spark API tego nie wymaga, fold wymaga również, aby f było przemienne. Ponieważ kolejność, w jakiej partycje będą agregowane, nie jest gwarantowana. Na przykład w poniższym kodzie sortowany jest tylko pierwszy wydruk:
import org.apache.spark.{SparkConf, SparkContext}
object FoldExample extends App{
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Simple Application")
implicit val sc = new SparkContext(conf)
val range = ('a' to 'z').map(_.toString)
val rdd = sc.parallelize(range)
println(range.reduce(_ + _))
println(rdd.reduce(_ + _))
println(rdd.fold("")(_ + _))
}
Wydruk:
ABCDEFGHIJKLMNOPQRSTU VWXYZ
abcghituvjklmwxyzqrsdefnop
defghinopjklmqrstuvabcwxyz
sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)
kilka razy uruchomisz z 2+ rdzeniami, myślę, że zobaczysz, że generuje to losową kolejność (według partycji). Odpowiednio zaktualizowałem moją odpowiedź.
fold
w Apache Spark nie jest tym samym, co fold
w kolekcjach nierozproszonych. W rzeczywistości wymaga funkcji przemiennej, aby uzyskać deterministyczne wyniki:
Zachowuje się to nieco inaczej niż operacje zwijania implementowane dla kolekcji nierozproszonych w językach funkcjonalnych, takich jak Scala. Tę operację zawinięcia można zastosować do przegród indywidualnie, a następnie złożyć te wyniki w wynik końcowy, zamiast stosować zawinięcie do każdego elementu sekwencyjnie w określonej kolejności. W przypadku funkcji, które nie są przemienne, wynik może różnić się od zagięcia zastosowanego do kolekcji nierozdzielonej.
To zostało pokazane przez Mishael Rosenthal i sugeruje Make42 w swoim komentarzu .
Sugerowano, że obserwowane zachowanie jest związane z tym, HashPartitioner
kiedy w rzeczywistości parallelize
nie tasuje i nie używa HashPartitioner
.
import org.apache.spark.sql.SparkSession
/* Note: standalone (non-local) mode */
val master = "spark://...:7077"
val spark = SparkSession.builder.master(master).getOrCreate()
/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })
/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)
Wyjaśnione:
Strukturafold
dla RDD
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
var jobResult: T
val cleanOp: (T, T) => T
val foldPartition = Iterator[T] => T
val mergeResult: (Int, T) => Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
jest taka sama jak strukturareduce
dla RDD:
def reduce(f: (T, T) => T): T = withScope {
val cleanF: (T, T) => T
val reducePartition: Iterator[T] => Option[T]
var jobResult: Option[T]
val mergeResult = (Int, Option[T]) => Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
gdzie runJob
jest wykonywany z pominięciem kolejności podziału i wymaga funkcji przemiennej.
foldPartition
i reducePartition
są równoważne pod względem kolejności przetwarzania i skutecznie (przez dziedziczenie i delegowanie) wdrażane przez reduceLeft
i w foldLeft
dniu TraversableOnce
.
Wniosek: fold
na RDD nie może zależeć od kolejności fragmentów i wymaga przemienności i asocjatywności .
fold
na RDD
s rzeczywiście jest naprawdę tak samo jak reduce
, ale ten nie przestrzega korzeniowych różnice matematyczne (zaktualizowałem moją odpowiedź będzie jeszcze bardziej jasne). Chociaż nie zgadzam się, że naprawdę potrzebujemy przemienności, pod warunkiem, że jest się pewnym, że cokolwiek robi ich partjoner, zachowuje porządek.
runJob
kodu widzę, że rzeczywiście dokonuje on łączenia zgodnie z zakończeniem zadania, a NIE kolejnością partycji. To ten kluczowy szczegół sprawia, że wszystko się układa. Ponownie zredagowałem moją odpowiedź i poprawiłem w ten sposób błąd, który wskazałeś. Czy mógłbyś usunąć swoją nagrodę, skoro jesteśmy teraz w porozumieniu?
Inną różnicą w Scalding jest użycie łączników w Hadoop.
Wyobraź sobie, że twoja operacja jest przemiennym monoidem, z redukcją zostanie zastosowana po stronie mapy również zamiast tasowania / sortowania wszystkich danych do reduktorów. W przypadku foldLeft tak nie jest.
pipe.groupBy('product) {
_.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
// reduce is .mapReduceMap in disguise
}
pipe.groupBy('product) {
_.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}
Zawsze dobrze jest zdefiniować swoje operacje jako monoidalne w Scalding.