Czy można zapisać DataFrame
w Spark bezpośrednio w Hive?
Próbowałem przekonwertować DataFrame
do, Rdd
a następnie zapisać jako plik tekstowy, a następnie załadować w gałęzi. Ale zastanawiam się, czy mogę bezpośrednio zapisać dataframe
do ula
Odpowiedzi:
Możesz utworzyć tymczasową tabelę w pamięci i przechowywać ją w tabeli gałęzi przy użyciu sqlContext.
Powiedzmy, że twoja ramka danych to myDf. Możesz utworzyć jedną tabelę tymczasową za pomocą,
myDf.createOrReplaceTempView("mytempTable")
Następnie możesz użyć prostej instrukcji hive, aby utworzyć tabelę i zrzucić dane z tabeli tymczasowej.
sqlContext.sql("create table mytable as select * from mytempTable");
temporary
stół do hive
stołu? Robiąc show tables
to, zawiera tylko hive
tabele dla mojej spark 2.3.0
instalacji
Użyj DataFrameWriter.saveAsTable
. ( df.write.saveAsTable(...)
) Zobacz temat Spark SQL i DataFrame Guide .
df.write().saveAsTable(tableName)
również zapisanie danych strumieniowych do tabeli?
Nie widzę df.write.saveAsTable(...)
wycofanych w dokumentacji Spark 2.0. To zadziałało dla nas na Amazon EMR. Doskonale byliśmy w stanie wczytać dane z S3 do ramki danych, przetworzyć je, stworzyć tabelę z wyniku i odczytać ją za pomocą MicroStrategy. Odpowiedź Vinays też się sprawdziła.
musisz mieć / utworzyć HiveContext
import org.apache.spark.sql.hive.HiveContext;
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Następnie bezpośrednio zapisz ramkę danych lub wybierz kolumny do zapisania jako tabela gałęzi
df to dataframe
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
lub
df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
lub
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
SaveModes to Append / Ignore / Overwrite / ErrorIfExists
Dodałem tutaj definicję dla HiveContext z dokumentacji Spark,
Oprócz podstawowego SQLContext można również utworzyć HiveContext, który zapewnia nadzbiór funkcji zapewnianych przez podstawowy SQLContext. Dodatkowe funkcje obejmują możliwość pisania zapytań przy użyciu pełniejszego parsera HiveQL, dostępu do funkcji Hive UDF oraz możliwość odczytu danych z tabel Hive. Aby użyć HiveContext, nie musisz mieć istniejącej konfiguracji Hive, a wszystkie źródła danych dostępne dla SQLContext są nadal dostępne. HiveContext jest pakowany tylko osobno, aby uniknąć uwzględnienia wszystkich zależności Hive w domyślnej kompilacji Spark.
w Spark w wersji 1.6.2 użycie „dbName.tableName” powoduje wystąpienie tego błędu:
org.apache.spark.sql.AnalysisException: określenie nazwy bazy danych lub innych kwalifikatorów nie jest dozwolone w przypadku tabel tymczasowych. Jeśli nazwa tabeli zawiera kropki (.), Zacytuj ją za pomocą odwrotnych apostrofów () .`
df.write().mode...
należy zmienić nadf.write.mode...
Zapisywanie do Hive to tylko kwestia użycia write()
metody SQLContext:
df.write.saveAsTable(tableName)
From Spark 2.2: użyj DataSet zamiast DataFrame.
From Spark 2.2: use DataSet instead DataFrame.
Przepraszam, piszę późno na post, ale nie widzę zaakceptowanej odpowiedzi.
df.write().saveAsTable
wyrzuci AnalysisException
i nie jest kompatybilny ze stołem HIVE.
Przechowywanie DF, które df.write().format("hive")
powinno załatwić sprawę!
Jeśli jednak to nie zadziała, to kierując się wcześniejszymi komentarzami i odpowiedziami, jest to moim zdaniem najlepsze rozwiązanie (choć otwarte na sugestie).
Najlepszym podejściem jest jawne utworzenie tabeli HIVE (w tym tabeli PARTITIONED),
def createHiveTable: Unit ={
spark.sql("CREATE TABLE $hive_table_name($fields) " +
"PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}
zapisz DF jako tabelę tymczasową,
df.createOrReplaceTempView("$tempTableName")
i wstaw do tabeli PARTITIONED HIVE:
spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql("select * from default.$hive_table_name").show(1000,false)
Offcourse ostatnia kolumna w DF będzie KOLUMNA PODZIAŁU więc utworzyć tabelę odpowiednio ula!
Prosimy o komentarz, jeśli to działa! albo nie.
--AKTUALIZACJA--
df.write()
.partitionBy("$partition_column")
.format("hive")
.mode(SaveMode.append)
.saveAsTable($new_table_name_to_be_created_in_hive) //Table should not exist OR should be a PARTITIONED table in HIVE
Oto wersja PySpark do tworzenia tabeli Hive z pliku parkiet. Być może wygenerowano pliki Parquet przy użyciu wywnioskowanego schematu, a teraz chcesz wypchnąć definicję do metastore Hive. Możesz także przekazać definicję do systemu, takiego jak AWS Glue lub AWS Athena, a nie tylko do magazynu metastore Hive. Tutaj używam spark.sql do wypychania / tworzenia trwałej tabeli.
# Location where my parquet files are present.
df = spark.read.parquet("s3://my-location/data/")
cols = df.dtypes
buf = []
buf.append('CREATE EXTERNAL TABLE test123 (')
keyanddatatypes = df.dtypes
sizeof = len(df.dtypes)
print ("size----------",sizeof)
count=1;
for eachvalue in keyanddatatypes:
print count,sizeof,eachvalue
if count == sizeof:
total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
else:
total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
buf.append(total)
count = count + 1
buf.append(' )')
buf.append(' STORED as parquet ')
buf.append("LOCATION")
buf.append("'")
buf.append('s3://my-location/data/')
buf.append("'")
buf.append("'")
##partition by pt
tabledef = ''.join(buf)
print "---------print definition ---------"
print tabledef
## create a table using spark.sql. Assuming you are using spark 2.1+
spark.sql(tabledef);
W przypadku zewnętrznych tabel Hive używam tej funkcji w PySpark:
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
print("Saving result in {}.{}".format(database, table_name))
output_schema = "," \
.join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
.replace("StringType", "STRING") \
.replace("IntegerType", "INT") \
.replace("DateType", "DATE") \
.replace("LongType", "INT") \
.replace("TimestampType", "INT") \
.replace("BooleanType", "BOOLEAN") \
.replace("FloatType", "FLOAT")\
.replace("DoubleType","FLOAT")
output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)
sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))
query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
.format(database, table_name, output_schema, save_format, database, table_name)
sparkSession.sql(query)
dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
W moim przypadku to działa dobrze:
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("DatabaseName")
df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()
Gotowe!!
Możesz czytać Dane, podając jako „Pracownik”
hive.executeQuery("select * from Employee").show()
Aby uzyskać więcej informacji, użyj tego adresu URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html
Jeśli chcesz utworzyć tabelę gałęzi (która nie istnieje) z ramki danych (czasami nie udaje się jej utworzyć
DataFrameWriter.saveAsTable
).StructType.toDDL
pomoże w wyświetlaniu kolumn jako ciągu.
val df = ...
val schemaStr = df.schema.toDDL # This gives the columns
spark.sql(s"""create table hive_table ( ${schemaStr})""")
//Now write the dataframe to the table
df.write.saveAsTable("hive_table")
hive_table
zostanie utworzony w domyślnej przestrzeni, ponieważ nie udostępniliśmy żadnej bazy danych pod adresem spark.sql()
. stg.hive_table
może być używany do tworzenia hive_table
w stg
bazie danych.
Możesz użyć biblioteki Spark-Llap Hortonworks w ten sposób
import com.hortonworks.hwc.HiveWarehouseSession
df.write
.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
.mode("append")
.option("table", "myDatabase.myTable")
.save()