Jak załadować plik lokalny w sc.textFile, zamiast HDFS


100

Postępuję zgodnie ze świetnym samouczkiem dotyczącym iskier

więc próbuję na 46 min: 00 s, aby załadować, README.mdale nie udaje mi się to, co robię, jest to:

$ sudo docker run -i -t -h sandbox sequenceiq/spark:1.1.0 /etc/bootstrap.sh -bash
bash-4.1# cd /usr/local/spark-1.1.0-bin-hadoop2.4
bash-4.1# ls README.md
README.md
bash-4.1# ./bin/spark-shell
scala> val f = sc.textFile("README.md")
14/12/04 12:11:14 INFO storage.MemoryStore: ensureFreeSpace(164073) called with curMem=0, maxMem=278302556
14/12/04 12:11:14 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 160.2 KB, free 265.3 MB)
f: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:12
scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)

jak mogę to załadować README.md?

Odpowiedzi:


177

Spróbuj wyraźnie określić sc.textFile("file:///path to the file/"). Błąd występuje, gdy jest ustawione środowisko Hadoop.

SparkContext.textFile wewnętrznie wywołuje org.apache.hadoop.mapred.FileInputFormat.getSplits, co z kolei używa, org.apache.hadoop.fs.getDefaultUrijeśli schemat jest nieobecny. Ta metoda odczytuje parametr „fs.defaultFS” konfiguracji Hadoop. Jeśli ustawisz zmienną środowiskową HADOOP_CONF_DIR, parametr jest zwykle ustawiony jako „hdfs: // ...”; w przeciwnym razie „file: //”.


Czy wiesz, jak to zrobić w Javie? Nie widzę metody. Bardzo frustrujące jest to, że nie ma łatwego sposobu na podanie ścieżki do załadowania pliku z prostego systemu plików.
Brad Ellis

odpowiadając sobie. Istnieje przełącznik --file, który jest przekazywany wraz z funkcją przesyłania iskry. Tak więc ścieżka pliku może być zakodowana na stałe lub jakkolwiek twoja konfiguracja jest skonfigurowana dla aplikacji, ale również sygnalizujesz tę ścieżkę. kiedy przesyłasz, aby wykonawcy mogli zobaczyć ścieżkę.
Brad Ellis,

24

odpowiedź gonbe jest doskonała. Ale nadal chcę o tym wspomnieć file:///= ~/../../, nie $SPARK_HOME. Mam nadzieję, że zaoszczędzi to trochę czasu nowicjuszom takim jak ja.


4
file:///jest katalogiem głównym systemu plików widzianym przez wykonującą maszynę JVM, a nie dwa poziomy powyżej folderu domowego. Format identyfikatora URI określony w dokumencie RFC 8089 to file://hostname/absolute/path. W przypadku lokalnym hostnamekomponent (uprawnienia) jest pusty.
Hristo Iliev

18

Chociaż Spark obsługuje ładowanie plików z lokalnego systemu plików, wymaga, aby pliki były dostępne w tej samej ścieżce we wszystkich węzłach w klastrze.

Niektóre sieciowe systemy plików, takie jak NFS, AFS i warstwa NFS MapR, są widoczne dla użytkownika jako zwykły system plików.

Jeśli Twoje dane znajdują się już w jednym z tych systemów, możesz użyć ich jako danych wejściowych, podając po prostu plik: // ścieżka; Spark obsłuży to, o ile system plików jest zamontowany w tej samej ścieżce w każdym węźle. Każdy węzeł musi mieć tę samą ścieżkę

 rdd = sc.textFile("file:///path/to/file")

Jeśli plik nie znajduje się jeszcze we wszystkich węzłach w klastrze, możesz załadować go lokalnie do sterownika bez przechodzenia przez platformę Spark, a następnie wywołać funkcję równoległego rozprowadzania zawartości do pracowników

Uważaj, aby umieścić file: // na początku i użyj „/” lub „\” zgodnie z systemem operacyjnym.


1
Czy istnieje sposób, w jaki Spark automatycznie kopiuje dane z katalogu $ SPARK_HOME do wszystkich węzłów obliczeniowych. A może musisz to zrobić ręcznie?
Matthias

gdzie jest kod źródłowy Spark obsługujący różne formaty systemów plików?
Saher Ahwal

12

Wystarczy, że określisz ścieżkę do pliku jako „plik: /// katalog / plik”

przykład:

val textFile = sc.textFile("file:///usr/local/spark/README.md")

12

Uwaga:

Upewnij się, że uruchamiasz Spark w trybie lokalnym podczas ładowania danych z local ( sc.textFile("file:///path to the file/")), w przeciwnym razie otrzymasz taki błąd Caused by: java.io.FileNotFoundException: File file:/data/sparkjob/config2.properties does not exist. Wykonywacze Becasuse, które działają na różnych pracownikach, nie znajdą tego pliku w swojej ścieżce lokalnej.


11

Jeśli plik znajduje się w głównym węźle Spark (np. W przypadku korzystania z AWS EMR), najpierw uruchom powłokę iskrową w trybie lokalnym.

