Chochlik. WSKAZÓWKA :
Ilekroć masz inicjalizację ciężką, którą należy wykonać raz dla wielu RDD
elementów, a nie raz na RDD
element, i jeśli tej inicjalizacji, takiej jak tworzenie obiektów z biblioteki innej firmy, nie można serializować (tak, aby Spark mógł przesyłać ją przez klaster do węzły robocze), użyj mapPartitions()
zamiast
map()
. mapPartitions()
przewiduje, że inicjalizacja jest wykonywana raz na zadanie robocze / wątek / partycję zamiast RDD
na przykład raz dla każdego elementu danych : patrz poniżej.
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList
connection.close()
newPartition.iterator
})
Q2. nie flatMap
zachowują się jak mapy czy jak mapPartitions
?
Tak. proszę zobaczyć przykład 2 z flatmap
... nie wymaga wyjaśnień.
Q1. Jaka jest różnica między RDD map
imapPartitions
map
działa z funkcją używaną na poziomie elementu, podczas gdy
mapPartitions
wykonuje funkcję na poziomie podziału.
Przykładowy scenariusz : jeśli mamy 100K elementów w określonejRDD
partycji, to uruchomimy funkcję używaną przez transformację odwzorowania 100K razy, gdy używamymap
.
I odwrotnie, jeśli użyjemy, mapPartitions
wtedy wywołamy określoną funkcję tylko raz, ale przekażemy wszystkie 100K rekordów i odzyskamy wszystkie odpowiedzi w jednym wywołaniu funkcji.
Nastąpi wzrost wydajności, ponieważ map
wiele razy działa na określonej funkcji, zwłaszcza jeśli za każdym razem funkcja robi coś kosztownego, czego nie musiałaby robić, gdybyśmy przekazali wszystkie elementy naraz (w przypadku mappartitions
).
mapa
Stosuje funkcję transformacji do każdego elementu RDD i zwraca wynik jako nowy RDD.
Warianty aukcji
def map [U: ClassTag] (f: T => U): RDD [U]
Przykład:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
Jest to wyspecjalizowana mapa wywoływana tylko raz dla każdej partycji. Cała zawartość odpowiednich partycji jest dostępna jako sekwencyjny strumień wartości za pośrednictwem argumentu wejściowego (Iterarator [T]). Funkcja niestandardowa musi zwracać kolejny Iterator [U]. Połączone iteratory wyników są automatycznie konwertowane na nowy RDD. Należy zauważyć, że w następującym wyniku brakuje krotek (3,4) i (6,7) z powodu wybranego przez nas podziału.
preservesPartitioning
wskazuje, czy funkcja wejściowa zachowuje partycjoner, co powinno być, false
chyba że jest to para RDD, a funkcja wejściowa nie modyfikuje klawiszy.
Warianty aukcji
def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], zachowujePartycjonowanie: Boolean = false): RDD [U]
Przykład 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Przykład 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
Powyższy program można również napisać przy użyciu flatMap w następujący sposób.
Przykład 2 z użyciem flatmap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Wniosek:
mapPartitions
transformacja jest szybsza niż map
ponieważ wywołuje twoją funkcję raz / partycję, a nie raz / element.
Dalsza lektura: foreach Vs foreachPartments Kiedy używać Co?