Spark - załadować plik CSV jako DataFrame?


141

Chciałbym przeczytać plik CSV w Spark i przekonwertować go na DataFrame i zapisać w HDFS z df.registerTempTable("table_name")

Próbowałem:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

Błąd, który otrzymałem:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Jakie jest właściwe polecenie, aby załadować plik CSV jako DataFrame w Apache Spark?


Odpowiedzi:


180

Spark-csv jest częścią podstawowych funkcji platformy Spark i nie wymaga osobnej biblioteki. Możesz więc po prostu zrobić na przykład

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

W scali (działa to w przypadku każdej wzmianki o separatorze formatowania „,” dla csv, „\ t” dla tsv itp.)

val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")


163

Przeanalizuj plik CSV i załaduj jako DataFrame / DataSet za pomocą Spark 2.x

Najpierw zainicjalizuj SparkSessionobiekt , który będzie domyślnie dostępny w powłokach jakospark

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local") # Change it as per your cluster
        .appName("Spark CSV Reader")
        .getOrCreate;

Użyj jednego z poniższych sposobów, aby załadować plik CSV jako DataFrame/DataSet

1. Zrób to w sposób programowy

 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

Aktualizacja: Dodanie wszystkich opcji z tego miejsca na wypadek, gdyby łącze zostało zerwane w przyszłości

  • ścieżka : lokalizacja plików. Podobnie jak Spark może akceptować standardowe wyrażenia globalizujące Hadoop.
  • nagłówek : gdy ustawione na true, pierwsza linia plików będzie używana do nadawania nazw kolumnom i nie będzie uwzględniona w danych. Wszystkie typy będą traktowane jako ciąg. Wartość domyślna to false.
  • separator : domyślnie kolumny są rozdzielane za pomocą, ale separator można ustawić na dowolny znak
  • cytat : domyślnie cudzysłów to „, ale można go ustawić na dowolny znak. Ograniczniki w cudzysłowach są ignorowane
  • escape : domyślnie znakiem ucieczki jest, ale można go ustawić na dowolny znak. Znaki cudzysłowu uciekającego są ignorowane
  • parserLib : domyślnie „ commons ” może być ustawione na „ univocity ”, aby używać tej biblioteki do analizowania CSV.
  • tryb : określa tryb analizowania. Domyślnie jest ZEZWOLENIE. Możliwe wartości to:
    • PERMISSIVE : próbuje przeanalizować wszystkie wiersze: wartości null są wstawiane w przypadku brakujących tokenów, a dodatkowe tokeny są ignorowane.
    • DROPMALFORMED : upuszcza linie, które mają mniej lub więcej tokenów niż oczekiwano lub tokenów, które nie pasują do schematu
    • FAILFAST : przerywa działanie z RuntimeException, jeśli napotka jakikolwiek zniekształcony zestaw znaków linii: domyślnie `` UTF-8 '', ale można ustawić inne prawidłowe nazwy zestawów znaków
  • inferSchema : automatycznie wnioskuje typy kolumn. Wymaga jednego dodatkowego przejścia przez dane i domyślnie ma wartość false. Komentarz: pomiń wiersze zaczynające się od tego znaku. Domyślnie jest to „#”. Wyłącz komentarze, ustawiając wartość null.
  • nullValue : określa ciąg, który wskazuje wartość null, wszystkie pola pasujące do tego ciągu zostaną ustawione jako null w DataFrame
  • dateFormat : określa ciąg, który wskazuje format daty używany podczas odczytywania dat lub znaczników czasu. Niestandardowe formaty dat są zgodne z formatami podanymi w java.text.SimpleDateFormat. Dotyczy to zarówno DateType, jak i TimestampType. Domyślnie jest to null, co oznacza próbę przeanalizowania czasu i daty przez java.sql.Timestamp.valueOf () i java.sql.Date.valueOf ().

2. Możesz również zrobić to w SQL

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

Zależności :

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Wersja Spark <2.0

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Zależności:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,

czy ta sesja wymaga ula? Otrzymuję błędy ula.
Puneet

