Operace Apache Sparku podporované službou Hive Warehouse Připojení or ve službě Azure HDInsight

Tento článek ukazuje operace založené na sparku podporované službou Hive Warehouse Připojení or (HWC). Všechny níže uvedené příklady se spustí prostřednictvím prostředí Apache Spark.

Požadavek

Dokončete kroky nastavení Připojení soustavu Hive Warehouse.

Začínáme

Pokud chcete spustit relaci spark-shellu, proveďte následující kroky:

  1. Pomocí příkazu ssh se připojte ke clusteru Apache Spark. Upravte následující příkaz nahrazením clusteru názvem clusteru a zadáním příkazu:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. V relaci ssh spusťte následující příkaz, který si poznamenejte hive-warehouse-connector-assembly verzi:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Upravte níže uvedený kód s hive-warehouse-connector-assembly verzí uvedenou výše. Pak spuštěním příkazu spusťte prostředí Spark:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Po spuštění spark-shellu můžete spustit instanci Připojení oru Hive Warehouse pomocí následujících příkazů:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Vytváření datových rámců Sparku pomocí dotazů Hive

Výsledky všech dotazů používajících knihovnu HWC se vrátí jako datový rámec. Následující příklady ukazují, jak vytvořit základní dotaz Hive.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Výsledky dotazu jsou datové rámce Sparku, které je možné použít s knihovnami Sparku, jako je MLIB a SparkSQL.

Zápis datových rámců Sparku do tabulek Hive

Spark nativně nepodporuje zápis do spravovaných tabulek ACID Hive. Pomocí HWC ale můžete do tabulky Hive zapisovat libovolný datový rámec. Tuto funkci si můžete prohlédnout v práci v následujícím příkladu:

  1. Vytvořte volanou sampletable_colorado tabulku a pomocí následujícího příkazu zadejte její sloupce:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Vyfiltrujte tabulku hivesampletable , ve které se sloupec state rovná Colorado. Tento dotaz Hive vrátí datový rámec Sparku a výsledek se uloží do tabulky sampletable_colorado Hive pomocí write funkce.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Výsledky zobrazíte pomocí následujícího příkazu:

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

Zápisy strukturovaného streamování

Pomocí služby Hive Warehouse Připojení or můžete pomocí streamování Sparku zapisovat data do tabulek Hive.

Důležité

Zápisy strukturovaného streamování se nepodporují v clusterech Spark 4.0 s podporou ESP.

Pomocí následujícího postupu ingestujte data ze streamu Sparku na portu localhost 9999 do tabulky Hive prostřednictvím. Hive Warehouse Připojení or.

  1. V otevřeném prostředí Spark spusťte stream Spark pomocí následujícího příkazu:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Pomocí následujících kroků vygenerujte data pro datový proud Spark, který jste vytvořili:

    1. Otevřete druhou relaci SSH ve stejném clusteru Spark.
    2. Do příkazového řádku zadejte nc -lk 9999. Tento příkaz používá netcat nástroj k odesílání dat z příkazového řádku do zadaného portu.
  3. Vraťte se do první relace SSH a vytvořte novou tabulku Hive pro uložení streamovaných dat. V prostředí spark-shell zadejte následující příkaz:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Potom pomocí následujícího příkazu zapište streamovaná data do nově vytvořené tabulky:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Důležité

    Možnosti metastoreUri musí database být v současné době nastavené ručně kvůli známému problému v Apache Sparku. Další informace o tomto problému najdete v tématu SPARK-25460.

  5. Vraťte se do druhé relace SSH a zadejte následující hodnoty:

    foo
    HiveSpark
    bar
    
  6. Vraťte se do první relace SSH a poznamenejte si krátkou aktivitu. K zobrazení dat použijte následující příkaz:

    hive.table("stream_table").show()
    

K zastavení netcat druhé relace SSH použijte kombinaci kláves Ctrl+C. Slouží :q k ukončení spark-shellu v první relaci SSH.

Další kroky