Jak odczytać wiele plików tekstowych w jednym RDD?


179

Chcę odczytać kilka plików tekstowych z lokalizacji hdfs i wykonać mapowanie w iteracji za pomocą Spark.

JavaRDD<String> records = ctx.textFile(args[1], 1); jest w stanie odczytać tylko jeden plik na raz.

Chcę odczytać więcej niż jeden plik i przetworzyć je jako pojedynczy RDD. W jaki sposób?

Odpowiedzi:


298

Możesz określić całe katalogi, używać symboli wieloznacznych, a nawet CSV katalogów i symboli wieloznacznych. Na przykład:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

Jak zauważa Nick Chammas, jest to ujawnienie Hadoopa FileInputFormati dlatego działa również z Hadoop (i Scalding).


10
Tak, jest to najwygodniejszy sposób otwierania wielu plików jako pojedynczego RDD. Interfejs API tutaj jest tylko ujawnieniem FileInputFormat API Hadoop , więc obowiązują wszystkie te same Pathopcje.
Nick Chammas

7
sc.wholeTextFilesprzydaje się w przypadku danych, które nie są rozdzielane
liniami

1
Dziwne jest jednak to, że jeśli to zrobisz i określisz paralelizm, powiedzmy, sc.textFile(multipleCommaSeparatedDirs,320)że prowadzi on do 19430całkowitej liczby zadań zamiast 320... zachowuje się tak, unionco prowadzi również do szalonej liczby zadań z bardzo niskiej równoległości
lisak

2
W końcu odkryłem, jak działa to dopasowanie wzorca plików złych stackoverflow.com/a/33917492/306488, więc nie muszę już ograniczać przecinkiem
lisak

@femibyte Nie sądzę, ale nie wiem, dlaczego chcesz znać nazwę pliku w każdej innej sytuacji niż dla wholeTextFiles. Jaki jest twój przypadek użycia? Mogę wymyślić obejście, pod warunkiem że używasz tej samej liczby partycji co pliki ...
samthebest,

35

Użyj unionw następujący sposób:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)

Następnie bigRddjest RDD ze wszystkimi plikami.


Dziękuję chmurze, w ten sposób mogę odczytać wszystkie pliki, które chcę, ale jeden! Ale wciąż muszę pisać wiele rzeczy ...
gsamaras

30

Możesz użyć pojedynczego wywołania textFile, aby odczytać wiele plików. Scala:

sc.textFile(','.join(files)) 

5
i identyczna składnia python
patricksurry

8
Myślę, że to tylko składnia python. Odpowiednikiem Scali byłbysc.textFile(files.mkString(","))
Davos,

9

Możesz tego użyć

Najpierw możesz zdobyć bufor / listę ścieżek S3:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

Teraz przekaż ten obiekt List do następującego fragmentu kodu, uwaga: sc jest obiektem SQLContext

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

Teraz masz ostateczną Unified RDD, tj. Df

Opcjonalnie, a także można podzielić na partycje w jednym BigRDD

val files = sc.textFile(filename, 1).repartition(1)

Ponowne partycjonowanie zawsze działa: D


Czy to nie znaczy, że lista plików musi być stosunkowo mała? Nie miliony plików.
Mathieu Longtin

2
Czy możemy zrównoleglić operację odczytu wymienionych plików? coś w rodzaju sc .parallelize?
lazywiz

1
@MathieuLongtin: Jeśli możesz zastosować wykrywanie partycji w kodzie Spark, świetnie będzie, jeśli będziesz musiał zrobić to samo. Kiedyś otwierałem pliki 10k w około minutę.
Murtaza Kanchwala,

@lazywiz Jeśli nie chcesz tworzyć jednego dysku, po prostu usuń akcję podziału.
Murtaza Kanchwala,

3

W PySpark znalazłem dodatkowy użyteczny sposób na parsowanie plików. Być może w Scali jest odpowiednik, ale nie czuję się wystarczająco dobrze wymyślając tłumaczenie robocze. W rzeczywistości jest to wywołanie textFile z dodanymi etykietami (w poniższym przykładzie klucz = nazwa pliku, wartość = 1 linia z pliku).

„Oznaczony” plik tekstowy

Wejście:

import glob
from pyspark import SparkContext
SparkContext.stop(sc)
sc = SparkContext("local","example") # if running locally
sqlContext = SQLContext(sc)

for filename in glob.glob(Data_File + "/*"):
    Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)

wyjście: tablica z każdym wpisem zawierającym krotkę przy użyciu nazwa-pliku-klucza i wartość = każdy wiersz pliku. (Technicznie, korzystając z tej metody, możesz również użyć innego klucza oprócz nazwy ścieżki do pliku - być może reprezentacji mieszającej, aby zaoszczędzić na pamięci). to znaczy.

[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
 ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),
  ...]

Możesz także połączyć ponownie jako listę linii:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
 ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]

Lub połącz ponownie całe pliki z powrotem w pojedyncze ciągi znaków (w tym przykładzie wynik jest taki sam, jak wynik z całych plików tekstowych, ale z ciągiem „file:” usuniętym ze ścieżki pliku).

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()