$ spark-shell --master=local
scala> val df = spark.read.json("file:///usr/lib/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Alternatywnie możesz najpierw skopiować plik do HDFS z lokalnego systemu plików, a następnie uruchomić Spark w jego domyślnym trybie (np. YARN w przypadku korzystania z AWS EMR), aby bezpośrednio odczytać plik.

$ hdfs dfs -mkdir -p /hdfs/spark/examples
$ hadoop fs -put /usr/lib/spark/examples/src/main/resources/people.json /hdfs/spark/examples
$ hadoop fs -ls /hdfs/spark/examples
Found 1 items
-rw-r--r--   1 hadoop hadoop         73 2017-05-01 00:49 /hdfs/spark/examples/people.json

$ spark-shell
scala> val df = spark.read.json("/hdfs/spark/examples/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

9

Mam plik o nazwie NewsArticle.txt na moim pulpicie.

W Spark wpisałem:

val textFile= sc.textFile(“file:///C:/Users/582767/Desktop/NewsArticle.txt”)

Musiałem zmienić wszystkie znaki \ na / dla ścieżki pliku.

Aby sprawdzić, czy zadziałało, wpisałem:

textFile.foreach(println)

Używam Windows 7 i nie mam zainstalowanego Hadoop.


5

Zostało to omówione na liście mailingowej Spark i prosimy o skierowanie tej wiadomości .

Powinieneś użyć hadoop fs -put <localsrc> ... <dst>kopiowania pliku do hdfs:

${HADOOP_COMMON_HOME}/bin/hadoop fs -put /path/to/README.md README.md

5

Zdarzyło mi się to w przypadku Sparka 2.3 z Hadoopem również zainstalowanym w wspólnym katalogu domowym użytkownika „hadoop”. Ponieważ zarówno Spark, jak i Hadoop zostały zainstalowane w tym samym wspólnym katalogu, Spark domyślnie traktuje schemat jako hdfsi zaczyna szukać plików wejściowych pod hdfs, jak określono fs.defaultFSw Hadoop's core-site.xml. W takich przypadkach musimy wyraźnie określić schemat jako file:///<absoloute path to file>.


0

Oto rozwiązanie tego błędu, który otrzymałem w klastrze Spark, który jest hostowany na platformie Azure w klastrze systemu Windows:

Załaduj surowy plik HVAC.csv, przeanalizuj go przy użyciu funkcji

data = sc.textFile("wasb:///HdiSamples/SensorSampleData/hvac/HVAC.csv")

Używamy (wasb: ///), aby umożliwić usłudze Hadoop dostęp do pliku magazynu Azure blogu, a trzy ukośniki są odniesieniem względnym do folderu kontenera uruchomionego węzła.

Na przykład: Jeśli ścieżka do pliku w Eksploratorze plików na pulpicie nawigacyjnym klastra Spark to:

sflcc1 \ sflccspark1 \ HdiSamples \ SensorSampleData \ hvac

Tak więc opisanie ścieżki jest następujące: sflcc1: to nazwa konta magazynu. sflccspark: to nazwa węzła klastra.

Dlatego odwołujemy się do nazwy bieżącego węzła klastra za pomocą odpowiednich trzech ukośników.

Mam nadzieję że to pomoże.


0

Jeśli próbujesz odczytać plik z formatu HDFS. próbując ustawić ścieżkę w SparkConf

 val conf = new SparkConf().setMaster("local[*]").setAppName("HDFSFileReader")
 conf.set("fs.defaultFS", "hdfs://hostname:9000")

Dodaj wcięcie 4 spacje / tabulatora do swojego kodu, aby został sformatowany jako kod. Z poważaniem
YakovL

0

Nie musisz używać sc.textFile (...), aby konwertować pliki lokalne na ramki danych. Jedną z opcji jest odczytanie lokalnego pliku wiersz po wierszu, a następnie przekształcenie go w zestaw danych Spark. Oto przykład dla komputera z systemem Windows w Javie:

StructType schemata = DataTypes.createStructType(
            new StructField[]{
                    createStructField("COL1", StringType, false),
                    createStructField("COL2", StringType, false),
                    ...
            }
    );

String separator = ";";
String filePath = "C:\\work\\myProj\\myFile.csv";
SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("MyApp").setMaster("local"));
JavaSparkContext jsc = new JavaSparkContext (sparkContext );
SQLContext sqlContext = SQLContext.getOrCreate(sparkContext );

List<String[]> result = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
    String line;
    while ((line = br.readLine()) != null) {
      String[] vals = line.split(separator);
      result.add(vals);
    }
 } catch (Exception ex) {
       System.out.println(ex.getMessage());
       throw new RuntimeException(ex);
  }
  JavaRDD<String[]> jRdd = jsc.parallelize(result);
  JavaRDD<Row> jRowRdd = jRdd .map(RowFactory::create);
  Dataset<Row> data = sqlContext.createDataFrame(jRowRdd, schemata);

Teraz możesz używać dataframe dataw swoim kodzie.


0

Wypróbowałem następujące i zadziałało z mojego lokalnego systemu plików .. Zasadniczo Spark może czytać ze ścieżki lokalnej, HDFS i AWS S3

listrdd=sc.textFile("file:////home/cloudera/Downloads/master-data/retail_db/products")

-6

próbować

val f = sc.textFile("./README.md")

scala> val f = sc.textFile("./README.md") 14/12/04 12:54:33 INFO storage.MemoryStore: ensureFreeSpace(81443) called with curMem=164073, maxMem=278302556 14/12/04 12:54:33 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 79.5 KB, free 265.2 MB) f: org.apache.spark.rdd.RDD[String] = ./README.md MappedRDD[5] at textFile at <console>:12 scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md at
Jas

Czy potrafisz zrobić pwdna muszli bashbash-4.1#
Soumya Simanta

bash-4.1 # pwd /usr/local/spark-1.1.0-bin-hadoop2.4
Jas

To działa dla mnie na iskrę bez hadoop / hdfs. Jednak wydaje się, że nie działa dla OP, ponieważ dał im zrzut błędów.
Paul,
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.