Gebeurtenissen verzenden naar of gebeurtenissen ontvangen van Event Hubs met behulp van Python (azure-eventhub)

In deze quickstart ziet u hoe u gebeurtenissen kunt verzenden naar en ontvangen van een Event Hub met behulp van het Python-pakket azure-eventhub.

Vereisten

Als u nog geen ervaring hebt met Azure Event Hubs, raadpleegt u het Event Hubs-overzicht voordat u deze quickstart uitvoert.

Voor het voltooien van deze snelstart moet aan de volgende vereisten worden voldaan:

  • Microsoft Azure-abonnement. Als u Azure-services wilt gebruiken, met inbegrip van Azure Event Hubs, hebt u een abonnement nodig. Als u nog geen Azure-account hebt, kunt u zich aanmelden voor een gratis proefversie of uw voordelen als MSDN-abonnee gebruiken wanneer u een account maakt.

  • Python 2.7 of 3.6 of hoger, met PIP geïnstalleerd en bijgewerkt.

  • Het Python-pakket voor Event Hubs.

    Als u het pakket wilt installeren, voert u deze opdracht uit bij een opdrachtprompt met Python in het pad:

    pip install azure-eventhub
    

    Installeer het volgende pakket om de gebeurtenissen te ontvangen met behulp van Azure Blob-opslag als controlepuntopslag:

    pip install azure-eventhub-checkpointstoreblob-aio
    
  • Een Event Hubs-naamruimte en een Event Hub maken. In de eerste stap gebruikt u Azure Portal om een naamruimte van het type Event Hubs te maken en de beheerreferenties te verkrijgen die de toepassing nodig heeft om met de Event Hub te communiceren. Volg de procedure in dit artikel om een naamruimte en een Event Hub te maken. Haal vervolgens de verbindingsreeks voor de Event Hubs-naamruimte op door de instructies in het artikel te volgen: Verbindingstekenreeks ophalen. U gebruikt de verbindingsreeks later in deze quickstart.

Gebeurtenissen verzenden

In deze sectie maakt u een Python-script voor het verzenden van gebeurtenissen naar de Event Hub die u eerder hebt gemaakt.

  1. Open uw favoriete Python-editor, bijvoorbeeld Visual Studio Code.

  2. Maak een script met de naam send.py. Met dit script wordt een batch van gebeurtenissen verzonden naar de Event Hub die u eerder hebt gemaakt.

  3. Plak de volgende code in send.py:

    import asyncio
    from azure.eventhub.aio import EventHubProducerClient
    from azure.eventhub import EventData
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a connection string to your event hubs namespace and
        # the event hub name.
        producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME")
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData('First event '))
            event_data_batch.add(EventData('Second event'))
            event_data_batch.add(EventData('Third event'))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    
    

    Notitie

    Ga voor de volledige broncode, inclusief informatieve opmerkingen, naar de pagina GitHub send_async.py.

Gebeurtenissen ontvangen

In deze quickstart wordt Azure Blob Storage gebruikt als controlepuntopslag. De controlepuntopslag wordt gebruikt om controlepunten te behouden (dat wil zeggen, de laatste leesposities).

Waarschuwing

Als u deze code op Azure Stack Hub uitvoert, treden er runtimefouten op tenzij u zich richt op een specifieke versie van de Storage-API. Dat komt doordat de Event Hubs-SDK de meest recente Azure Storage-API gebruikt die beschikbaar is in Azure maar die niet beschikbaar is op uw Azure Stack Hub-platform. Azure Stack Hub biedt mogelijk ondersteuning voor een andere versie van de Storage Blob-SDK dan de versies die doorgaans in Azure beschikbaar zijn. Als u Azure Blob-opslag gebruikt als controlepuntopslag, controleert u de ondersteunde versie van de Azure Storage-API voor uw build van Azure Stack Hub en stelt u die versie in uw code als doel in.

Als u bijvoorbeeld op Azure Stack Hub versie 2005 uitvoert, is de hoogste beschikbare versie van de Storage-service versie 2019-02-02. De Event Hubs-SDK-clientbibliotheek maakt standaard gebruik van de hoogste beschikbare versie op Azure (2019-07-07 op het moment van de release van de SDK). In dit geval moet u naast de volgende stappen in deze sectie ook code toevoegen om de API-versie van de Storage-service te richten op 2019-02-02. Zie de synchrone en asynchrone voorbeelden in GitHub voor een voorbeeld van het instellen van een specifieke versie van de Storage-API.

Een Azure-opslagaccount en een blobcontainer maken

Maak een Azure-opslagaccount met daarin een blobcontainer door de volgende stappen te volgen:

  1. Een Azure Storage-account maken
  2. Een blobcontainer maken
  3. De verbindingsreeks voor het opslagaccount ophalen

Zorg ervoor dat u de verbindingsreeks en de containernaam vastlegt voor later gebruik in de receive-code.

Een Python-script maken om gebeurtenissen te ontvangen

In deze sectie maakt u een Python-script om gebeurtenissen van uw Event Hub te ontvangen:

  1. Open uw favoriete Python-editor, bijvoorbeeld Visual Studio Code.

  2. Maak een script met de naam recv.py.

  3. Plak de volgende code in recv.py:

    import asyncio
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
    
    
    async def on_event(partition_context, event):
        # Print the event data.
        print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
        async with client:
            # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
            await client.receive(on_event=on_event,  starting_position="-1")
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        # Run the main method.
        loop.run_until_complete(main())    
    

    Notitie

    Ga voor de volledige broncode, inclusief aanvullende informatieve opmerkingen, naar de pagina GitHub recv_with_checkpoint_store_async.py.

De ontvangende app uitvoeren

U voert het script uit door een opdrachtprompt te openen met Python in het pad en deze opdracht uit te voeren:

python recv.py

De verzendende app uitvoeren

U voert het script uit door een opdrachtprompt te openen met Python in het pad en deze opdracht uit te voeren:

python send.py

In het ontvangstvenster worden de berichten weergegeven die naar de Event Hub zijn verzonden.

Volgende stappen

In deze quickstart hebt u gebeurtenissen asynchroon verzonden en ontvangen. Ga naar de pagina GitHub sync_samples om te leren hoe u gebeurtenissen synchroon verzendt en ontvangt.

Ga naar de Azure Event Hubs-clientbibliotheek voor Python-voorbeelden voor alle voorbeelden (zowel synchrone als asynchrone) in GitHub.