Napisz pojedynczy plik CSV za pomocą spark-csv


Odpowiedzi:


171

Tworzy folder z wieloma plikami, ponieważ każda partycja jest zapisywana indywidualnie. Jeśli potrzebujesz pojedynczego pliku wyjściowego (nadal w folderze), możesz repartition(preferowane, jeśli dane wyjściowe są duże, ale wymagają przetasowania):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

lub coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

ramka danych przed zapisaniem:

Wszystkie dane zostaną zapisane mydata.csv/part-00000. Zanim skorzystasz z tej opcji, upewnij się, że rozumiesz, co się dzieje i jaki jest koszt przesłania wszystkich danych do jednego pracownika . Jeśli używasz rozproszonego systemu plików z replikacją, dane będą przesyłane wiele razy - najpierw pobierane do jednego pracownika, a następnie rozprowadzane w węzłach magazynowania.

Alternatywnie można zostawić swój kod, jak to jest i używać narzędzi ogólnego przeznaczenia jak cati HDFSgetmerge po prostu połączyć wszystkie części później.


6
możesz też użyć coalesce: df.coalesce (1) .write.format ("com.databricks.spark.csv") .option ("header", "true") .save ("mydata.csv")
ravi

Spark 1.6 zgłasza błąd, gdy ustawimy .coalesce(1), że jakiś wyjątek FileNotFoundException w katalogu _temporary. Wciąż jest to błąd w Spark: Issues.apache.org/jira/browse/SPARK-2984
Harsha

@Harsha Unlikely. Raczej prosty wynik, coalesce(1)ponieważ jest bardzo drogi i zwykle niepraktyczny.
zero323

Uzgodniono @ zero323, ale jeśli masz specjalne wymagania dotyczące konsolidacji w jeden plik, powinno to być nadal możliwe, biorąc pod uwagę, że masz wystarczające zasoby i czas.
Harsha

2
@Harsha Nie mówię, że nie ma. Jeśli poprawnie dostroisz GC, powinno działać dobrze, ale jest to po prostu strata czasu i najprawdopodobniej zaszkodzi ogólnej wydajności. Osobiście nie widzę powodu, by się tym przejmować, zwłaszcza, że ​​łączenie plików poza Spark jest banalnie proste bez martwienia się o zużycie pamięci.
zero323

36

Jeśli używasz Sparka z HDFS, rozwiązałem problem, pisząc pliki csv normalnie i wykorzystując HDFS do scalania. Robię to bezpośrednio w Spark (1.6):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

Nie pamiętam, gdzie nauczyłem się tej sztuczki, ale może ci się to udać.


Nie próbowałem tego - i podejrzewam, że może to nie być proste.
Minkymorgan

1
Dzięki. Dodałem odpowiedź, która działa w Databricks
Josiah Yoder

@Minkymorgan Mam podobny problem, ale nie mogę tego zrobić poprawnie .. Czy możesz spojrzeć na to pytanie stackoverflow.com/questions/46812388/ ...
SUDARSHAN

4
@SUDARSHAN Powyższa funkcja działa z nieskompresowanymi danymi. W twoim przykładzie myślę, że używasz kompresji gzip podczas zapisywania plików - a potem później - próbując je połączyć, co kończy się niepowodzeniem. To nie zadziała, ponieważ nie można scalać plików gzip. Gzip nie jest algorytmem dzielonej kompresji, więc z pewnością nie jest „scalalny”. Możesz przetestować kompresję "zgryźliwą" lub "bz2" - ale wydaje się, że to też się nie powiedzie przy scalaniu. Prawdopodobnie najlepiej jest usunąć kompresję, scalić pliki raw, a następnie skompresować za pomocą dzielonego kodeka.
Minkymorgan

a co jeśli chcę zachować nagłówek? duplikuje się dla każdej części pliku
normalny

32

Mogę się trochę spóźnić do gry tutaj, ale używając coalesce(1)lub repartition(1)może działać dla małych zestawów danych, ale duże zestawy danych byłyby wrzucane do jednej partycji w jednym węźle. Może to spowodować błędy OOM lub w najlepszym przypadku powolne przetwarzanie.

Zdecydowanie sugerowałbym użycie FileUtil.copyMerge()funkcji z interfejsu API Hadoop. Spowoduje to scalenie wyników w jeden plik.

