Python (azure-eventhub) を使用して Event Hubs との間でイベントを送受信する

このクイックスタートでは、azure-eventhub Python パッケージを使用して、イベント ハブとの間でイベントを送受信する方法について説明します。

前提条件

Azure Event Hubs を初めて使用する場合は、このクイックスタートを行う前に Event Hubs の概要を参照してください。

このクイック スタートを完了するには、次の前提条件を用意しておく必要があります。

  • Microsoft Azure サブスクリプション。 Azure Event Hubs を含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Microsoft Azure アカウントをお持ちでない場合は、アカウントを作成する際に、無料試用版にサインアップするか、MSDN サブスクライバー特典を利用できます。

  • Python 2.7 または 3.6 以降 (PIP がインストールおよび更新されている)。

  • Event Hubs 用の Python パッケージ。

    パッケージをインストールするには、コマンド プロンプトで Python をパス設定して、次のコマンドを実行します。

    pip install azure-eventhub
    

    Azure Blob Storage をチェックポイント ストアとして使用してイベントを受信するには、次のパッケージをインストールします。

    pip install azure-eventhub-checkpointstoreblob-aio
    
  • Event Hubs 名前空間とイベント ハブを作成する。 最初の手順では、Azure Portal を使用して Event Hubs 型の名前空間を作成し、アプリケーションがイベント ハブと通信するために必要な管理資格情報を取得します。 名前空間とイベント ハブを作成するには、こちらの記事の手順に従います。 その後、次の記事の手順に従って、Event Hubs 名前空間用の接続文字列を取得します: 接続文字列を取得する。 この接続文字列は、このクイックスタートの後の手順で必要になります。

送信イベント

このセクションでは、前に作成したイベント ハブにイベントを送信する Python スクリプトを作成します。

  1. Visual Studio Codeなど、お使いの Python エディターを開きます。

  2. send.py という名前のスクリプトを作成します。 このスクリプトは、前に作成したイベント ハブにイベントのバッチを送信します。

  3. 次のコードを send.py に貼り付けます。

    import asyncio
    from azure.eventhub.aio import EventHubProducerClient
    from azure.eventhub import EventData
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a connection string to your event hubs namespace and
        # the event hub name.
        producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME")
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData('First event '))
            event_data_batch.add(EventData('Second event'))
            event_data_batch.add(EventData('Third event'))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    
    

    注意

    情報提供を目的としたコメントを含む完全なソース コードについては、GitHub の send_async.py ページにアクセスしてください。

受信イベント

このクイックスタートでは、Azure Blob Storage をチェックポイント ストアとして使用します。 チェックポイント ストアは、チェックポイント (最後の読み取り位置) を保持するために使用されます。

警告

このコードを Azure Stack Hub で実行すると、特定の Storage API バージョンを対象としている場合を除き、実行時エラーが発生します。 これは、Event Hubs SDK では、Azure で利用できる最新の Azure Storage API が使用されますが、Azure Stack Hub プラットフォームではこれを利用できない可能性があるためです。 Azure Stack Hub でサポートされる Storage Blob SDK のバージョンは、Azure で一般的に利用できるバージョンと異なる場合があります。 チェックポイント ストアとして Azure Blob Storage を使用している場合は、Azure Stack Hub ビルドでサポートされている Azure Storage API バージョンを確認し、コード内でそのバージョンを対象にします。

たとえば、Azure Stack Hub バージョン 2005 上で実行している場合、Storage サービスで利用できる最も高いバージョンは 2019-02-02 となります。 既定では、Event Hubs SDK クライアント ライブラリでは、Azure で利用できる最も高いバージョン (SDK のリリース時点では 2019-07-07) が使用されます。 この場合は、このセクションの手順に従うことに加え、Storage サービス API バージョン 2019-02-02 を対象とするコードを追加する必要もあります。 特定の Storage API バージョンを対象にする方法の例については、GitHub の同期版のサンプルと非同期版のサンプルを参照してください。

Azure Storage アカウントと BLOB コンテナーを作成する

次の手順に従って、Azure Storage アカウントと BLOB コンテナーを作成します。

  1. Azure ストレージ アカウントの作成
  2. BLOB コンテナーを作成する
  3. ストレージ アカウントへの接続文字列を取得する

接続文字列とコンテナー名を記録しておいてください。後から受信コードで使用します。

イベントを受信する Python スクリプトの作成

このセクションでは、イベント ハブからイベントを受信する Python スクリプトを作成します。

  1. Visual Studio Codeなど、お使いの Python エディターを開きます。

  2. recv.py という名前のスクリプトを作成します。

  3. 次のコードを recv.py に貼り付けます。

    import asyncio
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
    
    
    async def on_event(partition_context, event):
        # Print the event data.
        print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
        async with client:
            # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
            await client.receive(on_event=on_event,  starting_position="-1")
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        # Run the main method.
        loop.run_until_complete(main())    
    

    注意

    情報提供を目的とした補足コメントを含む完全なソース コードについては、GitHub の recv_with_checkpoint_store_async.py ページにアクセスしてください。

受信側アプリを実行する

スクリプトを実行するには、Python をパス設定した状態でコマンド プロンプトを開き、次のコマンドを実行します。

python recv.py

送信側アプリを実行する

スクリプトを実行するには、Python をパス設定した状態でコマンド プロンプトを開き、次のコマンドを実行します。

python send.py

イベント ハブに送信されたメッセージが受信側ウィンドウに表示されます。

次のステップ

このクイックスタートでは、イベントを非同期的に送受信しました。 イベントを同期的に送受信する方法については、GitHub の sync_samples ページにあるサンプルを参照してください。

GitHub にあるすべての (同期と非同期の両方の) サンプルについては、Python 用 Azure Event Hubs クライアント ライブラリ サンプル ページにアクセスしてください。