Hive Warehouse Connector を使用して Apache Spark と Apache Hive を統合するIntegrate Apache Spark and Apache Hive with the Hive Warehouse Connector

Apache Hive Warehouse Connector (HWC) は、Apache Spark と Apache Hive をより簡単に連携できるライブラリです。Spark DataFrame と Hive テーブルの間のデータ移動などのタスクをサポートするほか、Spark ストリーミング データを Hive テーブルに転送します。The Apache Hive Warehouse Connector (HWC) is a library that allows you to work more easily with Apache Spark and Apache Hive by supporting tasks such as moving data between Spark DataFrames and Hive tables, and also directing Spark streaming data into Hive tables. Hive Warehouse Connector は、Spark と Hive の間で橋渡しのように動作します。Hive Warehouse Connector works like a bridge between Spark and Hive. Scala、Java、Python が開発用にサポートされます。It supports Scala, Java, and Python for development.

Hive Warehouse Connector を使用すると、Hive および Spark の独自の機能を活用して、強力なビッグデータ アプリケーションを構築できます。The Hive Warehouse Connector allows you to take advantage of the unique features of Hive and Spark to build powerful big-data applications. Apache Hive では、ACID (原子性、一貫性、分離性、持続性) なデータベース トランザクションがサポートされています。Apache Hive offers support for database transactions that are Atomic, Consistent, Isolated, and Durable (ACID). Hive における ACID およびトランザクションの詳細については、「Hive Transactions (Hive トランザクション)」を参照してください。For more information on ACID and transactions in Hive, see Hive Transactions. Hive には、Apache Ranger を通じた詳細なセキュリティ コントロールと Apache Spark では使用できない Low Latency Analytical Processing も備わっています。Hive also offers detailed security controls through Apache Ranger and Low Latency Analytical Processing not available in Apache Spark.

Apache Spark には、Apache Hive では使用できないストリーミング機能を提供する Structured Streaming API があります。Apache Spark, has a Structured Streaming API that gives streaming capabilities not available in Apache Hive. HDInsight 4.0 以降、Apache Spark 2.3.1 と Apache Hive 3.1.0 は異なる metastore になっており、相互運用性が困難になることがあります。Beginning with HDInsight 4.0, Apache Spark 2.3.1 and Apache Hive 3.1.0 have separate metastores, which can make interoperability difficult. Hive Warehouse Connector によって、Spark と Hive を一緒に使用することが容易になります。The Hive Warehouse Connector makes it easier to use Spark and Hive together. HWC ライブラリによって LLAP デーモンから Spark Executor にデータが並列で読み込まれるため、Spark から Hive への標準的な JDBC 接続を使用するよりも効率的でスケーラブルになります。The HWC library loads data from LLAP daemons to Spark executors in parallel, making it more efficient and scalable than using a standard JDBC connection from Spark to Hive.

Hive Warehouse Connector のアーキテクチャ

Hive Warehouse Connector でサポートされる操作の一部を次に示します。Some of the operations supported by the Hive Warehouse Connector are:

  • テーブルを記述するDescribing a table
  • ORC 形式データ用のテーブルを作成するCreating a table for ORC-formatted data
  • Hive データを選択して DataFrame を取得するSelecting Hive data and retrieving a DataFrame
  • DataFrame を Hive にバッチで書き込むWriting a DataFrame to Hive in batch
  • Hive の更新ステートメントを実行するExecuting a Hive update statement
  • Hive からテーブル データを読み取り、Spark で変換し、新しい Hive テーブルに書き込むReading table data from Hive, transforming it in Spark, and writing it to a new Hive table
  • Hive ストリーミングを使用して DataFrame または Spark ストリームを Hive に書き込むWriting a DataFrame or Spark stream to Hive using HiveStreaming

Hive Warehouse Connector の設定Hive Warehouse Connector setup

次の手順に従って、Azure HDInsight の Spark および対話型クエリ クラスターの間で Hive Warehouse Connector を設定します。Follow these steps to set up the Hive Warehouse Connector between a Spark and Interactive Query cluster in Azure HDInsight:

クラスターの作成Create clusters

  1. ストレージ アカウントとカスタム Azure 仮想ネットワークを使って HDInsight Spark 4.0 クラスターを作成します。Create an HDInsight Spark 4.0 cluster with a storage account and a custom Azure virtual network. Azure 仮想ネットワークでのクラスターの作成については、「既存の仮想ネットワークへの HDInsight の追加」を参照してください。For information on creating a cluster in an Azure virtual network, see Add HDInsight to an existing virtual network.

  2. Spark クラスターと同じストレージ アカウントと Azure 仮想ネットワークを使って HDInsight 対話型クエリ (LLAP) 4.0 クラスターを作成します。Create an HDInsight Interactive Query (LLAP) 4.0 cluster with the same storage account and Azure virtual network as the Spark cluster.

