Azure Virtual Network 経由で HDInsight 上の Apache Kafka に接続するConnect to Apache Kafka on HDInsight through an Azure Virtual Network

Azure Virtual Network 経由で HDInsight 上の Apache Kafka へ直接接続する方法について説明します。Learn how to directly connect to Apache Kafka on HDInsight through an Azure Virtual Network. このドキュメントでは、次の構成での Kafka への接続の詳細を示します。This document provides information on connecting to Kafka using the following configurations:

  • オンプレミス ネットワーク内のリソースからの接続。From resources in an on-premises network. この接続は、ローカル ネットワーク上の VPN デバイス (ソフトウェアまたはハードウェア) を使用して確立します。This connection is established by using a VPN device (software or hardware) on your local network.
  • VPN ソフトウェア クライアントを使用した開発環境からの接続。From a development environment using a VPN software client.

注意

この記事は、新しい Azure PowerShell Az モジュールを使用するために更新されました。This article has been updated to use the new Azure PowerShell Az module. AzureRM モジュールはまだ使用でき、少なくとも 2020 年 12 月までは引き続きバグ修正が行われます。You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. Az モジュールと AzureRM の互換性の詳細については、「Introducing the new Azure PowerShell Az module (新しい Azure PowerShell Az モジュールの概要)」を参照してください。To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. Az モジュールのインストール手順については、Azure PowerShell のインストールを参照してください。For Az module installation instructions, see Install Azure PowerShell.

アーキテクチャと計画Architecture and planning

HDInsight では、パブリック インターネット経由で Kafka に直接接続することはできません。HDInsight does not allow direct connection to Kafka over the public internet. 代わりに、Kafka クライアント (プロデューサーおよびコンシューマー) で次の接続方法のいずれかを使用する必要があります。Instead, Kafka clients (producers and consumers) must use one of the following connection methods:

  • HDInsight 上で Kafka と同じ仮想ネットワーク内でクライアントを実行する。Run the client in the same virtual network as Kafka on HDInsight. この構成は、「HDInsight での Apache Kafka の開始」で使用されているものです。This configuration is used in the Start with Apache Kafka on HDInsight document. クライアントは、HDInsight のクラスター上、または同じネットワーク内の別の仮想マシン上で直接実行します。The client runs directly on the HDInsight cluster nodes or on another virtual machine in the same network.

  • オンプレミス ネットワークなどのプライベート ネットワークを仮想ネットワークに接続する。Connect a private network, such as your on-premises network, to the virtual network. この構成では、オンプレミス ネットワーク内のクライアントから直接 Kafka を操作することができます。This configuration allows clients in your on-premises network to directly work with Kafka. この構成を有効にするには、次のタスクを実行します。To enable this configuration, perform the following tasks:

    1. 仮想ネットワークを作成します。Create a virtual network.

    2. サイト間構成を使用する VPN ゲートウェイを作成します。Create a VPN gateway that uses a site-to-site configuration. このドキュメントで使用する構成では、オンプレミス ネットワーク内の VPN ゲートウェイ デバイスへ接続します。The configuration used in this document connects to a VPN gateway device in your on-premises network.

    3. 仮想ネットワーク内に DNS サーバーを作成します。Create a DNS server in the virtual network.

    4. 各ネットワークの DNS サーバー間の転送を構成します。Configure forwarding between the DNS server in each network.

    5. 仮想ネットワーク内の HDInsight クラスターに Kafka を作成します。Create a Kafka on HDInsight cluster in the virtual network.

      詳細については、「オンプレミス ネットワークから Apache Kafka に接続する」セクションを参照してください。For more information, see the Connect to Apache Kafka from an on-premises network section.

  • VPN ゲートウェイと VPN クライアントを使用して、仮想ネットワークに各マシンを接続する。Connect individual machines to the virtual network using a VPN gateway and VPN client. この構成を有効にするには、次のタスクを実行します。To enable this configuration, perform the following tasks:

    1. 仮想ネットワークを作成します。Create a virtual network.

    2. ポイント対サイト構成を使用する VPN ゲートウェイを作成します。Create a VPN gateway that uses a point-to-site configuration. この構成は、Windows と MacOS の両方のクライアントで使用することができます。This configuration can be used with both Windows and MacOS clients.

    3. 仮想ネットワーク内の HDInsight クラスターに Kafka を作成します。Create a Kafka on HDInsight cluster in the virtual network.

    4. IP を提供するように Kafka を構成します。Configure Kafka for IP advertising. この構成を行うことで、クライアントでドメイン名の代わりに ブローカー IP アドレスを使用して接続を行えるようになります。This configuration allows the client to connect using broker IP addresses instead of domain names.

    5. 開発システムに VPN クライアントをダウンロードして使用します。Download and use the VPN client on the development system.

      詳細については、「VPN クライアントを使用して Apache Kafka に接続する」セクションを参照してください。For more information, see the Connect to Apache Kafka with a VPN client section.

      警告

      この構成には次の制限があるため、推奨されるのは開発用途のみです。This configuration is only recommended for development purposes because of the following limitations:

      • 各クライアントは、VPN ソフトウェア クライアントを使用して接続する必要があります。Each client must connect using a VPN software client.
      • この VPN クライアントは仮想ネットワークに名前解決要求を渡さないため、Kafka との通信には IP アドレスを使用する必要があります。The VPN client does not pass name resolution requests to the virtual network, so you must use IP addressing to communicate with Kafka. IP で通信を行うには、Kafka クラスターで追加の構成を行う必要があります。IP communication requires additional configuration on the Kafka cluster.

