Badam zachowanie Sparka, gdy dołączam do stołu do siebie. Używam Databricks.
Mój fikcyjny scenariusz to:
Odczytaj zewnętrzną tabelę jako ramkę danych A (pliki bazowe są w formacie delta)
Zdefiniuj ramkę danych B jako ramkę danych A z wybranymi tylko niektórymi kolumnami
Połącz ramki danych A i B w kolumnie 1 i kolumnie 2
(Tak, to nie ma większego sensu, po prostu eksperymentuję, aby zrozumieć mechanikę Sparka)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Moja pierwsza próba polegała na uruchomieniu kodu w obecnej postaci (próba 1). Następnie próbowałem partycjonować i buforować (próba 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
W końcu dokonałem podziału na partycje, posortowałem i zapisałem w pamięci podręcznej
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Odpowiednie wygenerowane dags są załączone.
Moje pytania to:
Dlaczego w próbie 1 tabela wydaje się buforowana, mimo że buforowanie nie zostało wyraźnie określone.
Dlaczego po InMemoreTableScan zawsze występuje inny węzeł tego typu.
Dlaczego w próbie 3 buforowanie wydaje się odbywać na dwóch etapach?
Dlaczego w próbie 3 WholeStageCodegen podąża za jednym (i tylko jednym) InMemoreTableScan.