RDD rozszerzają interfejs Serializowalny , więc nie jest to przyczyną niepowodzenia Twojego zadania. Teraz nie oznacza to, że możesz serializować RDD
z Spark i unikaćNotSerializableException
Spark to silnik przetwarzania rozproszonego, a jego główną abstrakcją jest odporny rozproszony zestaw danych ( RDD) ), który można postrzegać jako zbiór rozproszony. Zasadniczo elementy RDD są podzielone na węzły klastra, ale Spark wyodrębnia to od użytkownika, pozwalając użytkownikowi na interakcję z RDD (kolekcją) tak, jakby była lokalna.
Nie dostać się do zbyt wielu szczegółów, ale po uruchomieniu różne transformacje na RDD ( map
, flatMap
, filter
i inne), kod transformacja (zamknięcie) wynosi:
- zserializowany w węźle sterownika,
- wysłane do odpowiednich węzłów w klastrze,
- deserializowane,
- i ostatecznie wykonane w węzłach
Możesz oczywiście uruchomić to lokalnie (jak w twoim przykładzie), ale wszystkie te fazy (oprócz wysyłki przez sieć) nadal występują. [Pozwala to na wykrycie błędów nawet przed wdrożeniem do produkcji]
W drugim przypadku zdarza się, że wywołujesz metodę zdefiniowaną w klasie testing
z wnętrza funkcji mapy. Spark widzi to i ponieważ metody nie mogą być serializowane same, Spark próbuje serializować całą testing
klasę, aby kod nadal działał, gdy zostanie wykonany w innej maszynie JVM. Masz dwie możliwości:
Albo sprawisz, że testy klasowe będą serializowane, aby cała klasa mogła być serializowana przez Spark:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
lub utworzysz someFunc
funkcję zamiast metody (funkcje są obiektami w Scali), dzięki czemu Spark będzie mógł ją serializować:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
Podobny, ale nie ten sam problem z serializacją klas może Cię zainteresować i możesz go przeczytać w tej prezentacji Spark Summit 2013 .
Na marginesie, możesz przepisać rddList.map(someFunc(_))
na rddList.map(someFunc)
, są one dokładnie takie same. Zazwyczaj drugi jest preferowany, ponieważ jest mniej gadatliwy i czytelniejszy.
EDYCJA (2015-03-15): SPARK-5307 wprowadził SerializationDebugger, a Spark 1.3.0 jest pierwszą wersją, która go używa. Dodaje ścieżkę serializacji do NotSerializableException . Po napotkaniu NotSerializableException debugger odwiedza wykres obiektu, aby znaleźć ścieżkę do obiektu, którego nie można zserializować, i konstruuje informacje, aby pomóc użytkownikowi znaleźć obiekt.
W przypadku OP to jest drukowane na standardowe wyjście:
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)