Jak nadpisać katalog wyjściowy w Spark


108

Mam aplikację do przesyłania strumieniowego iskier, która tworzy zestaw danych dla każdej minuty. Potrzebuję zapisać / nadpisać wyniki przetwarzanych danych.

Kiedy próbowałem nadpisać zbiór danych org.apache.hadoop.mapred.FileAlreadyExistsException zatrzymuje wykonanie.

Ustawiłem właściwość Spark set("spark.files.overwrite","true"), ale nie mam szczęścia.

Jak nadpisać lub wstępnie usunąć pliki ze Spark?


1
Tak, to jest do bani, prawda, uważam to za regresję do 0.9.0. Proszę przyjąć moją odpowiedź :)
samthebest

set("spark.files.overwrite","true")działa tylko dla plików dodanych przeztspark.addFile()
aiman

Odpowiedzi:


107

UPDATE: Zaproponuj użycie Dataframesplus coś takiego jak ... .write.mode(SaveMode.Overwrite) ....

Poręczny alfons:

implicit class PimpedStringRDD(rdd: RDD[String]) {
    def write(p: String)(implicit ss: SparkSession): Unit = {
      import ss.implicits._
      rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
    }
  }

W przypadku starszych wersji spróbuj

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

W wersji 1.1.0 możesz ustawić ustawienia conf za pomocą skryptu spark-submit z flagą --conf.

OSTRZEŻENIE (starsze wersje): Według @piggybox w Sparku jest błąd polegający na tym, że nadpisuje tylko pliki, które musi zapisać part-, wszystkie inne pliki pozostaną nieusunięte.


30
Za Spark 1.4:df.write.mode(SaveMode.Overwrite).parquet(path)
Ha Pham

W przypadku Spark SQL masz opcje definiowania SaveMode dla Core Spark, których nie masz. Naprawdę chciałbym mieć taką funkcję do saveAsTextFile i innych przekształceń
Murtaza Kanchwala

3
Ukryty problem: w porównaniu do rozwiązania @ pzecevic polegającego na wymazaniu całego folderu przez HDFS, w tym podejściu Spark nadpisze tylko pliki części o tej samej nazwie w folderze wyjściowym. Działa to przez większość czasu, ale jeśli w folderze znajduje się coś innego, na przykład dodatkowe pliki części z innego zadania Spark / Hadoop, nie spowoduje to zastąpienia tych plików.
piggybox

6
Możesz także użyć df.write.mode(mode: String).parquet(path)Where mode: String może być: "overwrite", "append", "ignore", "error".
żyto

1
@avocado Tak myślę, interfejsy Spark API stają się coraz gorsze z każdym wydaniem: P
samthebest


27

Dokumentacja parametru spark.files.overwritemówi tak: „Czy nadpisywać pliki dodane za pośrednictwem, SparkContext.addFile()gdy plik docelowy istnieje, a jego zawartość nie jest zgodna z zawartością źródła”. Nie ma więc żadnego wpływu na metodę saveAsTextFiles.

Możesz to zrobić przed zapisaniem pliku:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Jak wyjaśniono tutaj: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html


29
a co z pysparkiem?
javadba

Następną odpowiedzią na użycie opcji „write.mode (SaveMode.Overwrite)” jest droga do zrobienia
YaOg

hdfs może usuwać nowe pliki w miarę ich pojawiania się, ponieważ nadal usuwa stare.
Jake

25

Z dokumentacji pyspark.sql.DataFrame.save (obecnie 1.3.1) można określić mode='overwrite'podczas zapisywania DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

Sprawdziłem, że spowoduje to nawet usunięcie pozostałych plików partycji. Więc jeśli pierwotnie powiedziałeś 10 partycji / plików, ale potem nadpisałeś folder ramką DataFrame, która miała tylko 6 partycji, wynikowy folder będzie miał 6 partycji / plików.

Zobacz dokumentację Spark SQL, aby uzyskać więcej informacji na temat opcji trybu.


2
Prawdziwe i pomocne, dzięki, ale rozwiązanie specyficzne dla DataFrame - spark.hadoop.validateOutputSpecsbędzie działać we wszystkich interfejsach API Spark.
samthebest

Z jakiegoś powodu spark.hadoop.validateOutputSpecsnie działało dla mnie na 1.3, ale tak.
Eric Walker

