Share via


API Hive Warehouse Connector 2.0 in Azure HDInsight

Questo articolo elenca tutte le API supportate da Hive Warehouse Connector 2.0. Tutti gli esempi illustrati sono come eseguire usando spark-shell e la sessione del connettore hive warehouse.

Come creare una sessione del connettore Hive warehouse:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Prerequisito

Completare la procedura di configurazione di Hive Warehouse Connector.

API supportate

  • Impostare il database:

    hive.setDatabase("<database-name>")
    
  • Elencare tutti i database:

    hive.showDatabases()
    
  • Elencare tutte le tabelle nel database corrente

    hive.showTables()
    
  • Descrivere una tabella

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Eliminare un database

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Eliminare una tabella nel database corrente

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Creazione di un database

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Creare una tabella nel database corrente

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    Il generatore per create-table supporta solo le operazioni seguenti:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Nota

    Questa API crea una tabella formattata ORC nel percorso predefinito. Per altre funzionalità/opzioni o per creare una tabella usando query Hive, usare l'API executeUpdate .

  • Leggere una tabella

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Eseguire comandi DDL in HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw excpetion in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Eseguire query Hive e caricare i risultati nel set di dati

    • Esecuzione di query tramite daemon LLAP. [Consigliato]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Esecuzione di query tramite HiveServer2 tramite JDBC.

      Impostare su spark.datasource.hive.warehouse.smartExecutionfalse in configurazioni Spark prima di avviare la sessione Spark per usare questa API

      hive.execute("<hive-query>")
      
  • Chiudere la sessione del connettore Hive Warehouse

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Eseguire una query hive merge

    Questa API crea una query di merge Hive nel formato

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    Builder supporta le operazioni seguenti:

    mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Scrivere un set di dati in una tabella Hive in batch

    df.write.format("com.microsoft.hwc.v2")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName deve essere di tipo form <db>.<table> o <table>. Se non viene specificato alcun nome di database, verrà eseguita la ricerca/creazione della tabella nel database corrente

    • I tipi SaveMode sono:

      • Accoda: aggiunge il set di dati alla tabella specificata

      • Sovrascrittura: sovrascrive i dati nella tabella specificata con il set di dati

      • Ignora: ignora la scrittura se la tabella esiste già, non viene generato alcun errore

      • ErrorIfExists: genera un errore se la tabella esiste già

  • Scrivere un set di dati in una tabella Hive usando HiveStreaming

    df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Nota

    Stream scrive sempre i dati di accodamento.

  • Scrittura di un flusso spark in una tabella Hive

    stream.writeStream
        .format("com.microsoft.hwc.v2")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()
    

Passaggi successivi