Chochlik. WSKAZÓWKA :
Ilekroć masz inicjalizację ciężką, którą należy wykonać raz dla wielu RDDelementów, a nie raz na RDDelement, i jeśli tej inicjalizacji, takiej jak tworzenie obiektów z biblioteki innej firmy, nie można serializować (tak, aby Spark mógł przesyłać ją przez klaster do węzły robocze), użyj mapPartitions()zamiast
map(). mapPartitions()przewiduje, że inicjalizacja jest wykonywana raz na zadanie robocze / wątek / partycję zamiast RDDna przykład raz dla każdego elementu danych : patrz poniżej.
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList
connection.close()
newPartition.iterator
})
Q2. nie flatMapzachowują się jak mapy czy jak mapPartitions?
Tak. proszę zobaczyć przykład 2 z flatmap... nie wymaga wyjaśnień.
Q1. Jaka jest różnica między RDD mapimapPartitions
mapdziała z funkcją używaną na poziomie elementu, podczas gdy
mapPartitionswykonuje funkcję na poziomie podziału.
Przykładowy scenariusz : jeśli mamy 100K elementów w określonejRDDpartycji, to uruchomimy funkcję używaną przez transformację odwzorowania 100K razy, gdy używamymap.
I odwrotnie, jeśli użyjemy, mapPartitionswtedy wywołamy określoną funkcję tylko raz, ale przekażemy wszystkie 100K rekordów i odzyskamy wszystkie odpowiedzi w jednym wywołaniu funkcji.
Nastąpi wzrost wydajności, ponieważ mapwiele razy działa na określonej funkcji, zwłaszcza jeśli za każdym razem funkcja robi coś kosztownego, czego nie musiałaby robić, gdybyśmy przekazali wszystkie elementy naraz (w przypadku mappartitions).
mapa
Stosuje funkcję transformacji do każdego elementu RDD i zwraca wynik jako nowy RDD.
Warianty aukcji
def map [U: ClassTag] (f: T => U): RDD [U]
Przykład:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
Jest to wyspecjalizowana mapa wywoływana tylko raz dla każdej partycji. Cała zawartość odpowiednich partycji jest dostępna jako sekwencyjny strumień wartości za pośrednictwem argumentu wejściowego (Iterarator [T]). Funkcja niestandardowa musi zwracać kolejny Iterator [U]. Połączone iteratory wyników są automatycznie konwertowane na nowy RDD. Należy zauważyć, że w następującym wyniku brakuje krotek (3,4) i (6,7) z powodu wybranego przez nas podziału.
preservesPartitioningwskazuje, czy funkcja wejściowa zachowuje partycjoner, co powinno być, falsechyba że jest to para RDD, a funkcja wejściowa nie modyfikuje klawiszy.
Warianty aukcji
def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], zachowujePartycjonowanie: Boolean = false): RDD [U]
Przykład 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Przykład 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
Powyższy program można również napisać przy użyciu flatMap w następujący sposób.
Przykład 2 z użyciem flatmap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Wniosek:
mapPartitionstransformacja jest szybsza niż mapponieważ wywołuje twoją funkcję raz / partycję, a nie raz / element.
Dalsza lektura: foreach Vs foreachPartments Kiedy używać Co?