Wyodrębnij wartości kolumn Dataframe jako List w Apache Spark


86

Chcę przekonwertować kolumnę z ciągiem ramki danych na listę. To, co mogę znaleźć w DataframeAPI, to RDD, więc najpierw próbowałem przekonwertować go z powrotem na RDD, a następnie zastosować toArrayfunkcję do RDD. W takim przypadku długość i SQL działają dobrze. Jednak wynik, który otrzymałem z RDD, ma nawiasy kwadratowe wokół każdego takiego elementu [A00001]. Zastanawiałem się, czy istnieje odpowiedni sposób przekonwertowania kolumny na listę lub sposób na usunięcie nawiasów kwadratowych.

Wszelkie sugestie będą mile widziane. Dziękuję Ci!


Odpowiedzi:


117

Powinno to zwrócić zbiór zawierający jedną listę:

dataFrame.select("YOUR_COLUMN_NAME").rdd.map(r => r(0)).collect()

Bez mapowania otrzymujesz po prostu obiekt Row, który zawiera każdą kolumnę z bazy danych.

Pamiętaj, że prawdopodobnie dostaniesz listę dowolnego typu. ÏJeśli chcesz określić typ wyniku, możesz użyć .asInstanceOf [YOUR_TYPE] w r => r(0).asInstanceOf[YOUR_TYPE]mapowaniu

PS dzięki automatycznej konwersji możesz pominąć .rddczęść.


3
Z jakiegoś dziwnego powodu działa odwrotnie (Spark 2.1.0) collect().map(r => r(0))- czy ta kolejność ma jakieś wady?
Boern

Może być wolniejsze - Twoje rozwiązanie najpierw zbiera wszystkie dane na sterowniku, a następnie wykonuje mapowanie na sterowniku (bez pomocy wykonawców), wykorzystując tylko moc obliczeniową pojedynczego sterownika.
Niemand

72

Z Spark 2.x i Scala 2.11

Wymyśliłbym 3 możliwe sposoby konwersji wartości określonej kolumny na Listę.

Wspólne fragmenty kodu dla wszystkich podejść

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate    
import spark.implicits._ // for .toDF() method

val df = Seq(
    ("first", 2.0),
    ("test", 1.5), 
    ("choose", 8.0)
  ).toDF("id", "val")

Podejście 1

df.select("id").collect().map(_(0)).toList
// res9: List[Any] = List(one, two, three)

Co się teraz stanie? Zbieramy dane do Kierowcy collect()i wybieramy element zerowy z każdego rekordu.

To nie może być doskonały sposób na zrobienie tego. Poprawmy to następnym podejściem.


Podejście 2

df.select("id").rdd.map(r => r(0)).collect.toList 
//res10: List[Any] = List(one, two, three)

Jak to jest lepsze? Rozłożyliśmy obciążenie związane z transformacją mapy na pracowników, a nie na jednego kierowcę.

Wiem, rdd.map(r => r(0))że nie wydaje ci się elegancka. Więc zajmijmy się tym w następnym podejściu.


Podejście 3

df.select("id").map(r => r.getString(0)).collect.toList 
//res11: List[String] = List(one, two, three)

Tutaj nie konwertujemy DataFrame na RDD. Spójrz na mapto, że nie zaakceptuje r => r(0)(lub _(0)) jak poprzednie podejście z powodu problemów z koderem w DataFrame. Skończ więc z używaniem r => r.getString(0)i zostanie to rozwiązane w następnych wersjach Spark.

Wniosek

Wszystkie opcje dają taką samą wydajność, ale 2 i 3 są efektywne, w końcu trzecia jest skuteczna i elegancka (tak mi się wydaje).

Notatnik Databricks


24

Wiem, że odpowiedź udzielona i poproszona jest założona dla Scala, więc podaję tylko mały fragment kodu Pythona na wypadek, gdyby użytkownik PySpark był ciekawy. Składnia jest podobna do podanej odpowiedzi, ale aby poprawnie wyświetlić listę, muszę odwołać się do nazwy kolumny po raz drugi w funkcji mapowania i nie potrzebuję instrukcji select.

tj. DataFrame, zawierająca kolumnę o nazwie „Raw”

Aby uzyskać wartość każdego wiersza w „Raw” połączoną jako listę, gdzie każdy wpis jest wartością wiersza z „Raw”, po prostu używam:

MyDataFrame.rdd.map(lambda x: x.Raw).collect()

4
Daje to listę obiektów Row. A jeśli chcesz listę wartości?
ThatDataGuy

Daje to listę wartości.
abby sobh

Dzięki za udostępnienie tego! To działa dla mnie świetnie, zastanawiając się, czy istnieje sposób, aby to przyspieszyć, działa dość wolno
Mojgan Mazouchi

5

W Scali i Spark 2+ spróbuj tego (zakładając, że nazwa Twojej kolumny to „s”): df.select('s).as[String].collect


3
sqlContext.sql(" select filename from tempTable").rdd.map(r => r(0)).collect.toList.foreach(out_streamfn.println) //remove brackets

działa idealnie


1
List<String> whatever_list = df.toJavaRDD().map(new Function<Row, String>() {
    public String call(Row row) {
        return row.getAs("column_name").toString();
    }
}).collect();

logger.info(String.format("list is %s",whatever_list)); //verification

Ponieważ nikt nie podał żadnego rozwiązania w java (prawdziwy język programowania), możesz mi później podziękować


0
from pyspark.sql.functions import col

df.select(col("column_name")).collect()

tutaj collect to funkcje, które z kolei konwertują je na listę. Uważaj na korzystanie z listy w ogromnym zbiorze danych. Spowoduje to obniżenie wydajności. Warto sprawdzić dane.



0

Zaktualizowane rozwiązanie, które zawiera listę:

dataFrame.select("YOUR_COLUMN_NAME").map(r => r.getString(0)).collect.toList
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.