仮想ネットワークにおける HDInsight の使用方法の詳細については、Azure HDInsight クラスター用の仮想ネットワークの計画に関するページを参照してください。For more information on using HDInsight in a virtual network, see Plan a virtual network for Azure HDInsight clusters.

オンプレミス ネットワークから Apache Kafka に接続するConnect to Apache Kafka from an on-premises network

Connect HDInsight to your on-premises network (オンプレミス ネットワークに HDInsight を接続する)」の手順に従って、オンプレミス ネットワークと通信する Kafka クラスターを作成します。To create a Kafka cluster that communicates with your on-premises network, follow the steps in the Connect HDInsight to your on-premises network document.

重要

HDInsight クラスターの作成時には、クラスター種類で Kafka を選択します。When creating the HDInsight cluster, select the Kafka cluster type.

これらの手順により、次の構成が作成されます。These steps create the following configuration:

  • Azure Virtual NetworkAzure Virtual Network
  • サイト間 VPN ゲートウェイSite-to-site VPN gateway
  • Azure Storage アカウント (HDInsight で使用します)Azure Storage account (used by HDInsight)
  • HDInsight 上の KafkaKafka on HDInsight

Kafka クライアントがオンプレミスからクラスターへ接続できることを確認するには、「例:Python クライアント」セクションの手順を実行します。To verify that a Kafka client can connect to the cluster from on-premises, use the steps in the Example: Python client section.

VPN クライアントを使用して Apache Kafka に接続するConnect to Apache Kafka with a VPN client

