Zaimportuj zawartość pliku csv do ramek danych pyspark


13

Jak mogę zaimportować plik .csv do ramek danych pyspark? Próbowałem nawet odczytać plik csv w Pandas, a następnie przekonwertować go na ramkę danych Spark za pomocą createDataFrame, ale nadal pojawia się jakiś błąd. Czy ktoś może mnie przez to poprowadzić? Powiedz mi też, jak mogę zaimportować plik xlsx? Próbuję zaimportować zawartość CSV do ramek danych Panda, a następnie przekonwertować ją na ramki danych Spark, ale wyświetla błąd:

"Py4JJavaError" An error occurred while calling o28.applySchemaToPythonRDD. : java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient 

Mój kod to:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 
sqlc=SQLContext(sc) 
df=pd.read_csv(r'D:\BestBuy\train.csv') 
sdf=sqlc.createDataFrame(df) 

1
Jeśli masz komunikat o błędzie, powinieneś go opublikować; najprawdopodobniej zawiera ważne informacje pomocne w debugowaniu sytuacji.
jagartner

Usiłuję zaimportować zawartość csv do ramek danych pand, a następnie przekonwertować ją na ramki danych iskry .... ale wyświetla błąd coś takiego jak „Py4JJavaError” Wystąpił błąd podczas wywoływania o28.applySchemaToPythonRDD. : java.lang.RuntimeException: java.lang.RuntimeException: Nie można utworzyć instancji org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
neha

a mój kod to -> z importu pyspark SparkContext z importu pyspark.sql import SQLContext import pand jako pd sqlc = SQLContext (sc) df = pd.read_csv (r'D: \ BestBuy \ train.csv ') sdf = sqlc.createDataFrame (df) ----> Błąd
neha

1
Witamy w DataScience.SE! Edytuj oryginalny post zamiast dodawać komentarze.
Emre

ścieżka pliku musi znajdować się w HDFS, wtedy tylko u można uruchomić dane
Prakash Reddy

Odpowiedzi:


13

„Jak mogę zaimportować plik .csv do ramek danych pyspark?” -- Istnieje wiele sposobów, aby to zrobić; najprostszym byłoby uruchomienie pyspark z modułem Spark-csv Databrick. Możesz to zrobić, uruchamiając pyspark

pyspark --packages com.databricks:spark-csv_2.10:1.4.0

następnie możesz wykonać następujące kroki:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('cars.csv')

Inną metodą byłoby odczytanie z pliku tekstowego jako pliku rdd

myrdd = sc.textFile("yourfile.csv").map(lambda line: line.split(","))

Następnie przekształć dane, aby każdy element miał poprawny format schematu (tj. Ints, Strings, Floats itp.). Będziesz wtedy chciał użyć

>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
>>> df2 = sqlContext.createDataFrame(person)
>>> df2.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql.types import *
>>> schema = StructType([
...    StructField("name", StringType(), True),
...    StructField("age", IntegerType(), True)])
>>> df3 = sqlContext.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]

Odniesienie: http://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.Row

„Powiedz mi też, jak mogę zaimportować plik xlsx?” - Pliki Excel nie są używane w „Big Data”; Spark jest przeznaczony do użytku z dużymi plikami lub bazami danych. Jeśli masz plik Excel o rozmiarze 50 GB, robisz coś źle. Excel nawet nie byłby w stanie otworzyć pliku o takim rozmiarze; z mojego doświadczenia wynika, że ​​wszystko powyżej 20 MB i Excel umiera.


Myślę, że może być problem z powyższym podejściem RDD: pola mogą zawierać znaki nowej linii (aczkolwiek otoczone podwójnymi cudzysłowami), a mianowicie, tools.ietf.org/html/rfc4180#section-2 .
flow2k

możesz użyć narzędzi do konwersji pliku xlsx do csv (takie jak gnumeric lub api open office). możesz normalnie przeprowadzić
analizę

2

Następujące działało dla mnie dobrze:

from pyspark.sql.types import *
schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True)]
pd_df = pd.read_csv("<inputcsvfile>")
sp_df = spark.createDataFrame(pd_df, schema=schema)

1

Mam w moim katalogu lokalnym plik „temp.csv”. Stamtąd za pomocą lokalnej instancji wykonuję następujące czynności:

>>> from pyspark import SQLContext
>>> from pyspark.sql import Row
>>> sql_c = SQLContext(sc)
>>> d0 = sc.textFile('./temp.csv')
>>> d0.collect()
[u'a,1,.2390', u'b,2,.4390', u'c,3,.2323']
>>> d1 = d0.map(lambda x: x.split(',')).map(lambda x: Row(label = x[0], number = int(x[1]), value = float(x[2])))
>>> d1.take(1)
[Row(label=u'a', number=1, value=0.239)]
>>> df = sql_c.createDataFrame(d1)
>>> df_cut = df[df.number>1]
>>> df_cut.select('label', 'value').collect()
[Row(label=u'b', value=0.439), Row(label=u'c', value=0.2323)]

Więc d0 to nieprzetworzony plik tekstowy, który wysyłamy do iskry RDD. Aby stworzyć ramkę danych, chcesz rozdzielić plik csv i uczynić każdy wpis typem wiersza, tak jak ja podczas tworzenia d1. Ostatnim krokiem jest utworzenie ramki danych z RDD.


0

Możesz użyć pakietu spark-csv firmy DataBricks, który automatycznie wykonuje wiele czynności, takich jak dbanie o nagłówek, używanie znaków zmiany znaczenia, automatyczne wnioskowanie o schemacie itp. Począwszy od Spark 2.0 jest wbudowana funkcja do obsługi CSV.

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.