EDYCJA - efektywnie przenosi dane do sterownika, a nie do węzła wykonawczego. Coalesce()byłoby dobrze, gdyby pojedynczy wykonawca miał więcej pamięci RAM do wykorzystania niż sterownik.

EDYCJA 2 : copyMerge()jest usuwana w Hadoop 3.0. Zobacz następujący artykuł o przepełnieniu stosu, aby uzyskać więcej informacji na temat pracy z najnowszą wersją: Jak wykonać CopyMerge w Hadoop 3.0?


Jakieś przemyślenia na temat tego, jak w ten sposób uzyskać plik CSV z wierszem nagłówka? Nie chciałbym, aby plik tworzył nagłówek, ponieważ spowodowałoby to przeplatanie się nagłówków w całym pliku, po jednym dla każdej partycji.
nojo

Jest opcja, że użyłem w przeszłości udokumentowane: markhneedham.com/blog/2014/11/30/...
etspaceman

@etspaceman Cool. Nadal nie mam dobrego sposobu, aby to zrobić, niestety, ponieważ muszę to zrobić w Javie (lub Spark, ale w sposób, który nie zużywa dużo pamięci i może pracować z dużymi plikami) . Nadal nie mogę uwierzyć, że usunęli to wywołanie API ... jest to bardzo powszechne użycie, nawet jeśli nie jest dokładnie używane przez inne aplikacje w ekosystemie Hadoop.
woot

20

Jeśli korzystasz z Databricks i możesz zmieścić wszystkie dane w pamięci RAM jednego pracownika (a tym samym używać .coalesce(1)), możesz użyć dbfs, aby znaleźć i przenieść wynikowy plik CSV:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

Jeśli twój plik nie mieści się w pamięci RAM w pliku roboczym , możesz rozważyć sugestię chaotic3quilibrium, aby użyć FileUtils.copyMerge () . Nie zrobiłem tego i jeszcze nie wiem, czy jest to możliwe, czy nie, np. Na S3.

Ta odpowiedź jest oparta na poprzednich odpowiedziach na to pytanie, a także na moich własnych testach dostarczonego fragmentu kodu. Pierwotnie wysłałem go do Databricks i ponownie publikuję tutaj.

Najlepsza dokumentacja dotycząca opcji rekurencyjnej rm dbfs, jaką znalazłem, znajduje się na forum Databricks .


3

Rozwiązanie, które działa dla S3 zmodyfikowanego przez Minkymorgan.

Po prostu podaj tymczasową ścieżkę katalogu z partycjami (z inną nazwą niż ścieżka końcowa) jako srcPathostateczny plik csv / txt jako destPath Określ również, deleteSourcejeśli chcesz usunąć oryginalny katalog.

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}

Implementacja copyMerge wyświetla listę wszystkich plików i wykonuje po nich iteracje, nie jest to bezpieczne w s3. jeśli napiszesz swoje pliki, a następnie je wymienisz - nie gwarantuje to, że wszystkie z nich zostaną wymienione. zobacz [to | docs.aws.amazon.com/AmazonS3/latest/dev/…
LiranBo

3

df.write()Interfejs API iskry utworzy wiele plików części w podanej ścieżce ... aby wymusić zapisanie przez iskrę tylko jednego pliku części df.coalesce(1).write.csv(...)zamiast df.repartition(1).write.csv(...)łączenia jest wąską transformacją, podczas gdy repartition to szeroka transformacja, patrz Spark - repartition () vs coalesce ()

df.coalesce(1).write.csv(filepath,header=True) 

utworzy folder w podanej ścieżce do jednego part-0001-...-c000.csvpliku

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

mieć przyjazną dla użytkownika nazwę pliku


alternatywnie, jeśli ramka danych nie jest zbyt duża (~ GB lub mieści się w pamięci sterownika), możesz również użyć df.toPandas().to_csv(path)tego, aby zapisać pojedynczy plik csv z preferowaną nazwą pliku
pprasad009

2
Ugh, tak frustrujące, że można to zrobić tylko poprzez zamianę w pandy. Jak trudno jest po prostu napisać plik bez jakiegoś UUID?
ijoseph

2

repartycjonowanie / łączenie na 1 partycję przed zapisaniem (nadal można uzyskać folder, ale miałby w nim jeden plik części)


2

możesz użyć rdd.coalesce(1, true).saveAsTextFile(path)

będzie przechowywać dane jako pojedynczy plik w path / part-00000


1
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

Rozwiązałem używając poniższego podejścia (zmiana nazwy pliku hdfs): -

Krok 1: - (Crate Data Frame i zapis na HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

Krok 2: - (Utwórz konfigurację Hadoop)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Krok 3: - (Uzyskaj ścieżkę w ścieżce folderu hdfs)

val pathFiles = new Path("/hdfsfolder/blah/")

Step4: - (Pobierz nazwy plików iskier z folderu hdfs)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

setp5: - (utwórz listę mutowalną scala, aby zapisać wszystkie nazwy plików i dodać je do listy)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) {
      fileNamesList += fileNames.next().getPath.getName
    }
    println(fileNamesList)

Krok 6: - (filtruj kolejność plików _SUCESS z listy nazw plików)

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

krok 7: - (przekonwertuj listę scala na ciąg i dodaj żądaną nazwę pliku do ciągu folderu hdfs, a następnie zastosuj zmianę nazwy)

val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
    val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
    hdfs.rename(partFileSourcePath , desiredCsvTargetPath)

1

Używam tego w Pythonie, aby uzyskać pojedynczy plik:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)