このセクションの手順では、次の構成を作成します。Use the steps in this section to create the following configuration:

  • Azure Virtual NetworkAzure Virtual Network
  • ポイント対サイト VPN ゲートウェイPoint-to-site VPN gateway
  • Azure Storage アカウント (HDInsight で使用されます)Azure Storage Account (used by HDInsight)
  • HDInsight 上の KafkaKafka on HDInsight
  1. ポイント対サイト接続での自己署名証明書の使用に関する記事の手順を実行します。Follow the steps in the Working with self-signed certificates for Point-to-site connections document. このドキュメントでは、ゲートウェイに必要な証明書を作成しています。This document creates the certificates needed for the gateway.

  2. PowerShell プロンプトを開き、次のコードを使用して Azure サブスクリプションにサインインします。Open a PowerShell prompt and use the following code to sign in to your Azure subscription:

    Connect-AzAccount
    # If you have multiple subscriptions, uncomment to set the subscription
    #Select-AzSubscription -SubscriptionName "name of your subscription"
    
  3. 次のコードを使用して、構成情報を含む変数を作成します。Use the following code to create variables that contain configuration information:

    # Prompt for generic information
    $resourceGroupName = Read-Host "What is the resource group name?"
    $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':"
    $location = Read-Host "What Azure Region do you want to create the resources in?"
    $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway."
    
    # Prompt for HDInsight credentials
    $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin"
    $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser"
    
    # Names for Azure resources
    $networkName = "net-$baseName"
    $clusterName = "kafka-$baseName"
    $storageName = "store$baseName" # Can't use dashes in storage names
    $defaultContainerName = $clusterName
    $defaultSubnetName = "default"
    $gatewaySubnetName = "GatewaySubnet"
    $gatewayPublicIpName = "GatewayIp"
    $gatewayIpConfigName = "GatewayConfig"
    $vpnRootCertName = "rootcert"
    $vpnName = "VPNGateway"
    
    # Network settings
    $networkAddressPrefix = "10.0.0.0/16"
    $defaultSubnetPrefix = "10.0.0.0/24"
    $gatewaySubnetPrefix = "10.0.1.0/24"
    $vpnClientAddressPool = "172.16.201.0/24"
    
    # HDInsight settings
    $HdiWorkerNodes = 4
    $hdiVersion = "3.6"
    $hdiType = "Kafka"
    
  4. 次のコードを使用して、Azure リソース グループと仮想ネットワークを作成します。Use the following code to create the Azure resource group and virtual network:

    # Create the resource group that contains everything
    New-AzResourceGroup -Name $resourceGroupName -Location $location
    
    # Create the subnet configuration
    $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -AddressPrefix $defaultSubnetPrefix
    $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -AddressPrefix $gatewaySubnetPrefix
    
    # Create the subnet
    New-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AddressPrefix $networkAddressPrefix `
        -Subnet $defaultSubnetConfig, $gatewaySubnetConfig
    
    # Get the network & subnet that were created
    $network = Get-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName
    $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -VirtualNetwork $network
    $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -VirtualNetwork $network
    
    # Set a dynamic public IP address for the gateway subnet
    $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AllocationMethod Dynamic
    $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName `
        -Subnet $gatewaySubnet `
        -PublicIpAddress $gatewayPublicIp
    
    # Get the certificate info
    # Get the full path in case a relative path was passed
    $rootCertFile = Get-ChildItem $rootCert
    $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile)
    $certBase64 = [System.Convert]::ToBase64String($cert.RawData)
    $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName `
        -PublicCertData $certBase64
    
    # Create the VPN gateway
    New-AzVirtualNetworkGateway -Name $vpnName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -IpConfigurations $gatewayIpConfig `
        -GatewayType Vpn `
        -VpnType RouteBased `
        -EnableBgp $false `
        -GatewaySku Standard `
        -VpnClientAddressPool $vpnClientAddressPool `
        -VpnClientRootCertificates $p2sRootCert
    

    警告

    このプロセスは、完了するまで数分かかる可能性があります。It can take several minutes for this process to complete.

  5. 次のコードを使用して、Azure Storage アカウントと BLOB コンテナーを作成します。Use the following code to create the Azure Storage Account and blob container:

    # Create the storage account
    New-AzStorageAccount `
        -ResourceGroupName $resourceGroupName `
        -Name $storageName `
        -SkuName Standard_GRS `
        -Location $location `
        -Kind StorageV2 `
        -EnableHttpsTrafficOnly 1
    
    # Get the storage account keys and create a context
    $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName `
        -Name $storageName)[0].Value
    $storageContext = New-AzStorageContext -StorageAccountName $storageName `
        -StorageAccountKey $defaultStorageKey
    
    # Create the default storage container
    New-AzStorageContainer -Name $defaultContainerName `
        -Context $storageContext
    
  6. 次のコードを使用して、HDInsight クラスターを作成します。Use the following code to create the HDInsight cluster:

    # Create the HDInsight cluster
    New-AzHDInsightCluster `
        -ResourceGroupName $resourceGroupName `
        -ClusterName $clusterName `
        -Location $location `
        -ClusterSizeInNodes $hdiWorkerNodes `
        -ClusterType $hdiType `
        -OSType Linux `
        -Version $hdiVersion `
        -HttpCredential $adminCreds `
        -SshCredential $sshCreds `
        -DefaultStorageAccountName "$storageName.blob.core.windows.net" `
        -DefaultStorageAccountKey $defaultStorageKey `
        -DefaultStorageContainer $defaultContainerName `
        -DisksPerWorkerNode 2 `
        -VirtualNetworkId $network.Id `
        -SubnetName $defaultSubnet.Id
    

    警告

    このプロセスは、完了するまで約 15 分かかります。This process takes around 15 minutes to complete.

IP を提供するように Kafka を構成するConfigure Kafka for IP advertising

