Send events to or receive events from event hubs by using Python (azure-eventhub version 5)

This quickstart shows how to send events to and receive events from an event hub using the azure-eventhub version 5 Python package.

Prerequisites

If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

To complete this quickstart, you need the following prerequisites:

  • Microsoft Azure subscription. To use Azure services, including Azure Event Hubs, you need a subscription. If you don't have an existing Azure account, you can sign up for a free trial or use your MSDN subscriber benefits when you create an account.

  • Python 2.7 or 3.5 or later, with PIP installed and updated.

  • The Python package for Event Hubs.

    To install the package, run this command in a command prompt that has Python in its path:

    pip install azure-eventhub
    

    Install the following package for receiving the events by using Azure Blob storage as the checkpoint store:

    pip install azure-eventhub-checkpointstoreblob-aio
    
  • Create an Event Hubs namespace and an event hub. The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. To create a namespace and an event hub, follow the procedure in this article. Then, get the connection string for the Event Hubs namespace by following instructions from the article: Get connection string. You use the connection string later in this quickstart.

Send events

In this section, you create a Python script to send events to the event hub that you created earlier.

  1. Open your favorite Python editor, such as Visual Studio Code.

  2. Create a script called send.py. This script sends a batch of events to the event hub that you created earlier.

  3. Paste the following code into send.py:

    import asyncio
    from azure.eventhub.aio import EventHubProducerClient
    from azure.eventhub import EventData
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a connection string to your event hubs namespace and
     	    # the event hub name.
        producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME")
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData('First event '))
            event_data_batch.add(EventData('Second event'))
            event_data_batch.add(EventData('Third event'))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    
    

    Note

    For the complete source code, including informational comments, go to the GitHub send_async.py page.

Receive events

This quickstart uses Azure Blob storage as a checkpoint store. The checkpoint store is used to persist checkpoints (that is, the last read positions).

Warning

If you run this code on Azure Stack Hub, you will experience runtime errors unless you target a specific Storage API version. That's because the Event Hubs SDK uses the latest available Azure Storage API available in Azure that may not be available on your Azure Stack Hub platform. Azure Stack Hub may support a different version of Storage Blob SDK than those typically available on Azure. If you are using Azure Blog Storage as a checkpoint store, check the supported Azure Storage API version for your Azure Stack Hub build and target that version in your code.

For example, If you are running on Azure Stack Hub version 2005, the highest available version for the Storage service is version 2019-02-02. By default, the Event Hubs SDK client library uses the highest available version on Azure (2019-07-07 at the time of the release of the SDK). In this case, besides following steps in this section, you will also need to add code to target the Storage service API version 2019-02-02. For an example on how to target a specific Storage API version, see the synchronous and asynchronous samples on GitHub.

Create an Azure storage account and a blob container

Create an Azure storage account and a blob container in it by doing the following steps:

  1. Create an Azure Storage account
  2. Create a blob container
  3. Get the connection string to the storage account

Be sure to record the connection string and container name for later use in the receive code.

Create a Python script to receive events

In this section, you create a Python script to receive events from your event hub:

  1. Open your favorite Python editor, such as Visual Studio Code.

  2. Create a script called recv.py.

  3. Paste the following code into recv.py:

    import asyncio
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
    
    
    async def on_event(partition_context, event):
        # Print the event data.
        print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
        async with client:
            # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
            await client.receive(on_event=on_event,  starting_position="-1")
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        # Run the main method.
        loop.run_until_complete(main())    
    

    Note

    For the complete source code, including additional informational comments, go to the GitHub recv_with_checkpoint_store_async.py page.

Run the receiver app

To run the script, open a command prompt that has Python in its path, and then run this command:

python recv.py

Run the sender app

To run the script, open a command prompt that has Python in its path, and then run this command:

python send.py

The receiver window should display the messages that were sent to the event hub.

Next steps

In this quickstart, you've sent and received events asynchronously. To learn how to send and receive events synchronously, go to the GitHub sync_samples page.

For all the samples (both synchronous and asynchronous) on GitHub, go to Azure Event Hubs client library for Python samples.