2
Nie ma potrzeby. Tylko spark-core_2.11i spark-sql_2.11od 2.0.1wersji jest w porządku. Jeśli to możliwe, dodaj komunikat o błędzie.
mrsrinivas

1
czy możemy przekonwertować plik rozdzielany potokami na ramkę danych?
Omkar

3
@OmkarPuttagunta: Tak, oczywiście! spróbuj czegoś takiego spark.read.format("csv").option("delimiter ", "|") ...
mrsrinivas

1
Inną opcją dla programmatic wayjest pozostawienie off .format("csv")i wymienić .load(...z .csv(.... optionMetoda należy do klasy DataFrameReader jako zwrócony przez readmetodę, gdzie loadi csvmetody zwracają dataframe więc nie może mieć opcje oznaczone na po wywołaniu. Ta odpowiedź jest dość dokładna, ale powinieneś podać link do dokumentacji, aby ludzie mogli zobaczyć wszystkie inne dostępne opcje CSV spark.apache.org/docs/latest/api/scala/ ... *): org.apache.spark.sql.DataFrame
Davos

17

Jest dla którego Hadoop to 2,6, a Spark to 1,6 i bez pakietu „databricks”.

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;

val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))

val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)

12

W przypadku Spark 2.0 poniżej opisano, jak czytać CSV

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
  csv(path)

5
Czy jest różnica między spark.read.csv(path)i spark.read.format("csv").load(path)?
Eric

8

W Javie 1.8 Ten fragment kodu doskonale działa do odczytu plików CSV

POM.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

Jawa

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

        //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();

Chociaż może to być przydatne dla kogoś. Pytanie ma tag Scala.
OneCricketeer

5

Przetwarzanie pliku CSV wiąże się z wieloma wyzwaniami, sumuje się, jeśli rozmiar pliku jest większy, jeśli w wartościach kolumn znajdują się inne znaki niż angielski / ucieczka / separator / inne znaki, które mogą powodować błędy analizy.

Magia tkwi zatem w zastosowanych opcjach. Te, które działały dla mnie i mam nadzieję, że powinny obejmować większość przypadków skrajnych, są w kodzie poniżej:

### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path, 
                         header=True, 
                         multiLine=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         encoding="UTF-8",
                         sep=',',
                         quote='"', 
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)

Mam nadzieję, że to pomoże. Więcej informacji: Używanie PySpark 2 do czytania CSV z kodem źródłowym HTML

Uwaga: powyższy kod pochodzi z interfejsu API Spark 2, w którym interfejs API do odczytu plików CSV jest dostarczany w pakiecie z wbudowanymi pakietami platformy Spark do zainstalowania.

Uwaga: PySpark jest opakowaniem Pythona dla platformy Spark i ma ten sam interfejs API co Scala / Java.


Dziękuję bardzo, uratowałeś mi życie: D
Khubaib Raza

4

Przykład Penny's Spark 2 to sposób na zrobienie tego w Spark2. Jest jeszcze jedna sztuczka: wygeneruj ten nagłówek, wykonując wstępne skanowanie danych, ustawiając opcję inferSchemanatrue

Tutaj więc, zakładając, że sparkjest to sesja iskrowa, którą skonfigurowałeś, jest operacja załadowania do pliku indeksu CSV wszystkich obrazów Landsat, które amazon hostuje na S3.

  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")

Zła wiadomość jest taka: uruchamia to skanowanie pliku; dla czegoś dużego, takiego jak ten spakowany plik CSV o wielkości 20 + MB, który może zająć 30 sekund w przypadku połączenia długodystansowego. Miej to na uwadze: lepiej ręcznie zakodować schemat, gdy już się pojawi.

(fragment kodu Apache Software License 2.0 licencjonowany w celu uniknięcia wszelkich niejasności; coś, co zrobiłem jako test demonstracyjny / test integracji integracji S3)


Nie widziałem tej metody csv ani przekazywania mapy do opcji. Uzgodnione, że zawsze lepiej jest podawać jawny schemat, inferSchema jest w porządku dla szybkiego i brudnego (aka nauki o danych), ale straszny dla ETL.
Davos,

2

W przypadku, gdy budujesz słoik w wersji 2.11 i Apache 2.0 lub nowszej.

Nie ma potrzeby tworzenia obiektu sqlContextlub sparkContext. Tylko SparkSessionprzedmiot wystarcza na wszystkie potrzeby.

Poniżej znajduje się mycode, który działa dobrze:

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}

object driver {

  def main(args: Array[String]) {

    val log = LogManager.getRootLogger

    log.info("**********JAR EXECUTION STARTED**********")

    val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
    val df = spark.read.format("csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("inferSchema","true")
      .load("d:/small_projects/spark/test.pos")
    df.show()
  }
}

Jeśli pracujesz w klastrze, po prostu zmień .master("local")na .master("yarn")podczas definiowania sparkBuilderobiektu

Omówiono to w Spark Doc: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html


To jest to samo, co istniejące odpowiedzi
mrsrinivas

0

Dodaj następujące zależności Spark do pliku POM:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

// Konfiguracja Spark:

val spark = SparkSession.builder (). master ("local"). appName ("Przykładowa aplikacja"). getOrCreate ()

// Przeczytaj plik csv:

val df = spark.read.option ("nagłówek", "prawda"). csv ("ŚCIEŻKA_PLIKU")

// Wyświetl wyjście

df.show ()


0

Aby odczytać ze ścieżki względnej w systemie, użyj metody System.getProperty, aby uzyskać bieżący katalog, a następnie użyj do załadowania pliku przy użyciu ścieżki względnej.

scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)

iskra: 2.4.4 scala: 2.11.12


0

W przypadku platformy Spark 2.4+, jeśli chcesz załadować plik csv z katalogu lokalnego, możesz użyć 2 sesji i załadować go do gałęzi. Pierwsza sesja powinna zostać utworzona z master () config jako „local [*]”, a druga sesja z włączonymi „yarn” i Hive.

Poniższy zadziałał dla mnie.

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._

object testCSV { 

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()

    import spark_local.implicits._
    spark_local.sql("SET").show(100,false)
    val local_path="/tmp/data/spend_diversity.csv"  // Local file
    val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
    df_local.show(false)

    val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()

    import spark.implicits._
    spark.sql("SET").show(100,false)
    val df = df_local
    df.createOrReplaceTempView("lcsv")
    spark.sql(" drop table if exists work.local_csv ")
    spark.sql(" create table work.local_csv as select * from lcsv ")

   }

Po uruchomieniu z spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jarnim poszło dobrze i stworzyłem stół w ulu.


-1

Domyślnym formatem pliku jest Parquet z spark.read .. i odczytem pliku csv, dlatego otrzymujesz wyjątek. Określ format CSV za pomocą interfejsu API, którego próbujesz użyć


-1

Spróbuj tego, jeśli używasz Spark 2.0+

For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")


For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")

For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")

Uwaga: - to działa dla każdego rozdzielanego pliku. Po prostu użyj opcji („separator”,), aby zmienić wartość.

Mam nadzieję, że to jest pomocne.


To jest to samo, co istniejące odpowiedzi
mrsrinivas

-1

Dzięki wbudowanemu Spark CSV możesz to łatwo zrobić dzięki nowemu obiektowi SparkSession dla Spark> 2.0.

val df = spark.
        read.
        option("inferSchema", "false").
        option("header","true").
        option("mode","DROPMALFORMED").
        option("delimiter", ";").
        schema(dataSchema).
        csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()

Istnieje wiele opcji, które możesz ustawić.

  • header: czy twój plik zawiera linię nagłówka na górze
  • inferSchema: czy chcesz automatycznie wywnioskować schemat, czy nie. Domyślnie jest true. Zawsze wolę udostępniać schemat, aby zapewnić odpowiednie typy danych.
  • mode: tryb analizy, PERMISSIVE, DROPMALFORMED lub FAILFAST
  • delimiter: aby określić separator, domyślnie jest to przecinek (',')
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.