Delen via


Hive Warehouse Connector 2.0 API's in Azure HDInsight

In dit artikel worden alle API's vermeld die worden ondersteund door Hive Warehouse Connector 2.0. Alle voorbeelden die worden weergegeven, laten zien hoe u een spark-shell- en Hive Warehouse-connectorsessie gebruikt.

Een Hive-warehouseconnectorsessie maken:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Vereiste

Voer de installatiestappen voor Hive Warehouse Connector uit .

Ondersteunde API's

  • Stel de database in:

    hive.setDatabase("<database-name>")
    
  • Alle databases weergeven:

    hive.showDatabases()
    
  • Alle tabellen in de huidige database weergeven

    hive.showTables()
    
  • Een tabel beschrijven

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Een database verwijderen

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Een tabel in de huidige database neerzetten

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Een database maken

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Een tabel maken in de huidige database

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    Builder voor create-table ondersteunt alleen de onderstaande bewerkingen:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Notitie

    Deze API maakt een tabel met ORC-indeling op de standaardlocatie. Gebruik API voor andere functies/opties of om een tabel te maken met hive-query's executeUpdate .

  • Een tabel lezen

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • DDL-opdrachten uitvoeren op HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw excpetion in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Hive-query uitvoeren en resultaat laden in gegevensset

    • Query uitvoeren via LLAP-daemons. [Aanbevolen]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Query uitvoeren via HiveServer2 via JDBC.

      Instellen spark.datasource.hive.warehouse.smartExecution op false in Spark-configuraties voordat de Spark-sessie wordt gestart om deze API te gebruiken

      hive.execute("<hive-query>")
      
  • Hive-warehouseconnectorsessie sluiten

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Hive-samenvoegquery uitvoeren

    Met deze API maakt u een Hive-samenvoegquery in de indeling

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    Builder ondersteunt de volgende bewerkingen:

    mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Een gegevensset in batch naar hive-tabel schrijven

    df.write.format("com.microsoft.hwc.v2")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName moet de vorm <db>.<table> of <table>hebben. Als er geen databasenaam wordt opgegeven, wordt de tabel doorzocht/gemaakt in de huidige database

    • SaveMode-typen zijn:

      • Toevoegen: voegt de gegevensset toe aan de opgegeven tabel

      • Overschrijven: de gegevens in de opgegeven tabel worden overschreven met een gegevensset

      • Negeren: schrijfbewerkingen overslaan als de tabel al bestaat, er is geen fout opgetreden

      • ErrorIfExists: genereert een fout als de tabel al bestaat

  • Een gegevensset schrijven naar Hive Table met behulp van HiveStreaming

    df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Notitie

    Stream-schrijfbewerkingen voegen altijd gegevens toe.

  • Een spark-stroom schrijven naar een Hive-tabel

    stream.writeStream
        .format("com.microsoft.hwc.v2")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()
    

Volgende stappen