1
@samthebest Dzięki save(... , mode=trasie możesz nadpisać jeden zestaw plików, dołączyć inny itp. w tym samym kontekście Spark. Czy nie spark.hadoop.validateOutputSpecsograniczyłbyś się tylko do jednego trybu na kontekst?
dnlbrky

1
@dnlbrky OP nie zażądał dołączenia. Jak powiedziałem, prawda, przydatna, ale niepotrzebna. Gdyby PO zapytał „jak mam dołączyć”, można by udzielić szeregu odpowiedzi. Ale nie wchodźmy w to. Radzę również rozważyć użycie wersji Scala DataFrames, ponieważ ma ona bezpieczeństwo typów i więcej sprawdzania - na przykład jeśli miałeś literówkę w „nadpisaniu”, nie dowiedziałbyś się, dopóki DAG nie zostanie oceniony - co w zadaniu Big Data mogłoby być 2 godziny później !! Jeśli używasz wersji Scala, kompilator sprawdzi wszystko z góry! Całkiem fajne i bardzo ważne dla Big Data.
samthebest

15

df.write.mode('overwrite').parquet("/output/folder/path")działa, jeśli chcesz nadpisać plik parkietu za pomocą Pythona. To jest iskra 1.6.2. API może się różnić w późniejszych wersjach


Tak, to działa świetnie dla moich wymagań (Databricks)
Nick.McDermaid

4
  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)

Tylko dla Spark 1, w najnowszej wersjidf.write.mode(SaveMode.Overwrite)
ChikuMiku

3

Ta przeciążona wersja funkcji zapisywania działa dla mnie:

yourDF.save (outputPath, org.apache.spark.sql.SaveMode.valueOf ("Overwrite"))

Powyższy przykład nadpisze istniejący folder. Savemode może również przyjąć te parametry ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Dołącz : tryb dołączania oznacza, że ​​podczas zapisywania ramki DataFrame do źródła danych, jeśli dane / tabela już istnieją, zawartość ramki DataFrame powinna zostać dołączona do istniejących danych.

ErrorIfExists : tryb ErrorIfExists oznacza, że ​​podczas zapisywania DataFrame w źródle danych, jeśli dane już istnieją, oczekuje się zgłoszenia wyjątku.

Ignoruj : tryb Ignoruj ​​oznacza, że ​​podczas zapisywania DataFrame w źródle danych, jeśli dane już istnieją, operacja składowania nie zapisuje zawartości DataFrame i nie zmienia istniejących danych.


1

Jeśli chcesz użyć własnego niestandardowego formatu wyjściowego, będziesz w stanie uzyskać pożądane zachowanie również z RDD.

Przyjrzyj się następującym klasom: FileOutputFormat , FileOutputCommitter

W formacie pliku wyjściowego masz metodę o nazwie checkOutputSpecs, która sprawdza, czy katalog wyjściowy istnieje. W FileOutputCommitter masz commitJob, który zwykle przesyła dane z katalogu tymczasowego do ostatecznego miejsca.

Nie byłem jeszcze w stanie tego zweryfikować (zrobiłbym to, gdy tylko mam kilka wolnych minut), ale teoretycznie: Jeśli rozszerzę FileOutputFormat i zastąpię checkOutputSpecs na metodę, która nie zgłasza wyjątku w katalogu, który już istnieje i dostosuję commitJob mojego niestandardowego committera wyjściowego, aby wykonać dowolną logikę, którą chcę (np. zastąpić niektóre pliki, dołączyć inne), niż mogę osiągnąć pożądane zachowanie również z RDD.

Format wyjściowy jest przekazywany do: saveAsNewAPIHadoopFile (jest to metoda wywołana również przez saveAsTextFile, aby faktycznie zapisać pliki). Committer wyjścia jest konfigurowany na poziomie aplikacji.


Unikałbym zbliżania się do podklasy FileOutputCommitter, jeśli możesz temu pomóc: to przerażający fragment kodu. Hadoop 3.0 dodaje punkt wtyczki, w którym FileOutputFormat może przyjmować różne implementacje refaktoryzowanej superklasy (PathOutputCommitter). S3 z Netflix będzie zapisywać w miejscu w drzewie podzielonym na partycje,
rozwiązując
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.