既定では、Apache Zookeeper は、Kafka ブローカーのドメイン名をクライアントに返します。By default, Apache Zookeeper returns the domain name of the Kafka brokers to clients. この構成では、仮想ネットワーク内のエンティティに対して名前解決を使用できないため、VPN ソフトウェア クライアントは使用できません。This configuration does not work with the VPN software client, as it cannot use name resolution for entities in the virtual network. このように構成する場合は、次の手順を実行して、ドメイン名ではなく IP アドレスを提供するように Kafka を構成します。For this configuration, use the following steps to configure Kafka to advertise IP addresses instead of domain names:

  1. Web ブラウザーを使用し、https://CLUSTERNAME.azurehdinsight.net にアクセスします。Using a web browser, go to https://CLUSTERNAME.azurehdinsight.net. CLUSTERNAME を HDInsight クラスター上の Kafka の名前に置き換えます。Replace CLUSTERNAME with the name of the Kafka on HDInsight cluster.

    プロンプトが表示されたら、クラスターの HTTPS ユーザー名とパスワードを入力します。When prompted, use the HTTPS user name and password for the cluster. クラスターの Ambari Web UI が表示されます。The Ambari Web UI for the cluster is displayed.

  2. Kafka に関する情報を表示するには、左にある一覧から [Kafka] を選択します。To view information on Kafka, select Kafka from the list on the left.

    Kafka が強調表示されているサービスの一覧

  3. Kafka 構成を表示するには、上部中央の [Configs (構成)] を選択します。To view Kafka configuration, select Configs from the top middle.

    Apache Ambari サービスの構成

  4. kafka-env 構成を検索するには、右上の [Filter (フィルター)] フィールドに「kafka-env」と入力します。To find the kafka-env configuration, enter kafka-env in the Filter field on the upper right.

    kafka-env の Kafka 構成

  5. IP アドレスを提供するように Kafka を構成するには、次のテキストを kafka-env-template フィールドの最後に追加します。To configure Kafka to advertise IP addresses, add the following text to the bottom of the kafka-env-template field:

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  6. Kafka がリッスンするインターフェイスを構成するには、右上の [Filter (フィルター)] フィールドに「listeners」と入力します。To configure the interface that Kafka listens on, enter listeners in the Filter field on the upper right.

  7. すべてのネットワーク インターフェイスをリッスンするように Kafka を構成するには、 [listeners (リスナー)] フィールドの値を PLAINTEXT://0.0.0.0:9092に変更します。To configure Kafka to listen on all network interfaces, change the value in the listeners field to PLAINTEXT://0.0.0.0:9092.

  8. 構成を保存するには、 [Save (保存)] ボタンを使用します。To save the configuration changes, use the Save button. 変更を説明するテキスト メッセージを入力します。Enter a text message describing the changes. 変更が保存されたら、 [OK] を保存します。Select OK once the changes have been saved.

    Apache Ambari の保存の構成

  9. Kafka の再起動時にエラーが発生しないようにするため、 [Service Actions (サービス アクション) ] ボタンを使用して [Turn On Maintenance Mode (メンテナンス モードの有効化)] を選択します。To prevent errors when restarting Kafka, use the Service Actions button and select Turn On Maintenance Mode. [OK] を選択して、この操作を完了します。Select OK to complete this operation.

    [Turn On Maintenance Mode (メンテナンス モードの有効化)] が強調表示されているサービス アクション

  10. Kafka を再起動するには、 [Restart (再起動)] ボタンをクリックし、 [Restart All Affected (影響を受けるものをすべて再起動)] を選択します。To restart Kafka, use the Restart button and select Restart All Affected. 再起動を確認し、操作が完了したら [OK] ボタンを使用します。Confirm the restart, and then use the OK button after the operation has completed.

    [Restart All Affected (影響を受けるものをすべて再起動)] が強調表示されている [Restart (再起動)] ボタン

  11. メンテナンス モードを無効にするには、 [Service Actions (サービス アクション)] ボタンをクリックし、 [Turn Off Maintenance Mode (メンテナンス モードの無効化)] を選択します。To disable maintenance mode, use the Service Actions button and select Turn Off Maintenance Mode. [OK] を選択して、この操作を完了します。Select OK to complete this operation.

VPN ゲートウェイに接続するConnect to the VPN gateway

VPN ゲートウェイに接続するには、ポイント対サイト接続の構成に関するドキュメントの「Azure への接続」セクションに従います。To connect to the VPN gateway, use the Connect to Azure section of the Configure a Point-to-Site connection document.

例:Python クライアントExample: Python client