hosts ファイルの変更Modify hosts file

自分の対話型クエリ クラスターの headnode0 にある /etc/hosts ファイルのノード情報をコピーし、自分の Spark クラスターの headnode0 にある /etc/hosts ファイルに情報を連結します。Copy the node information from the /etc/hosts file on headnode0 of your Interactive Query cluster and concatenate the information to the /etc/hosts file on the headnode0 of your Spark cluster. この手順によって、自分の Spark クラスターが対話型クエリ クラスター内のノードの IP アドレスを解決できるようになります。This step will allow your Spark cluster to resolve IP addresses of the nodes in Interactive Query cluster. cat /etc/hosts を使用して、更新されたファイルの内容を表示します。View the contents of the updated file with cat /etc/hosts. 最終的な出力は、次のスクリーンショットに示されている内容のようになります。The final output should look something like what is shown in the screenshot below.

Hive Warehouse Connector の hosts ファイル

準備情報の収集Gather preliminary information

対話型クエリ クラスターからFrom your Interactive Query cluster

  1. https://LLAPCLUSTERNAME.azurehdinsight.net を使用してクラスターの Apache Ambari ホームページに移動します。LLAPCLUSTERNAME は、対話型クエリ クラスターの名前です。Navigate to the cluster's Apache Ambari home page using https://LLAPCLUSTERNAME.azurehdinsight.net where LLAPCLUSTERNAME is the name of your Interactive Query cluster.

  2. [Hive] > [CONFIGS] > [Advanced] > [Advanced hive-site] > [hive.zookeeper.quorum] に移動し、値を書き留めます。Navigate to Hive > CONFIGS > Advanced > Advanced hive-site > hive.zookeeper.quorum and note the value. 値は次のようになります。zk0-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:2181,zk1-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:2181,zk4-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:2181The value may be similar to: zk0-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:2181,zk1-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:2181,zk4-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:2181.

  3. [Hive] > [CONFIGS] > [Advanced] > [General] > [hive.metastore.uris] に移動し、値を書き留めます。Navigate to Hive > CONFIGS > Advanced > General > hive.metastore.uris and note the value. 値は次のようになります。thrift://hn0-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:9083,thrift://hn1-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:9083The value may be similar to: thrift://hn0-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:9083,thrift://hn1-iqgiro.rekufuk2y2cezcbowjkbwfnyvd.bx.internal.cloudapp.net:9083.

Apache Spark クラスターからFrom your Apache Spark cluster

  1. https://SPARKCLUSTERNAME.azurehdinsight.net を使用してクラスターの Apache Ambari ホームページに移動します。SPARKCLUSTERNAME は Apache Spark クラスターの名前です。Navigate to the cluster's Apache Ambari home page using https://SPARKCLUSTERNAME.azurehdinsight.net where SPARKCLUSTERNAME is the name of your Apache Spark cluster.

  2. [Hive] > [CONFIGS] > [Advanced] > [Advanced hive-interactive-site] > [hive.llap.daemon.service.hosts] に移動し、値を書き留めます。Navigate to Hive > CONFIGS > Advanced > Advanced hive-interactive-site > hive.llap.daemon.service.hosts and note the value. 値は次のようになります。@llap0The value may be similar to: @llap0.

Spark クラスター設定の構成Configure Spark cluster settings

Spark Ambari Web UI から、 [Spark2] > [CONFIGS] > [Custom spark2-defaults] に移動します。From your Spark Ambari web UI, navigate to Spark2 > CONFIGS > Custom spark2-defaults.

Apache Ambari Spark2 の構成

必要に応じて [Add Property](プロパティの追加) を選択し、次のものを追加または更新します。Select Add Property... as needed to add/update the following:

KeyKey Value
spark.hadoop.hive.llap.daemon.service.hosts hive.llap.daemon.service.hosts から先ほど取得した値。The value you obtained earlier from hive.llap.daemon.service.hosts.
spark.sql.hive.hiveserver2.jdbc.url jdbc:hive2://LLAPCLUSTERNAME.azurehdinsight.net:443/;user=admin;password=PWD;ssl=true;transportMode=http;httpPath=/hive2jdbc:hive2://LLAPCLUSTERNAME.azurehdinsight.net:443/;user=admin;password=PWD;ssl=true;transportMode=http;httpPath=/hive2. JDBC 接続文字列に設定します。これにより、対話型クエリ クラスター上の Hiveserver2 に接続されます。Set to the JDBC connection string, which connects to Hiveserver2 on the Interactive Query cluster. LLAPCLUSTERNAME を対話型クエリ クラスターの名前に置き換えます。REPLACE LLAPCLUSTERNAME with the name of your Interactive Query cluster. PWD を実際のパスワードで置き換えます。Replace PWD with the actual password.
spark.datasource.hive.warehouse.load.staging.dir wasbs://STORAGE_CONTAINER_NAME@STORAGE_ACCOUNT_NAME.blob.core.windows.net/tmpwasbs://STORAGE_CONTAINER_NAME@STORAGE_ACCOUNT_NAME.blob.core.windows.net/tmp. HDFS と互換性のある適切なステージング ディレクトリに設定します。Set to a suitable HDFS-compatible staging directory. 2 つの異なるクラスターがある場合、ステージング ディレクトリは、HiveServer2 がそこにアクセスできるよう、LLAP クラスターのストレージ アカウントのステージング ディレクトリにあるフォルダーである必要があります。If you have two different clusters, the staging directory should be a folder in the staging directory of the LLAP cluster’s storage account so that HiveServer2 has access to it. STORAGE_ACCOUNT_NAME をクラスターによって使用されているストレージ アカウントの名前に置き換え、STORAGE_CONTAINER_NAME をストレージ コンテナーの名前に置き換えます。Replace STORAGE_ACCOUNT_NAME with the name of the storage account being used by the cluster, and STORAGE_CONTAINER_NAME with the name of the storage container.
spark.datasource.hive.warehouse.metastoreUri hive.metastore.uris から先ほど取得した値。The value you obtained earlier from hive.metastore.uris.
spark.security.credentials.hiveserver2.enabled YARN クライアント デプロイ モードの場合は false に設定します。false for YARN client deploy mode.
spark.hadoop.hive.zookeeper.quorum hive.zookeeper.quorum から先ほど取得した値。The value you obtained earlier from hive.zookeeper.quorum.

変更を保存し、必要に応じてコンポーネントを再起動します。Save changes and restart components as needed.

Hive Warehouse Connector の使用Using the Hive Warehouse Connector

クエリの接続と実行Connecting and running queries

いくつかの異なる方法の中から選択し、Hive Warehouse Connector を使って自分の対話型クエリ クラスターに接続してクエリを実行できます。You can choose between a few different methods to connect to your Interactive Query cluster and execute queries using the Hive Warehouse Connector. サポートされている方法には次のツールがあります。Supported methods include the following tools:

この記事で提供されるすべての例は、spark-shell を通じて実行されます。All examples provided in this article will be executed through spark-shell.

spark-shell セッションを開始するには、次の手順を実行します。To start a spark-shell session, do the following steps:

  1. Apache Spark クラスターのヘッドノードに SSH 接続します。SSH into the headnode for your Apache Spark cluster. クラスターへの SSH 接続の詳細については、「SSH を使用して HDInsight (Apache Hadoop) に接続する」を参照してください。For more information about connecting to your cluster with SSH, see Connect to HDInsight (Apache Hadoop) using SSH.

  2. 次のコマンドを入力して、spark-shell を起動します。Enter the following command to start the spark shell:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.2.1-8.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    

    ウェルカム メッセージと scala> プロンプトが表示されます。ここで、コマンドを入力することができます。You will see a welcome message and a scala> prompt where you can enter commands.

  3. spark-shell の起動後、次のコマンドを使用して Hive Warehouse Connector インスタンスを開始できます。After starting the spark-shell, a Hive Warehouse Connector instance can be started using the following commands:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Enterprise セキュリティ パッケージ (ESP) クラスター上でのクエリの接続と実行Connecting and running queries on Enterprise Security Package (ESP) clusters

Enterprise セキュリティ パッケージ (ESP) を使用すると、Active Directory ベースの認証、マルチユーザーのサポート、ロールベースのアクセス制御など、エンタープライズレベルの機能を Azure HDInsight の Apache Hadoop クラスターで利用できます。The Enterprise Security Package (ESP) provides enterprise-grade capabilities like Active Directory-based authentication, multi-user support, and role-based access control for Apache Hadoop clusters in Azure HDInsight. ESP に関する詳細については、「HDInsight で Enterprise セキュリティ パッケージを使用する」を参照してください。For more information on ESP, see Use Enterprise Security Package in HDInsight.

  1. Apache Spark クラスターのヘッドノードに SSH 接続します。SSH into the headnode for your Apache Spark cluster. クラスターへの SSH 接続の詳細については、「SSH を使用して HDInsight (Apache Hadoop) に接続する」を参照してください。For more information about connecting to your cluster with SSH, see Connect to HDInsight (Apache Hadoop) using SSH.

  2. kinit」と入力して、ドメイン ユーザーでログインします。Type kinit and login with a domain user.

  3. 次に示すように、構成パラメーターの完全なリストを使用して、spark-shell を起動します。Start spark-shell with the full list of configuration parameters as shown below. 山かっこ内のすべてが大文字の値は全部、自分のクラスターに基づいて指定する必要があります。All of the values in all capital letters between angle brackets must be specified based on your cluster. 下のいずれかのパラメーターに入力する値を確認する必要がある場合は、「Hive Warehouse Connector の設定」のセクションを参照してください。If you need to find out the values to input for any of the parameters below, see the section on Hive Warehouse Connector setup.:

    spark-shell --master yarn \
    --jars /usr/hdp/3.0.1.0-183/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-183.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    --conf spark.hadoop.hive.llap.daemon.service.hosts='<LLAP_APP_NAME>'
    --conf spark.sql.hive.hiveserver2.jdbc.url='jdbc:hive2://<ZOOKEEPER_QUORUM>;serviceDiscoveryMode=zookeeper;zookeeperNamespace=hiveserver2-interactive'
    --conf spark.datasource.hive.warehouse.load.staging.dir='<STAGING_DIR>'
    --conf spark.datasource.hive.warehouse.metastoreUri='<METASTORE_URI>'
    --conf spark.hadoop.hive.zookeeper.quorum='<ZOOKEEPER_QUORUM>'
    

Hive クエリからの Spark DataFrame の作成Creating Spark DataFrames from Hive queries

HWC ライブラリを使用したすべてのクエリの結果は、DataFrame として返されます。The results of all queries using the HWC library are returned as a DataFrame. 次の例は、基本的なクエリを作成する方法を示しています。The following examples demonstrate how to create a basic query.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

クエリの結果は Spark DataFrame です。これは、MLIB や SparkSQL のような Spark ライブラリと共に使用できます。The results of the query are Spark DataFrames, which can be used with Spark libraries like MLIB and SparkSQL.

Spark DataFrame から Hive テーブルへの書き出しWriting out Spark DataFrames to Hive tables

Spark は、Hive によって管理される ACID テーブルへの書き込みをネイティブにサポートしていません。Spark doesn’t natively support writing to Hive’s managed ACID tables. しかし、HWC を使用することで、どの DataFrame も Hive テーブルに書き出すことができます。Using HWC, however, you can write out any DataFrame to a Hive table. 次の例では、この機能の動作を確認できます。You can see this functionality at work in the following example:

  1. 次のコマンドを使用し、sampletable_colorado というテーブルを作成してその列を指定します。Create a table called sampletable_colorado and specify its columns using the following command:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. テーブル hivesampletable をフィルターします (列 state = Colorado)。Filter the table hivesampletable where the column state equals Colorado. Hive テーブルのこのクエリは Spark DataFrame として返されます。This query of the Hive table is returned as a Spark DataFrame. その後 DataFrame は、write 関数を使用して Hive テーブル sampletable_colorado に保存されます。Then the DataFrame is saved in the Hive table sampletable_colorado using the write function.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR).option("table","sampletable_colorado").save()
    
  3. 次のコマンドを使用して結果を表示します。View the results with the following command:

    hive.table("sampletable_colorado").show()
    

    Hive Warehouse Connector の Hive テーブルの表示

構造化ストリーミングの書き込みStructured streaming writes

Hive Warehouse Connector を使用すると、Spark ストリーミングを使って Hive テーブルにデータを書き込むことができます。Using Hive Warehouse Connector, you can use Spark streaming to write data into Hive tables.

下の手順に従って、localhost ポート 9999 上の Spark ストリームから Hive テーブルにデータを取り込む Hive Warehouse Connector サンプルを作成します。Follow the steps below to create a Hive Warehouse Connector example that ingests data from a Spark stream on localhost port 9999 into a Hive table.

  1. クエリの接続と実行」の手順に従います。Follow the steps under Connecting and running queries.

  2. 次のコマンドを使用して、Spark ストリームを開始します。Begin the spark stream with the following command:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  3. 次の手順を実行して、作成した Spark ストリームのためのデータを生成します。Generate data for the Spark stream that you created, by doing the following steps:

    1. 同じ Spark クラスターで2番目の SSH セッションを開きます。Open a second SSH session on the same Spark cluster.
    2. コマンド プロンプトで「 nc -lk 9999」と入力します。At the command prompt, type nc -lk 9999. このコマンドでは、netcat ユーティリティを使用して、コマンド ラインから指定のポートにデータを送信します。This command uses the netcat utility to send data from the command line to the specified port.
  4. 最初の SSH セッションに戻り、ストリーミング データを保持する新しい Hive テーブルを作成します。Return to the first SSH session and create a new Hive table to hold the streaming data. spark-shell で、次のコマンドを入力します。At the spark-shell, enter the following command:

    hive.createTable("stream_table").column("value","string").create()
    
  5. その後、次のコマンドを使用して、新しく作成したテーブルにストリーミング データを書き込みます。Then write the streaming data to the newly created table using the following command:

    lines.filter("value = 'HiveSpark'").writeStream.format(HiveWarehouseSession.STREAM_TO_STREAM).option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    重要

    現在、metastoreUri および database オプションは、Apache Spark の既知の問題のため、手動で設定する必要があります。The metastoreUri and database options must currently be set manually due to a known issue in Apache Spark. この問題の詳細については、SPARK-25460 を参照してください。For more information about this issue, see SPARK-25460.

  6. 2番目の SSH セッションに戻り、次の値を入力します。Return to the second SSH session and enter the following values:

    foo
    HiveSpark
    bar
    
  7. 最初の SSH セッションに戻り、この簡単なアクティビティに注目します。Return to the first SSH session and note the brief activity. 次のコマンドを使用して、データを表示します。Use the following command to view the data:

    hive.table("stream_table").show()
    

2番目の SSH セッションで、Ctrl + C を使用して netcat を停止します。Use Ctrl + C to stop netcat on the second SSH session. 最初の SSH セッションで、:q を使用して spark-shell を終了します。Use :q to exit spark-shell on the first SSH session.

Spark ESP クラスター上のデータのセキュリティ保護Securing data on Spark ESP clusters

  1. 次のコマンドを入力して、一定のサンプル データが含まれるテーブル demo を作成します。Create a table demo with some sample data by entering the following commands:

    create table demo (name string);
    INSERT INTO demo VALUES ('HDinsight');
    INSERT INTO demo VALUES ('Microsoft');
    INSERT INTO demo VALUES ('InteractiveQuery');
    
  2. 次のコマンドを使用して、テーブルの内容を表示します。View the table's contents with the following command. ポリシーを適用する前、demo テーブルにはすべての列が表示されます。Before applying the policy, the demo table shows the full column.

    hive.executeQuery("SELECT * FROM demo").show()
    

    Ranger ポリシーを適用する前のデモ テーブル

  3. 列の最後の 4 文字だけが表示される列マスク ポリシーを適用します。Apply a column masking policy that only shows the last four characters of the column.

    1. https://CLUSTERNAME.azurehdinsight.net/ranger/ で Ranger 管理 UI に移動します。Go to the Ranger Admin UI at https://CLUSTERNAME.azurehdinsight.net/ranger/.

    2. [Hive] の下にある自分のクラスターの Hive サービスをクリックします。Click on the Hive service for your cluster under Hive. Ranger サービス マネージャーranger service manager

    3. [Masking](マスク) タブをクリックし、 [Add New Policy](新しいポリシーの追加) をクリックしますClick on the Masking tab and then Add New Policy

      Hive Warehouse Connector の Ranger Hive ポリシーの一覧

    a.a. 目的のポリシー名を入力します。Provide a desired policy name. 次のように選択します。データベース: default、Hive テーブル: demo、Hive 列: name、ユーザー: rsadmin2、アクセスの種類: select[Select Masking Option](マスク オプションの選択) メニュー: Partial mask: show last 4Select database: Default, Hive table: demo, Hive column: name, User: rsadmin2, Access Types: select, and Partial mask: show last 4 from the Select Masking Option menu. [追加] をクリックします。Click Add. ポリシーの作成create policy

  4. テーブルの内容をもう一度表示します。View the table's contents again. Ranger ポリシーの適用後は、列の最後の 4 文字だけを確認できます。After applying the ranger policy, we can see only the last four characters of the column.

    Ranger ポリシーを適用した後のデモ テーブル

次の手順Next steps