Apache Kafka 用の Azure Event Hubs での Apache Flink の使用

このチュートリアルでは、プロトコル クライアントを変更したり、独自のクラスターを実行したりせずに、Apache Flink をイベント ハブに接続する方法について説明します。 Apache Kafka コンシューマー プロトコルの Event Hubs サポートの詳細については、Apache Kafka 用の Event Hubs に関するページを参照してください。

このチュートリアルでは、以下の内容を学習します。

  • Event Hubs 名前空間を作成します
  • サンプル プロジェクトを複製する
  • Flink プロデューサーを実行する
  • Flink コンシューマーを実行する

Note

このサンプルは GitHub で入手できます。

前提条件

このチュートリアルを完了するには、次の前提条件を満たしている必要があります。

  • Apache Kafka 用の Event Hubs に関する記事を読む。
  • Azure サブスクリプション。 お持ちでない場合は、開始する前に無料アカウントを作成してください。
  • Java Development Kit (JDK) 1.7 以降
    • Ubuntu で apt-get install default-jdk を実行して JDK をインストールします。
    • 必ず、JDK のインストール先フォルダーを指すように JAVA_HOME 環境変数を設定してください。
  • Maven バイナリ アーカイブのダウンロードインストール
    • Ubuntu で apt-get install maven を実行して Maven をインストールします。
  • Git
    • Ubuntu で sudo apt-get install git を実行して Git をインストールします。

Event Hubs 名前空間を作成します

Event Hubs サービスとの間で送受信を行うには、Event Hubs 名前空間が必要です。 名前空間とイベント ハブを作成する手順については、イベント ハブの作成に関するページを参照してください。 後で使うので、イベント ハブの接続文字列をコピーしておきます。

サンプル プロジェクトを複製する

Event Hubs の接続文字列を入手したので、Kafka 用 Azure Event Hubs リポジトリをクローンし、flink サブフォルダーに移動します。

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink

提供された Flink プロデューサーの例を使用して、Event Hubs サービスにメッセージを送信します。

Event Hubs Kafka エンドポイントを指定する

producer.config

producer/src/main/resources/producer.configbootstrap.servers 値と sasl.jaas.config 値を更新し、正しい認証を使用してプロデューサーを Event Hubs Kafka エンドポイントに転送します。

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX"; などがあります。

コマンド ラインからプロデューサーを実行する

コマンドラインからプロデューサーを実行するには、JAR を生成し、Maven 内から実行します (または、Maven を使用して JAR を生成し、必要な Kafka JAR をクラスパスに追加することによって、Java 内で実行します)。

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"

次にプロデューサーは、トピック test にあるイベント ハブへのイベントの送信を開始し、それらのイベントを stdout に出力します。

提供されたコンシューマーの例を使用して、イベント ハブからメッセージを受信します。

Event Hubs Kafka エンドポイントを指定する

consumer.config

consumer/src/main/resources/consumer.configbootstrap.servers 値と sasl.jaas.config 値を更新し、正しい認証を使用してコンシューマーを Event Hubs Kafka エンドポイントに転送します。

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX"; などがあります。

コマンド ラインからコンシューマーを実行する

コマンドラインからコンシューマーを実行するには、JAR を生成し、Maven 内から実行します (または、Maven を使用して JAR を生成し、必要な Kafka JAR をクラスパスに追加することによって、Java 内で実行します)。

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"

Event Hubs にイベントがある場合 (たとえば、プロデューサーも実行されている場合)、コンシューマーはトピック test からのイベントの受信を開始します。

Flink を Kafka に接続する方法についての詳細は、Flink の Kafka コネクタ ガイドを調べてください。

次のステップ

Kafka 用 Event Hubs の詳細については、次の記事を参照してください。