Dlaczego Scala i frameworki takie jak Spark i Scalding mają zarówno reducei foldLeft? Więc jaka jest różnica między reducei fold?
Dlaczego Scala i frameworki takie jak Spark i Scalding mają zarówno reducei foldLeft? Więc jaka jest różnica między reducei fold?
Odpowiedzi:
Dużą różnicą, o której nie wspomniano w żadnej innej odpowiedzi dotyczącej przepełnienia stosu odnoszącej się wyraźnie do tego tematu, jest to, że reducenależy nadać monoid przemienny , tj. Operację, która jest zarówno przemienna, jak i asocjacyjna. Oznacza to, że operacja może być zrównoleglona.
To rozróżnienie jest bardzo ważne w przypadku Big Data / MPP / przetwarzania rozproszonego i całego powodu, dla którego w reduceogóle istnieje. Zbiór może zostać posiekany i reducemoże działać na każdym kawałku, a następnie reducemoże operować na wynikach każdego kawałka - w rzeczywistości poziom fragmentacji nie musi zatrzymywać się o jeden poziom głęboko. Moglibyśmy też posiekać każdy kawałek. Dlatego sumowanie liczb całkowitych na liście to O (log N), jeśli dana jest nieskończona liczba procesorów.
Jeśli spojrzysz tylko na podpisy, nie ma powodu reducedo istnienia, ponieważ możesz osiągnąć wszystko, co możesz, korzystając reducez pliku foldLeft. Funkcjonalność foldLeftjest większa niż funkcjonalność reduce.
Ale nie możesz zrównoleglać a foldLeft, więc jego czas wykonania jest zawsze O (N) (nawet jeśli karmisz w przemiennym monoidzie). Dzieje się tak, ponieważ zakłada się, że operacja nie jest przemiennym monoidem, a zatem skumulowana wartość zostanie obliczona przez serię sekwencyjnych agregacji.
foldLeftnie zakłada przemienności ani skojarzenia. To asocjatywność daje możliwość dzielenia kolekcji, a przemienność sprawia, że kumulacja jest łatwa, ponieważ porządek nie jest ważny (więc nie ma znaczenia, w jakiej kolejności agregować każdy z wyników z każdego fragmentu). Ściśle mówiąc, przemienność nie jest konieczna do zrównoleglenia, na przykład algorytmów sortowania rozproszonego, po prostu ułatwia logikę, ponieważ nie musisz nadawać porcjom kolejności.
Jeśli spojrzysz na dokumentację Spark dotyczącą reducetego, konkretnie mówi „... przemienny i asocjacyjny operator binarny”
http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD
Oto dowód, że reduceNIE jest to tylko specjalny przypadekfoldLeft
scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par
scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds
scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds
Teraz jest trochę bliżej korzeni FP / matematycznych i trochę trudniej wyjaśnić. Reduce jest formalnie zdefiniowane jako część paradygmatu MapReduce, który zajmuje się kolekcjami bez porządku (multisets), Fold jest formalnie zdefiniowany w kategoriach rekursji (patrz katamorfizm), a tym samym przyjmuje strukturę / sekwencję dla kolekcji.
W foldScalding nie ma metody, ponieważ w (ścisłym) modelu programowania Map Reduce nie możemy zdefiniować, foldponieważ fragmenty nie mają kolejności i foldwymagają tylko asocjatywności, a nie przemienności.
Mówiąc prościej, reducedziała bez kolejności kumulacji, foldwymaga kolejności kumulacji i to właśnie ta kolejność kumulacji wymaga wartości zerowej, a NIE istnienie wartości zerowej, która je odróżnia. Ściśle mówiąc, reduce powinno działać na pustej kolekcji, ponieważ jej wartość zerową można wywnioskować, biorąc dowolną wartość, xa następnie rozwiązując x op y = x, ale to nie działa w przypadku operacji nieprzemiennej, ponieważ może istnieć odrębna wartość zerowa lewej i prawej strony (tj x op y != y op x.). Oczywiście Scala nie zadaje sobie trudu, aby dowiedzieć się, jaka jest ta wartość zerowa, ponieważ wymagałoby to zrobienia jakiejś matematyki (która prawdopodobnie jest nieobliczalna), więc po prostu zgłasza wyjątek.
Wydaje się (jak to często bywa w etymologii), że to pierwotne znaczenie matematyczne zostało utracone, ponieważ jedyną oczywistą różnicą w programowaniu jest podpis. W rezultacie reducestał się synonimem foldzamiast zachowywać jego oryginalne znaczenie z MapReduce. Teraz te terminy są często używane zamiennie i zachowują się tak samo w większości implementacji (ignorując puste kolekcje). Dziwność jest potęgowana przez osobliwości, jak w Spark, którymi teraz zajmiemy się.
Więc Spark nie mają fold, ale kolejność, w jakiej wyniki cząstkowe (po jednym dla każdej partycji) są połączone (w momencie pisania) jest taka sama kolejność, w której zadania są zakończone - a więc zakaz deterministyczny. Dzięki @CafeFeed za wskazanie foldzastosowań runJob, które po przeczytaniu kodu zdałem sobie sprawę, że jest to niedeterministyczne. Dalsze zamieszanie jest spowodowane tym, że Spark ma treeReduceale nie treeFold.
Istnieje różnica między, reducea foldnawet po zastosowaniu do niepustych sekwencji. Ten pierwszy jest zdefiniowany jako część paradygmatu programowania MapReduce na kolekcjach o dowolnej kolejności ( http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf ) i należy założyć, że oprócz tego, że operatory są przemienne, asocjacyjne, aby dać deterministyczne wyniki. Ta ostatnia jest zdefiniowana w kategoriach katomorfizmów i wymaga, aby zbiory miały pojęcie sekwencji (lub były definiowane rekurencyjnie, jak listy połączone), a zatem nie wymagają operatorów przemiennych.
W praktyce ze względu na nie matematyczną naturę programowania reducei foldtendencję do zachowywania się w ten sam sposób, albo poprawnie (jak w Scali), albo niepoprawnie (jak w Spark).
Moim zdaniem można by uniknąć zamieszania, gdyby użycie tego terminu foldzostało całkowicie porzucone w Spark. Przynajmniej Spark ma notatkę w swojej dokumentacji:
Zachowuje się to nieco inaczej niż operacje zwijania implementowane dla kolekcji nierozproszonych w językach funkcjonalnych, takich jak Scala.
foldLeftzawiera Leftw nazwie i dlaczego istnieje również metoda o nazwie fold.
.par, za (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _)każdym razem otrzymuję inne wyniki.
reallyFoldalfonsa choć, jak: rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f), to nie musiałby f dojazdy.
Jeśli się nie mylę, mimo że Spark API tego nie wymaga, fold wymaga również, aby f było przemienne. Ponieważ kolejność, w jakiej partycje będą agregowane, nie jest gwarantowana. Na przykład w poniższym kodzie sortowany jest tylko pierwszy wydruk:
import org.apache.spark.{SparkConf, SparkContext}
object FoldExample extends App{
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Simple Application")
implicit val sc = new SparkContext(conf)
val range = ('a' to 'z').map(_.toString)
val rdd = sc.parallelize(range)
println(range.reduce(_ + _))
println(rdd.reduce(_ + _))
println(rdd.fold("")(_ + _))
}
Wydruk:
ABCDEFGHIJKLMNOPQRSTU VWXYZ
abcghituvjklmwxyzqrsdefnop
defghinopjklmqrstuvabcwxyz
sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _)kilka razy uruchomisz z 2+ rdzeniami, myślę, że zobaczysz, że generuje to losową kolejność (według partycji). Odpowiednio zaktualizowałem moją odpowiedź.
foldw Apache Spark nie jest tym samym, co foldw kolekcjach nierozproszonych. W rzeczywistości wymaga funkcji przemiennej, aby uzyskać deterministyczne wyniki:
Zachowuje się to nieco inaczej niż operacje zwijania implementowane dla kolekcji nierozproszonych w językach funkcjonalnych, takich jak Scala. Tę operację zawinięcia można zastosować do przegród indywidualnie, a następnie złożyć te wyniki w wynik końcowy, zamiast stosować zawinięcie do każdego elementu sekwencyjnie w określonej kolejności. W przypadku funkcji, które nie są przemienne, wynik może różnić się od zagięcia zastosowanego do kolekcji nierozdzielonej.
To zostało pokazane przez Mishael Rosenthal i sugeruje Make42 w swoim komentarzu .
Sugerowano, że obserwowane zachowanie jest związane z tym, HashPartitionerkiedy w rzeczywistości parallelizenie tasuje i nie używa HashPartitioner.
import org.apache.spark.sql.SparkSession
/* Note: standalone (non-local) mode */
val master = "spark://...:7077"
val spark = SparkSession.builder.master(master).getOrCreate()
/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })
/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)
Wyjaśnione:
Strukturafold dla RDD
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
var jobResult: T
val cleanOp: (T, T) => T
val foldPartition = Iterator[T] => T
val mergeResult: (Int, T) => Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
jest taka sama jak strukturareduce dla RDD:
def reduce(f: (T, T) => T): T = withScope {
val cleanF: (T, T) => T
val reducePartition: Iterator[T] => Option[T]
var jobResult: Option[T]
val mergeResult = (Int, Option[T]) => Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
gdzie runJobjest wykonywany z pominięciem kolejności podziału i wymaga funkcji przemiennej.
foldPartitioni reducePartitionsą równoważne pod względem kolejności przetwarzania i skutecznie (przez dziedziczenie i delegowanie) wdrażane przez reduceLefti w foldLeftdniu TraversableOnce.
Wniosek: foldna RDD nie może zależeć od kolejności fragmentów i wymaga przemienności i asocjatywności .
foldna RDDs rzeczywiście jest naprawdę tak samo jak reduce, ale ten nie przestrzega korzeniowych różnice matematyczne (zaktualizowałem moją odpowiedź będzie jeszcze bardziej jasne). Chociaż nie zgadzam się, że naprawdę potrzebujemy przemienności, pod warunkiem, że jest się pewnym, że cokolwiek robi ich partjoner, zachowuje porządek.
runJobkodu widzę, że rzeczywiście dokonuje on łączenia zgodnie z zakończeniem zadania, a NIE kolejnością partycji. To ten kluczowy szczegół sprawia, że wszystko się układa. Ponownie zredagowałem moją odpowiedź i poprawiłem w ten sposób błąd, który wskazałeś. Czy mógłbyś usunąć swoją nagrodę, skoro jesteśmy teraz w porozumieniu?
Inną różnicą w Scalding jest użycie łączników w Hadoop.
Wyobraź sobie, że twoja operacja jest przemiennym monoidem, z redukcją zostanie zastosowana po stronie mapy również zamiast tasowania / sortowania wszystkich danych do reduktorów. W przypadku foldLeft tak nie jest.
pipe.groupBy('product) {
_.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
// reduce is .mapReduceMap in disguise
}
pipe.groupBy('product) {
_.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}
Zawsze dobrze jest zdefiniować swoje operacje jako monoidalne w Scalding.