Hive Warehouse Connector 2.0 API's in Azure HDInsight
In dit artikel worden alle API's vermeld die worden ondersteund door Hive Warehouse Connector 2.0. Alle voorbeelden die worden weergegeven, laten zien hoe u een spark-shell- en Hive Warehouse-connectorsessie gebruikt.
Een Hive-warehouseconnectorsessie maken:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Vereiste
Voer de installatiestappen voor Hive Warehouse Connector uit .
Ondersteunde API's
Stel de database in:
hive.setDatabase("<database-name>")
Alle databases weergeven:
hive.showDatabases()
Alle tabellen in de huidige database weergeven
hive.showTables()
Een tabel beschrijven
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Een database verwijderen
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Een tabel in de huidige database neerzetten
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Een database maken
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Een tabel maken in de huidige database
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
Builder voor create-table ondersteunt alleen de onderstaande bewerkingen:
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Notitie
Deze API maakt een tabel met ORC-indeling op de standaardlocatie. Gebruik API voor andere functies/opties of om een tabel te maken met hive-query's
executeUpdate
.Een tabel lezen
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
DDL-opdrachten uitvoeren op HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw excpetion in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Hive-query uitvoeren en resultaat laden in gegevensset
Query uitvoeren via LLAP-daemons. [Aanbevolen]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Query uitvoeren via HiveServer2 via JDBC.
Instellen
spark.datasource.hive.warehouse.smartExecution
opfalse
in Spark-configuraties voordat de Spark-sessie wordt gestart om deze API te gebruikenhive.execute("<hive-query>")
Hive-warehouseconnectorsessie sluiten
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Hive-samenvoegquery uitvoeren
Met deze API maakt u een Hive-samenvoegquery in de indeling
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
Builder ondersteunt de volgende bewerkingen:
mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Een gegevensset in batch naar hive-tabel schrijven
df.write.format("com.microsoft.hwc.v2") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName moet de vorm
<db>.<table>
of<table>
hebben. Als er geen databasenaam wordt opgegeven, wordt de tabel doorzocht/gemaakt in de huidige databaseSaveMode-typen zijn:
Toevoegen: voegt de gegevensset toe aan de opgegeven tabel
Overschrijven: de gegevens in de opgegeven tabel worden overschreven met een gegevensset
Negeren: schrijfbewerkingen overslaan als de tabel al bestaat, er is geen fout opgetreden
ErrorIfExists: genereert een fout als de tabel al bestaat
Een gegevensset schrijven naar Hive Table met behulp van HiveStreaming
df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Notitie
Stream-schrijfbewerkingen voegen altijd gegevens toe.
Een spark-stroom schrijven naar een Hive-tabel
stream.writeStream .format("com.microsoft.hwc.v2") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()
Volgende stappen
Feedback
https://aka.ms/ContentUserFeedback.
Binnenkort beschikbaar: In de loop van 2024 zullen we GitHub-problemen geleidelijk uitfaseren als het feedbackmechanisme voor inhoud en deze vervangen door een nieuw feedbacksysteem. Zie voor meer informatie:Feedback verzenden en weergeven voor