Kafka への接続を検証するには、次の手順に従って Python プロデューサーとコンシューマーを作成します。To validate connectivity to Kafka, use the following steps to create and run a Python producer and consumer:

  1. 次のいずれかの方法により、Kafka クラスター内のノードの完全修飾ドメイン名 (FQDN) と IP アドレスを取得します。Use one of the following methods to retrieve the fully qualified domain name (FQDN) and IP addresses of the nodes in the Kafka cluster:

    $resourceGroupName = "The resource group that contains the virtual network used with HDInsight"
    
    $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | where-object {$_.Name -like "*node*"}
    
    $nodes = @()
    foreach($nic in $clusterNICs) {
        $node = new-object System.Object
        $node | add-member -MemberType NoteProperty -name "Type" -value $nic.Name.Split('-')[1]
        $node | add-member -MemberType NoteProperty -name "InternalIP" -value $nic.IpConfigurations.PrivateIpAddress
        $node | add-member -MemberType NoteProperty -name "InternalFQDN" -value $nic.DnsSettings.InternalFqdn
        $nodes += $node
    }
    $nodes | sort-object Type
    
    az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"
    

    このスクリプトは、$resourceGroupName が仮想ネットワークを含む Azure リソース グループの名前であることを前提としています。This script assumes that $resourceGroupName is the name of the Azure resource group that contains the virtual network.

    返された情報を、次の手順で使用するために保存します。Save the returned information for use in the next steps.

  2. 次のコマンドを使用して、kafka-python クライアントをインストールします。Use the following to install the kafka-python client:

    pip install kafka-python
    
  3. データを Kafka に送信するには、次の Python コードを使用します。To send data to Kafka, use the following Python code:

    from kafka import KafkaProducer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # NOTE: you don't need the full list of worker nodes, just one or two.
    producer = KafkaProducer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'])
    for _ in range(50):
       producer.send('testtopic', b'test message')
    

    'kafka_broker' エントリを、このセクションの手順 1 で返されたアドレスに置き換えます。Replace the 'kafka_broker' entries with the addresses returned from step 1 in this section:

    • ソフトウェア VPN クライアント を使用している場合、kafka_broker エントリはワーカー ノードの IP アドレスに置き換えます。If you are using a Software VPN client, replace the kafka_broker entries with the IP address of your worker nodes.

    • カスタム DNS サーバー経由での名前解決を有効化 している場合は、kafka_broker エントリをワーカー ノードの FQDN に置き換えます。If you have enabled name resolution through a custom DNS server, replace the kafka_broker entries with the FQDN of the worker nodes.

      注意

      このコードは、文字列 test message をトピック testtopic に送信します。This code sends the string test message to the topic testtopic. HDInsight 上の Kafka の既定の構成では、トピックが存在しない場合は作成します。The default configuration of Kafka on HDInsight is to create the topic if it does not exist.

  4. Kafka からメッセージを取得するには、次の Python コードを使用します。To retrieve the messages from Kafka, use the following Python code:

    from kafka import KafkaConsumer
    # Replace the `ip_address` entries with the IP address of your worker nodes
    # Again, you only need one or two, not the full list.
    # Note: auto_offset_reset='earliest' resets the starting offset to the beginning
    #       of the topic
    consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'],auto_offset_reset='earliest')
    consumer.subscribe(['testtopic'])
    for msg in consumer:
      print (msg)
    

    'kafka_broker' エントリを、このセクションの手順 1 で返されたアドレスに置き換えます。Replace the 'kafka_broker' entries with the addresses returned from step 1 in this section:

    • ソフトウェア VPN クライアント を使用している場合、kafka_broker エントリはワーカー ノードの IP アドレスに置き換えます。If you are using a Software VPN client, replace the kafka_broker entries with the IP address of your worker nodes.

    • カスタム DNS サーバー経由での名前解決を有効化 している場合は、kafka_broker エントリをワーカー ノードの FQDN に置き換えます。If you have enabled name resolution through a custom DNS server, replace the kafka_broker entries with the FQDN of the worker nodes.

次の手順Next steps

仮想ネットワークでの HDInsight の使用については、Azure HDInsight クラスター用の仮想ネットワーク デプロイの計画に関するドキュメントを参照してください。For more information on using HDInsight with a virtual network, see the Plan a virtual network deployment for Azure HDInsight clusters document.

ポイント対サイト VPN ゲートウェイを使用する Azure Virtual Network の作成の詳細については、次のドキュメントを参照してください。For more information on creating an Azure Virtual Network with Point-to-Site VPN gateway, see the following documents:

HDInsight 上の Apache Kafka の操作の詳細については、次のドキュメントを参照してください。For more information on working with Apache Kafka on HDInsight, see the following documents: