Załaduj plik CSV za pomocą Sparka


110

Jestem nowy w Spark i próbuję odczytać dane CSV z pliku za pomocą Spark. Oto co robię:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

Spodziewałbym się, że to wywołanie da mi listę dwóch pierwszych kolumn mojego pliku, ale otrzymuję ten błąd:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

chociaż mój plik CSV ma więcej niż jedną kolumnę.

Odpowiedzi:


63

Czy na pewno wszystkie linie mają co najmniej 2 kolumny? Czy możesz spróbować czegoś takiego, żeby sprawdzić ?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Alternatywnie możesz wydrukować winowajcę (jeśli istnieje):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()

To było to, jeden wiersz z tylko jedną kolumną, dziękuję.
Kernael

2
Lepiej przeanalizować za pomocą wbudowanej csvbiblioteki do obsługi wszystkich znaków ucieczki, ponieważ zwykłe dzielenie przecinkiem nie zadziała, jeśli, powiedzmy, w wartościach znajdują się przecinki.
sudo

4
Jest wiele narzędzi do analizowania csv, nie odkrywaj na nowo koła
Stephen

2
Ten kod zepsuje się, jeśli w cudzysłowie znajduje się przecinek. Przetwarzanie pliku csv jest bardziej skomplikowane niż zwykłe dzielenie na ",".
Alceu Costa,

To przerywa na przecinki. To jest bardzo złe.
rjurney

184

Spark 2.0.0+

Możesz bezpośrednio użyć wbudowanego źródła danych csv:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

lub

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

bez uwzględniania zależności zewnętrznych.

Spark <2.0.0 :

Zamiast ręcznego parsowania, co w ogólnym przypadku wcale nie jest trywialne, radziłbym spark-csv:

Upewnij się, że Spark CSV jest wliczone w ścieżce ( --packages, --jars, --driver-class-path)

I załaduj swoje dane w następujący sposób:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Obsługuje ładowanie, wnioskowanie o schemacie, porzucanie źle sformułowanych wierszy i nie wymaga przekazywania danych z języka Python do maszyny JVM.

Uwaga :

Jeśli znasz schemat, lepiej unikać wnioskowania o schemacie i przekazać go do DataFrameReader. Zakładając, że masz trzy kolumny - integer, double i string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

6
Jeśli to zrobisz, nie zapomnij dołączyć pakietu csv databricks podczas otwierania powłoki pyspark lub używania funkcji spark-submit. Na przykład pyspark --packages com.databricks:spark-csv_2.11:1.4.0(upewnij się, że zmieniłeś wersje databricks / spark na te, które masz zainstalowane).
Galen Long

Czy jest to csvContext czy sqlContext w pyspark? Ponieważ w scali potrzebujesz csvContext
Geoffrey Anderson

28
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");

print(df.collect())

użyj 'sep, a nie' separator 'w następujący sposób: df = spark.read.csv ("/ home / stp / test1.csv", header = True, sep = "|")
Grant Shannon

18

I jeszcze jedna opcja polegająca na odczytaniu pliku CSV za pomocą Pandas, a następnie zaimportowaniu Pandas DataFrame do Spark.

Na przykład:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)

7
Dlaczego OP miałby robić na
iskrze,

Nie chcę instalować ani określać zależności na każdym klastrze iskrowym ...
SummerEla

Panda pozwala na fragmentowanie plików podczas czytania, więc nadal istnieje przypadek użycia, aby Pandy zajmowały się początkową analizą plików. Zobacz moją odpowiedź poniżej na kod.
abby sobh

Uwaga: Pandy również radzą sobie ze schematem kolumn w inny sposób niż iskra, zwłaszcza gdy w grę wchodzą spacje. Bezpieczniej jest po prostu załadować plik csv jako ciągi dla każdej kolumny.
AntiPawn79

@WoodChopper Możesz używać Pand jako UDF w Spark, prawda?
flow2k

16

Zwykłe dzielenie przecinkiem spowoduje również podzielenie przecinków, które znajdują się w polach (np. a,b,"1,2,3",c), Więc nie jest zalecane. Odpowiedź zero323 jest dobra, jeśli chcesz używać API DataFrames, ale jeśli chcesz trzymać się podstawowego Spark, możesz przeanalizować csv w podstawowym Pythonie za pomocą modułu csv :

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDYCJA: Jak wspomniał @muon w komentarzach, potraktuje to nagłówek jak każdy inny wiersz, więc musisz go wyodrębnić ręcznie. Na przykład header = rdd.first(); rdd = rdd.filter(lambda x: x != header)(pamiętaj, aby nie modyfikować, headerzanim filtr oceni). Ale w tym momencie prawdopodobnie lepiej będzie, jeśli użyjesz wbudowanego parsera csv.


1
Nie potrzebujesz Hive, aby używać DataFrames. Jeśli chodzi o Twoje rozwiązanie: a) Nie ma takiej potrzeby StringIO. csvmoże używać dowolnej iterowalnej b) __next__nie powinno być używane bezpośrednio i zakończy się niepowodzeniem w pustym wierszu. Spójrz na flatMap c) Byłoby o wiele bardziej efektywne w użyciu mapPartitionszamiast inicjowania czytelnika na każdej linii :)
zero323

Dziękuję bardzo za poprawki! Zanim zmienię odpowiedź, chcę się upewnić, że w pełni ją rozumiem. 1) Dlaczego rdd.mapPartitions(lambda x: csv.reader(x))działa, gdy rdd.map(lambda x: csv.reader(x))zgłasza błąd? Spodziewałem się, że obaj rzucą to samo TypeError: can't pickle _csv.reader objects. Wydaje się również, że mapPartitionsautomatycznie wywołuje jakiś odpowiednik "readlines" na csv.readerobiekcie, gdzie z map, musiałem wywołać __next__jawnie, aby uzyskać listy z csv.reader. 2) Gdzie flatMapwchodzi? Samo dzwonienie mapPartitionszadziałało dla mnie.
Galen Long

1
rdd.mapPartitions(lambda x: csv.reader(x))działa, ponieważ mapPartitionsoczekuje Iterableobiektu. Jeśli chcesz być wyraźny, możesz użyć wyrażenia ze zrozumieniem lub generatora. mapsam nie działa, ponieważ nie wykonuje iteracji po obiekcie. Stąd moja propozycja użycia, flatMap(lambda x: csv.reader([x]))która będzie iterować po czytelniku. Ale tutaj mapPartitionsjest znacznie lepiej.
zero323

1
zauważ, że to odczyta nagłówek jako wiersz danych, a nie jako nagłówek
mion

7

To jest w PYSPARKU

path="Your file path with file name"

df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

Następnie możesz sprawdzić

df.show(5)
df.count()

6

Jeśli chcesz załadować csv jako ramkę danych, możesz wykonać następujące czynności:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

U mnie to działało dobrze.


@GalenLong, jeśli nie masz nic przeciwko, czy możesz podzielić się już istniejącą odpowiedzią
Jeril

Dziwne, przysięgam, że w przypadku tego rozwiązania była inna odpowiedź. Może pomyliłem to z innym pytaniem. Mój błąd.
Galen Long

5

Jest to zgodne z tym, co początkowo sugerował JP Mercier na temat używania Pand, ale z dużą modyfikacją: jeśli wczytujesz dane do Pand w kawałkach, powinno to być bardziej plastyczne. Oznacza to, że możesz przeanalizować znacznie większy plik, niż Pandy może obsłużyć jako pojedynczy element i przekazać go do Spark w mniejszych rozmiarach. (To również odpowiada na komentarz dotyczący tego, dlaczego ktoś chciałby używać Sparka, skoro i tak mogą załadować wszystko do Pand).

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()

5

Teraz jest też inna opcja dla dowolnego ogólnego pliku csv: https://github.com/seahboonsiew/pyspark-csv w następujący sposób:

Załóżmy, że mamy następujący kontekst

sc = SparkContext
sqlCtx = SQLContext or HiveContext

Najpierw roześlij pyspark-csv.py do programów wykonawczych za pomocą SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Odczytaj dane CSV za pośrednictwem SparkContext i przekonwertuj je na DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)

3

Jeśli twoje dane csv nie zawierają znaków nowej linii w żadnym z pól, możesz załadować swoje dane textFile()i przeanalizować je

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

2

Jeśli masz jeden lub więcej wierszy z mniejszą lub większą liczbą kolumn niż 2 w zbiorze danych, może wystąpić ten błąd.

Jestem też nowy w Pyspark i próbuję odczytać plik CSV. Poniższy kod zadziałał dla mnie:

W tym kodzie używam zestawu danych z kaggle, łącze to: https://www.kaggle.com/carrie1/ecommerce-data

1. Bez wspominania o schemacie:

from pyspark.sql import SparkSession  
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

Teraz sprawdź kolumny: sdfData.columns

Wynik będzie:

['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']

Sprawdź typ danych dla każdej kolumny:

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

To da ramkę danych ze wszystkimi kolumnami z typem danych jako StringType

2. Ze schematem: Jeśli znasz schemat lub chcesz zmienić typ danych dowolnej kolumny w powyższej tabeli, użyj tego (powiedzmy, że mam następujące kolumny i chcę, aby były w określonym typie danych dla każdej z nich)

from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
    schema = StructType([\
        StructField("InvoiceNo", IntegerType()),\
        StructField("StockCode", StringType()), \
        StructField("Description", StringType()),\
        StructField("Quantity", IntegerType()),\
        StructField("InvoiceDate", StringType()),\
        StructField("CustomerID", DoubleType()),\
        StructField("Country", StringType())\
    ])

scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

Teraz sprawdź schemat dla typu danych każdej kolumny:

sdfData.schema

StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

Edytowano: możemy również użyć następującego wiersza kodu bez wyraźnego wspominania o schemacie:

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

Wynik to:

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

Wynik będzie wyglądał następująco:

sdfData.show()

+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows

1

Podczas korzystania spark.read.csvuważam, że korzystając z opcji escape='"'i multiLine=Truezapewniam najbardziej spójne rozwiązanie do standardu CSV , az mojego doświadczenia wynika, że najlepiej działa z plikami CSV wyeksportowanymi z Arkuszy Google.

To jest,

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)

skąd pochodzi iskra? czy to import pyspark as sparkjest
Luk Aron

@LukAron W powłoce pyspark sparkjest już zainicjowany. W skrypcie przesłanym przez spark-submitmożesz utworzyć jego wystąpienie jako from pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate().
flow2k
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.