Kiedy uruchomiłem ten wiersz kodu - Spark_Full += sc.textFile(filename).keyBy(lambda x: filename) dostałem błąd tj TypeError: 'PipelinedRDD' object is not iterable. Rozumiem, że ta linia tworzy RDD, który jest niezmienny, więc zastanawiałem się, jak udało ci się dołączyć go do innej zmiennej?
KartikKannapur

3

możesz użyć

JavaRDD<String , String> records = sc.wholeTextFiles("path of your directory")

tutaj otrzymasz ścieżkę do pliku i zawartość tego pliku. dzięki czemu można wykonać dowolną akcję całego pliku w tym samym czasie, co oszczędza koszty ogólne


2

Wszystkie odpowiedzi są poprawne z sc.textFile

Zastanawiałem się, dlaczego nie wholeTextFilesNa przykład w tym przypadku ...

val minPartitions = 2
val path = "/pathtohdfs"
    sc.wholeTextFiles(path,minPartitions)
      .flatMap{case (path, text) 
    ...

jednym ograniczeniem jest to, że musimy ładować małe pliki, w przeciwnym razie wydajność będzie niska i może prowadzić do OOM.

Uwaga :

  • Pełny plik powinien pasować do pamięci
  • Dobre dla formatów plików, których NIE można podzielić według linii ... takich jak pliki XML

Dalsze informacje do odwiedzenia


lub po prostusc.wholeTextFiles(folder).flatMap...
Evhz

sc.wholeTextFiles („/ path / to / dir”)
Ram Ghadiyaram

1

Dostępne jest proste, czyste rozwiązanie. Użyj metody wholeTextFiles (). Spowoduje to pobranie katalogu i utworzenie pary klucz-wartość. Zwrócony RDD będzie parą RDD. Znajdź poniżej opis z dokumentacji Spark :

SparkContext.wholeTextFiles pozwala czytać katalog zawierający wiele małych plików tekstowych i zwraca każdy z nich jako pary (nazwa pliku, treść). Jest to w przeciwieństwie do textFile, który zwraca jeden rekord na linię w każdym pliku


-1

WYPRÓBUJ TEN interfejs używany do zapisywania DataFrame w zewnętrznych systemach pamięci masowej (np. Systemach plików, sklepach z kluczowymi wartościami itp.). Użyj DataFrame.write (), aby uzyskać do tego dostęp.

Nowości w wersji 1.4.

csv (ścieżka, tryb = Brak, kompresja = Brak, sep = Brak, cytat = Brak, escape = Brak, nagłówek = Brak, nullValue = Brak, escapeQuotes = Brak, quoteAll = Brak, dateFormat = Brak, timestampFormat = Brak) Zapisuje zawartość DataFrame w formacie CSV pod określoną ścieżką.

Parametry: ścieżka - ścieżka w dowolnym trybie systemu plików obsługiwanym przez Hadoop - określa zachowanie operacji składowania, gdy dane już istnieją.

append: Dołącz zawartość DataFrame do istniejących danych. nadpisz: nadpisz istniejące dane. ignore: cicho zignoruj ​​tę operację, jeśli dane już istnieją. błąd (przypadek domyślny): Zgłaszaj wyjątek, jeśli dane już istnieją. kompresja - kodek kompresji używany podczas zapisywania do pliku. Może to być jedna ze znanych krótkich nazw bez rozróżniania wielkości liter (none, bzip2, gzip, lz4, snappy i deflate). sep - ustawia pojedynczy znak jako separator dla każdego pola i wartości. Jeśli ustawiono Brak, używa wartości domyślnej,,. quote - ustawia pojedynczy znak używany do zmiany wartości cytowanych, w których separator może być częścią wartości. Jeśli ustawiony jest None, używa wartości domyślnej, „. Jeśli chcesz wyłączyć cytaty, musisz ustawić pusty ciąg. Escape - ustawia pojedynczy znak używany do zmiany znaczenia cytatów wewnątrz już cytowanej wartości. Jeśli None jest ustawiony , używa wartości domyślnej, \ escapeQuotes - Flaga wskazująca, czy wartości zawierające cudzysłowy zawsze powinny być ujęte w cudzysłowy. Jeśli ustawiono Brak, używa domyślnej wartości true, unikając wszystkich wartości zawierających znak cudzysłowu. quoteAll - Flaga wskazująca, czy wszystkie wartości powinny być zawsze ujęte w cudzysłów. Jeśli ustawiono Brak, używa wartości domyślnej false, a jedynie wartości specjalne zawierające znak cudzysłowu. nagłówek - zapisuje nazwy kolumn jako pierwszy wiersz. Jeśli ustawiono None, używa wartości domyślnej, false. nullValue - ustawia ciąg reprezentujący wartość zerową. Jeśli ustawiono Brak, używa wartości domyślnej, pusty ciąg. dateFormat - ustawia ciąg wskazujący format daty. Niestandardowe formaty dat są zgodne z formatami java.text.SimpleDateFormat. Dotyczy to typu daty. Jeśli ustawiono Brak, używa wartości domyślnej rrrr-MM-dd. timestampFormat - ustawia ciąg znaków wskazujący format znacznika czasu. Niestandardowe formaty dat są zgodne z formatami java.text.SimpleDateFormat. Dotyczy to typu znacznika czasu. Jeśli ustawiony jest None, używa wartości domyślnej rrrr-MM-dd'T'HH: mm: ss.SSSZZ.


Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.