Używam https://github.com/databricks/spark-csv , próbuję napisać pojedynczy plik CSV, ale nie mogę, tworzy folder.
Potrzebujesz funkcji Scala, która pobierze parametry takie jak ścieżka i nazwa pliku i zapisze ten plik CSV.
Używam https://github.com/databricks/spark-csv , próbuję napisać pojedynczy plik CSV, ale nie mogę, tworzy folder.
Potrzebujesz funkcji Scala, która pobierze parametry takie jak ścieżka i nazwa pliku i zapisze ten plik CSV.
Odpowiedzi:
Tworzy folder z wieloma plikami, ponieważ każda partycja jest zapisywana indywidualnie. Jeśli potrzebujesz pojedynczego pliku wyjściowego (nadal w folderze), możesz repartition
(preferowane, jeśli dane wyjściowe są duże, ale wymagają przetasowania):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
lub coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
ramka danych przed zapisaniem:
Wszystkie dane zostaną zapisane mydata.csv/part-00000
. Zanim skorzystasz z tej opcji, upewnij się, że rozumiesz, co się dzieje i jaki jest koszt przesłania wszystkich danych do jednego pracownika . Jeśli używasz rozproszonego systemu plików z replikacją, dane będą przesyłane wiele razy - najpierw pobierane do jednego pracownika, a następnie rozprowadzane w węzłach magazynowania.
Alternatywnie można zostawić swój kod, jak to jest i używać narzędzi ogólnego przeznaczenia jak cat
i HDFSgetmerge
po prostu połączyć wszystkie części później.
.coalesce(1)
, że jakiś wyjątek FileNotFoundException w katalogu _temporary. Wciąż jest to błąd w Spark: Issues.apache.org/jira/browse/SPARK-2984
coalesce(1)
ponieważ jest bardzo drogi i zwykle niepraktyczny.
Jeśli używasz Sparka z HDFS, rozwiązałem problem, pisząc pliki csv normalnie i wykorzystując HDFS do scalania. Robię to bezpośrednio w Spark (1.6):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
val newData = << create your dataframe >>
val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName
newData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.mode("overwrite")
.save(outputFileName)
merge(mergeFindGlob, mergedFileName )
newData.unpersist()
Nie pamiętam, gdzie nauczyłem się tej sztuczki, ale może ci się to udać.
Mogę się trochę spóźnić do gry tutaj, ale używając coalesce(1)
lub repartition(1)
może działać dla małych zestawów danych, ale duże zestawy danych byłyby wrzucane do jednej partycji w jednym węźle. Może to spowodować błędy OOM lub w najlepszym przypadku powolne przetwarzanie.
Zdecydowanie sugerowałbym użycie FileUtil.copyMerge()
funkcji z interfejsu API Hadoop. Spowoduje to scalenie wyników w jeden plik.
EDYCJA - efektywnie przenosi dane do sterownika, a nie do węzła wykonawczego. Coalesce()
byłoby dobrze, gdyby pojedynczy wykonawca miał więcej pamięci RAM do wykorzystania niż sterownik.
EDYCJA 2 : copyMerge()
jest usuwana w Hadoop 3.0. Zobacz następujący artykuł o przepełnieniu stosu, aby uzyskać więcej informacji na temat pracy z najnowszą wersją: Jak wykonać CopyMerge w Hadoop 3.0?
Jeśli korzystasz z Databricks i możesz zmieścić wszystkie dane w pamięci RAM jednego pracownika (a tym samym używać .coalesce(1)
), możesz użyć dbfs, aby znaleźć i przenieść wynikowy plik CSV:
val fileprefix= "/mnt/aws/path/file-prefix"
dataset
.coalesce(1)
.write
//.mode("overwrite") // I usually don't use this, but you may want to.
.option("header", "true")
.option("delimiter","\t")
.csv(fileprefix+".tmp")
val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
.filter(file=>file.name.endsWith(".csv"))(0).path
dbutils.fs.cp(partition_path,fileprefix+".tab")
dbutils.fs.rm(fileprefix+".tmp",recurse=true)
Jeśli twój plik nie mieści się w pamięci RAM w pliku roboczym , możesz rozważyć sugestię chaotic3quilibrium, aby użyć FileUtils.copyMerge () . Nie zrobiłem tego i jeszcze nie wiem, czy jest to możliwe, czy nie, np. Na S3.
Ta odpowiedź jest oparta na poprzednich odpowiedziach na to pytanie, a także na moich własnych testach dostarczonego fragmentu kodu. Pierwotnie wysłałem go do Databricks i ponownie publikuję tutaj.
Najlepsza dokumentacja dotycząca opcji rekurencyjnej rm dbfs, jaką znalazłem, znajduje się na forum Databricks .
Rozwiązanie, które działa dla S3 zmodyfikowanego przez Minkymorgan.
Po prostu podaj tymczasową ścieżkę katalogu z partycjami (z inną nazwą niż ścieżka końcowa) jako srcPath
ostateczny plik csv / txt jako destPath
Określ również, deleteSource
jeśli chcesz usunąć oryginalny katalog.
/**
* Merges multiple partitions of spark text file output into single file.
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit = {
import org.apache.hadoop.fs.FileUtil
import java.net.URI
val config = spark.sparkContext.hadoopConfiguration
val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
FileUtil.copyMerge(
fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
)
}
df.write()
Interfejs API iskry utworzy wiele plików części w podanej ścieżce ... aby wymusić zapisanie przez iskrę tylko jednego pliku części df.coalesce(1).write.csv(...)
zamiast df.repartition(1).write.csv(...)
łączenia jest wąską transformacją, podczas gdy repartition to szeroka transformacja, patrz Spark - repartition () vs coalesce ()
df.coalesce(1).write.csv(filepath,header=True)
utworzy folder w podanej ścieżce do jednego part-0001-...-c000.csv
pliku
cat filepath/part-0001-...-c000.csv > filename_you_want.csv
mieć przyjazną dla użytkownika nazwę pliku
df.toPandas().to_csv(path)
tego, aby zapisać pojedynczy plik csv z preferowaną nazwą pliku
repartycjonowanie / łączenie na 1 partycję przed zapisaniem (nadal można uzyskać folder, ale miałby w nim jeden plik części)
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._
Rozwiązałem używając poniższego podejścia (zmiana nazwy pliku hdfs): -
Krok 1: - (Crate Data Frame i zapis na HDFS)
df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")
Krok 2: - (Utwórz konfigurację Hadoop)
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
Krok 3: - (Uzyskaj ścieżkę w ścieżce folderu hdfs)
val pathFiles = new Path("/hdfsfolder/blah/")
Step4: - (Pobierz nazwy plików iskier z folderu hdfs)
val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)
setp5: - (utwórz listę mutowalną scala, aby zapisać wszystkie nazwy plików i dodać je do listy)
var fileNamesList = scala.collection.mutable.MutableList[String]()
while (fileNames.hasNext) {
fileNamesList += fileNames.next().getPath.getName
}
println(fileNamesList)
Krok 6: - (filtruj kolejność plików _SUCESS z listy nazw plików)
// get files name which are not _SUCCESS
val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")
krok 7: - (przekonwertuj listę scala na ciąg i dodaj żądaną nazwę pliku do ciągu folderu hdfs, a następnie zastosuj zmianę nazwy)
val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
hdfs.rename(partFileSourcePath , desiredCsvTargetPath)
Używam tego w Pythonie, aby uzyskać pojedynczy plik:
df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
Ta odpowiedź rozszerza zaakceptowaną odpowiedź, daje więcej kontekstu i zawiera fragmenty kodu, które można uruchomić w powłoce Spark na komputerze.
Więcej kontekstu na temat zaakceptowanej odpowiedzi
Zaakceptowana odpowiedź może sprawiać wrażenie, że przykładowy kod wysyła pojedynczy mydata.csv
plik, a tak nie jest. Pokażmy:
val df = Seq("one", "two", "three").toDF("num")
df
.repartition(1)
.write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")
Oto, co zostanie wyświetlone:
Documents/
tmp/
mydata.csv/
_SUCCESS
part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv
NB mydata.csv
to folder w zaakceptowanej odpowiedzi - to nie jest plik!
Jak wyprowadzić pojedynczy plik o określonej nazwie
Możemy użyć spark-daria do wypisania pojedynczego mydata.csv
pliku.
import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
df = df,
format = "csv",
sc = spark.sparkContext,
tmpFolder = sys.env("HOME") + "/Documents/better/staging",
filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)
Spowoduje to wyświetlenie pliku w następujący sposób:
Documents/
better/
mydata.csv
Ścieżki S3
DariaWriters.writeSingleFile
Aby użyć tej metody w S3, musisz przekazać ścieżki s3a :
DariaWriters.writeSingleFile(
df = df,
format = "csv",
sc = spark.sparkContext,
tmpFolder = "s3a://bucket/data/src",
filename = "s3a://bucket/data/dest/my_cool_file.csv"
)
Więcej informacji znajdziesz tutaj .
Unikanie copyMerge
copyMerge zostało usunięte z Hadoop 3. DariaWriters.writeSingleFile
Implementacja używa fs.rename
, jak opisano tutaj . Spark 3 nadal korzystał z Hadoop 2 , więc implementacje copyMerge będą działać w 2020 roku. Nie jestem pewien, kiedy Spark dokona aktualizacji do Hadoop 3, ale lepiej unikać podejścia copyMerge, które spowoduje uszkodzenie kodu, gdy Spark uaktualni Hadoop.
Kod źródłowy
Poszukaj DariaWriters
obiektu w kodzie źródłowym spark-daria, jeśli chcesz sprawdzić implementację.
Wdrożenie PySpark
Zapisanie pojedynczego pliku za pomocą PySpark jest łatwiejsze, ponieważ można przekonwertować DataFrame na Pandas DataFrame, która jest domyślnie zapisywana jako pojedynczy plik.
from pathlib import Path
home = str(Path.home())
data = [
("jellyfish", "JALYF"),
("li", "L"),
("luisa", "LAS"),
(None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)
Ograniczenia
DariaWriters.writeSingleFile
Podejście Scala i df.toPandas()
Python zbliżyć tylko pracę dla małych zbiorów danych. Ogromnych zbiorów danych nie można zapisać jako pojedynczych plików. Zapisywanie danych jako pojedynczego pliku nie jest optymalne z punktu widzenia wydajności, ponieważ danych nie można zapisywać równolegle.
wykorzystując Listbuffer możemy zapisać dane do jednego pliku:
import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
val text = spark.read.textFile("filepath")
var data = ListBuffer[String]()
for(line:String <- text.collect()){
data += line
}
val writer = new FileWriter("filepath")
data.foreach(line => writer.write(line.toString+"\n"))
writer.close()
Jest jeszcze jeden sposób korzystania z Javy
import java.io._
def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit)
{
val p = new java.io.PrintWriter(f);
try { op(p) }
finally { p.close() }
}
printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}