Aktualizacja
Ta odpowiedź jest nadal ważne i pouczające, chociaż rzeczy są teraz lepiej od 2.2 / 2.3, który dodaje wbudowane wsparcie dla enkodera Set
, Seq
, Map
, Date
, Timestamp
, i BigDecimal
. Jeśli trzymasz się tworzenia typów tylko z klasami przypadków i zwykłymi typami Scala, powinieneś być w porządku z tylko niejawnym w SQLImplicits
.
Niestety, praktycznie nic nie zostało dodane, aby w tym pomóc. Wyszukiwanie @since 2.0.0
w Encoders.scala
lub SQLImplicits.scala
znajdowanie rzeczy głównie związanych z typami prymitywnymi (i pewne modyfikacje klas przypadków). A więc pierwszą rzeczą do powiedzenia: obecnie nie ma naprawdę dobrego wsparcia dla niestandardowych koderów klas . Po usunięciu tego z drogi, oto kilka sztuczek, które wykonują tak dobrą robotę, jak możemy kiedykolwiek mieć nadzieję, biorąc pod uwagę to, co obecnie mamy do dyspozycji. Z góry zastrzeżenie: to nie zadziała idealnie i dołożę wszelkich starań, aby wszystkie ograniczenia były jasne i z góry.
Na czym dokładnie polega problem
Jeśli chcesz utworzyć zestaw danych, Spark „wymaga kodera (do konwersji obiektu JVM typu T na wewnętrzną reprezentację Spark SQL i z niej), który jest generalnie tworzony automatycznie za pomocą implicits z a SparkSession
lub może być tworzony jawnie przez wywołanie metod statycznych on Encoders
”(zaczerpnięte z dokumentacjicreateDataset
). Koder przyjmie postać, w Encoder[T]
której T
jest typem, który kodujesz. Pierwszą sugestią jest dodanie import spark.implicits._
(co daje te niejawne kodery), a drugą sugestią jest jawne przekazanie niejawnego kodera przy użyciu tego zestawu funkcji związanych z koderem.
Nie ma kodera dostępnego dla zwykłych klas, więc
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
poda następujący niejawny powiązany błąd czasu kompilacji:
Nie można znaleźć kodera dla typu przechowywanego w zestawie danych. Typy pierwotne (Int, String itp.) I Typy produktów (klasy przypadków) są obsługiwane przez importowanie sqlContext.implicits._ Obsługa serializacji innych typów zostanie dodana w przyszłych wersjach
Jeśli jednak zawiniesz dowolny typ, którego właśnie użyłeś, aby uzyskać powyższy błąd w jakiejś rozszerzonej klasie Product
, błąd myląco zostanie opóźniony do czasu wykonania, więc
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Kompiluje się dobrze, ale kończy się niepowodzeniem w czasie wykonywania z
java.lang.UnsupportedOperationException: nie znaleziono kodera dla MyObj
Powodem tego jest to, że kodery, które Spark tworzy za pomocą implicitów, są w rzeczywistości tworzone tylko w czasie wykonywania (przez ponowne sprawdzenie scali). W tym przypadku wszystkie kontrole Spark w czasie kompilacji polegają na tym, że najbardziej zewnętrzna klasa rozszerza Product
(co robią wszystkie klasy przypadków) i dopiero w czasie wykonywania zdaje sobie sprawę, że nadal nie wie, co zrobić MyObj
(ten sam problem występuje, gdy próbuję a Dataset[(Int,MyObj)]
- Spark czeka, aż runtime włączy się MyObj
). Oto główne problemy, które pilnie wymagają rozwiązania:
- niektóre klasy, które rozszerzają
Product
kompilację, mimo że zawsze ulegają awarii w czasie wykonywania i
- nie ma sposobu na przekazanie niestandardowych koderów dla typów zagnieżdżonych (nie mam możliwości przekazania Spark koderowi tylko po to
MyObj
, aby wiedział, jak zakodować Wrap[MyObj]
lub (Int,MyObj)
).
Po prostu użyj kryo
Rozwiązaniem, które wszyscy sugerują, jest użycie kryo
kodera.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Jednak szybko staje się to nudne. Zwłaszcza, jeśli Twój kod manipuluje różnymi zbiorami danych, łączy się, grupuje itp. W rezultacie uzyskujesz kilka dodatkowych implikacji. Dlaczego więc po prostu nie założyć, że robi to wszystko automatycznie?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
A teraz wygląda na to, że mogę zrobić prawie wszystko, co chcę (poniższy przykład nie zadziała w miejscu, w spark-shell
którym spark.implicits._
jest automatycznie importowany)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Albo prawie. Problem polega na tym, że użycie kryo
prowadzi do Spark po prostu przechowuje każdy wiersz w zestawie danych jako płaski obiekt binarny. Na map
, filter
, foreach
że to wystarczy, ale dla takich operacji join
, Spark naprawdę potrzebuje tych mają być rozdzielone na kolumnach. Po sprawdzeniu schematu pod kątem d2
lub d3
widać, że jest tylko jedna kolumna binarna:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Częściowe rozwiązanie dla krotek
Tak więc, używając magii implikacji w Scali (więcej w 6.26.3 Overloading Resolution ), mogę stworzyć serię implikacji, które wykonają jak najlepszą robotę, przynajmniej w przypadku krotek, i będą dobrze działać z istniejącymi implikacjami:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Następnie, uzbrojony w te implikacje, mogę sprawić, że powyższy przykład zadziała, aczkolwiek z pewnymi zmianami nazw kolumn
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
I jeszcze nie zorientowali się, jak uzyskać oczekiwane nazwiska krotka ( _1
, _2
...) domyślnie bez zmiany nazwy ich - jeśli ktoś chce się bawić z tym, to jest, gdy nazwa "value"
zostanie wprowadzona, a to jest, gdy krotka nazwy są zwykle dodawane. Jednak kluczową kwestią jest to, że mam teraz ładny strukturalny schemat:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Podsumowując, to obejście:
- pozwala nam uzyskać oddzielne kolumny dla krotek (abyśmy mogli ponownie dołączyć do krotek, yay!)
- możemy znowu polegać na implikacjach (więc nie ma potrzeby przechodzenia w
kryo
każdym miejscu)
- jest prawie całkowicie wstecznie kompatybilny z
import spark.implicits._
( wymaga zmiany nazwy)
- nie nie pozwól nam przyłączyć się na
kyro
serializacji kolumnach binarnych, nie mówiąc już o tych pól może mieć
- ma nieprzyjemny efekt uboczny zmiany nazwy niektórych kolumn krotki na „wartość” (w razie potrzeby można to cofnąć, konwertując
.toDF
, określając nowe nazwy kolumn i konwertując z powrotem do zestawu danych - a nazwy schematów wydają się być zachowane dzięki złączeniom , gdzie są najbardziej potrzebne).
Częściowe rozwiązanie dla zajęć w ogóle
Ten jest mniej przyjemny i nie ma dobrego rozwiązania. Jednak teraz, gdy mamy powyższe rozwiązanie krotki, mam przeczucie, że rozwiązanie niejawnej konwersji z innej odpowiedzi będzie również nieco mniej bolesne, ponieważ możesz przekonwertować bardziej złożone klasy na krotki. Następnie, po utworzeniu zestawu danych, prawdopodobnie zmieniłbyś nazwy kolumn, używając podejścia Dataframe. Jeśli wszystko pójdzie dobrze, jest to naprawdę poprawa, ponieważ mogę teraz wykonywać połączenia na polach moich zajęć. Gdybym użył tylko jednego płaskiego kryo
serializatora binarnego , nie byłoby to możliwe.
Oto przykład, który ma wszystkiego po trochu: Mam klasy MyObj
, która ma pola typów Int
, java.util.UUID
oraz Set[String]
. Pierwsza dba o siebie. Drugi, chociaż mógłbym serializować za pomocą, kryo
byłby bardziej przydatny, gdyby był przechowywany jako a String
(ponieważ UUID
s są zwykle czymś, przeciwko czemu chcę się przyłączyć). Trzeci tak naprawdę należy do kolumny binarnej.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Teraz mogę utworzyć zbiór danych z ładnym schematem przy użyciu tej maszyny:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
Schemat pokazuje mi kolumny I z właściwymi nazwami i dwoma pierwszymi rzeczami, które mogę połączyć.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
ExpressionEncoder
przy użyciu serializacji JSON? W moim przypadku krotki nie ujdą na sucho, a kryo daje mi kolumnę binarną ..