1

Ta odpowiedź rozszerza zaakceptowaną odpowiedź, daje więcej kontekstu i zawiera fragmenty kodu, które można uruchomić w powłoce Spark na komputerze.

Więcej kontekstu na temat zaakceptowanej odpowiedzi

Zaakceptowana odpowiedź może sprawiać wrażenie, że przykładowy kod wysyła pojedynczy mydata.csvplik, a tak nie jest. Pokażmy:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

Oto, co zostanie wyświetlone:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

NB mydata.csvto folder w zaakceptowanej odpowiedzi - to nie jest plik!

Jak wyprowadzić pojedynczy plik o określonej nazwie

Możemy użyć spark-daria do wypisania pojedynczego mydata.csvpliku.

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

Spowoduje to wyświetlenie pliku w następujący sposób:

Documents/
  better/
    mydata.csv

Ścieżki S3

DariaWriters.writeSingleFileAby użyć tej metody w S3, musisz przekazać ścieżki s3a :

DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = "s3a://bucket/data/src",
    filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

Więcej informacji znajdziesz tutaj .

Unikanie copyMerge

copyMerge zostało usunięte z Hadoop 3. DariaWriters.writeSingleFileImplementacja używa fs.rename, jak opisano tutaj . Spark 3 nadal korzystał z Hadoop 2 , więc implementacje copyMerge będą działać w 2020 roku. Nie jestem pewien, kiedy Spark dokona aktualizacji do Hadoop 3, ale lepiej unikać podejścia copyMerge, które spowoduje uszkodzenie kodu, gdy Spark uaktualni Hadoop.

Kod źródłowy

Poszukaj DariaWritersobiektu w kodzie źródłowym spark-daria, jeśli chcesz sprawdzić implementację.

Wdrożenie PySpark

Zapisanie pojedynczego pliku za pomocą PySpark jest łatwiejsze, ponieważ można przekonwertować DataFrame na Pandas DataFrame, która jest domyślnie zapisywana jako pojedynczy plik.

from pathlib import Path
home = str(Path.home())
data = [
    ("jellyfish", "JALYF"),
    ("li", "L"),
    ("luisa", "LAS"),
    (None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

Ograniczenia

DariaWriters.writeSingleFilePodejście Scala i df.toPandas()Python zbliżyć tylko pracę dla małych zbiorów danych. Ogromnych zbiorów danych nie można zapisać jako pojedynczych plików. Zapisywanie danych jako pojedynczego pliku nie jest optymalne z punktu widzenia wydajności, ponieważ danych nie można zapisywać równolegle.


0

wykorzystując Listbuffer możemy zapisać dane do jednego pliku:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
    val text = spark.read.textFile("filepath")
    var data = ListBuffer[String]()
    for(line:String <- text.collect()){
      data += line
    }
    val writer = new FileWriter("filepath")
    data.foreach(line => writer.write(line.toString+"\n"))
    writer.close()

-2

Jest jeszcze jeden sposób korzystania z Javy

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}

nazwa „prawda” nie jest zdefiniowana
Arron
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.