Senden von Ereignissen an oder Empfangen von Ereignissen von Event Hubs mithilfe von Python

In dieser Schnellstartanleitung erfahren Sie, wie Sie mithilfe des Python-Pakets azure-eventhub Ereignisse an einen Event Hub senden bzw. von dort empfangen.

Voraussetzungen

Wenn Sie mit Azure Event Hubs noch nicht vertraut sind, lesen Sie vor dem Durcharbeiten dieser Schnellstartanleitung die Informationen unter Übersicht über Event Hubs.

Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:

  • Microsoft Azure-Abonnement. Für die Verwendung von Azure-Diensten benötigen Sie ein Abonnement. Das gilt auch für Azure Event Hubs. Wenn Sie nicht über ein Azure-Konto verfügen, registrieren Sie sich für eine kostenlose Testversion.
  • Python 3.8 oder höher mit installiertem und aktualisiertem pip
  • Visual Studio Code (empfohlen) oder eine andere integrierte Entwicklungsumgebung (Integrated Development Environment, IDE)
  • Erstellen Sie einen Event Hubs-Namespace und einen Event Hub. Verwenden Sie zunächst das Azure-Portal, um einen Event Hubs-Namespace zu erstellen, und rufen Sie damit die Verwaltungsanmeldeinformationen ab, die Ihre Anwendung benötigt, um mit der Event Hubs-Instanz zu kommunizieren. Erstellen Sie anhand der Anleitung in diesem Artikel einen Namespace und einen Event Hub.

Installieren der Pakete zum Senden von Ereignissen

Um die Python-Pakete für Event Hubs zu installieren, öffnen Sie eine Eingabeaufforderung mit Python im Pfad. Ändern Sie das Verzeichnis in den Ordner, in dem Sie Ihre Beispiele speichern möchten.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Authentifizieren der App bei Azure

In dieser Schnellstartanleitung werden zwei Möglichkeiten zum Herstellen einer Verbindung mit Azure Event Hubs gezeigt: kennwortlos und Verbindungszeichenfolge. Die erste Option zeigt Ihnen, wie Sie Ihren Sicherheitsprinzipal in Microsoft Entra ID und die rollenbasierte Zugriffssteuerung (RBAC) verwenden, um eine Verbindung zu einem Event Hubs Namespace herzustellen. Sie müssen sich keine Gedanken darüber machen, dass hartcodierte Verbindungszeichenfolgen in Ihrem Code oder in einer Konfigurationsdatei oder in einem sicheren Speicher wie Azure Key Vault vorhanden sind. Die zweite Option zeigt, wie Sie mithilfe einer Verbindungszeichenfolge eine Verbindung mit einem Event Hubs-Namespace herstellen. Wenn Sie noch nicht mit Azure vertraut sind, ist die Option mit der Verbindungszeichenfolge möglicherweise einfacher. In realen Anwendungen und Produktionsumgebungen wird die kennwortlose Option empfohlen. Weitere Informationen finden Sie unter Authentifizierung und Autorisierung. Weitere Informationen zur kennwortlosen Authentifizierung finden Sie auch auf der Übersichtsseite.

Zuweisen von Rollen zu Ihrem Microsoft Entra-Benutzer

Achten Sie bei der lokalen Entwicklung darauf, dass das Benutzerkonto, das die Verbindung mit Azure Event Hubs herstellt, über die korrekten Berechtigungen verfügt. Zum Senden und Empfangen von Nachrichten ist die Rolle Azure Event Hubs-Datenbesitzer erforderlich. Um sich selbst diese Rolle zuweisen zu können, benötigen Sie die Rolle „Benutzerzugriffsadministrator“ oder eine andere Rolle, die die Aktion Microsoft.Authorization/roleAssignments/write umfasst. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen zu den verfügbaren Bereichen für Rollenzuweisungen finden Sie auf der Seite Grundlegendes zum Bereich von Azure RBAC.

Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle Azure Event Hubs Data Owner zugewiesen. Diese Rolle bietet Vollzugriff auf Azure Event Hubs-Ressourcen. Halten Sie sich in einem echten Szenario an das Prinzip der geringsten Rechte, um Benutzern nur die benötigten Mindestberechtigungen zu erteilen und so die Produktionsumgebung besser zu schützen.

In Azure integrierte Rollen für Azure Event Hubs

Bei Azure Event Hubs ist die Verwaltung der Namespaces und aller zugehörigen Ressourcen über das Azure-Portal und die Azure-Ressourcenverwaltungs-API bereits durch das Azure RBAC-Modell geschützt. Azure stellt die folgenden integrierten Azure-Rollen zum Autorisieren des Zugriffs auf einen Event Hubs-Namespace bereit:

Informationen zum Erstellen einer benutzerdefinierten Rolle finden Sie unter Erforderliche Rechte für Event Hubs-Vorgänge.

Wichtig

In der Regel dauert die Verteilung der Rollenzuweisung in Azure ein bis zwei Minuten. In seltenen Fällen kann sie aber bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.

  1. Navigieren Sie im Azure-Portal zu Ihrem Event Hubs-Namespace. Verwenden Sie dazu entweder die Hauptsuchleiste oder die linke Navigationsleiste.

  2. Wählen Sie auf der Übersichtsseite im linken Menü die Option Zugriffssteuerung (IAM) aus.

  3. Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.

  4. Wählen Sie im oberen Menü + Hinzufügen und aus dem dann angezeigten Dropdownmenü die Option Rollenzuweisung hinzufügen aus.

    A screenshot showing how to assign a role.

  5. Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach Azure Event Hubs Data Owner, und wählen Sie das entsprechende Ergebnis aus. Klicken Sie dann auf Weiter.

  6. Wählen Sie unter Zugriff zuweisen zu die Option Benutzer, Gruppe oder Dienstprinzipal und dann die Option + Mitglieder auswählen aus.

  7. Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre E-Mail-Adresse benutzer@domäne), und wählen Sie unten im Dialogfeld Auswählen aus.

  8. Wählen Sie Überprüfen und zuweisen aus, um zur letzten Seite zu gelangen, und wählen Sie erneut Überprüfen und zuweisen aus, um den Vorgang abzuschließen.

Senden von Ereignisse

Erstellen Sie in diesem Abschnitt ein Python-Skript, um Ereignisse an den Event Hub zu senden, den Sie zuvor erstellt haben.

  1. Öffnen Sie Ihren bevorzugten Python-Editor, z.B. Visual Studio Code.

  2. Erstellen Sie ein Skript mit dem Namen send.py. Dieses Skript sendet einen Batch von Ereignissen an den Event Hub, den Sie zuvor erstellt haben.

  3. Fügen Sie den folgenden Code in send.py ein:

    Ersetzen Sie im Code die folgenden Platzhalter durch echte Werte:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Hinweis

    Beispiele für andere Optionen zum asynchronen Senden von Ereignissen an Event Hub mithilfe einer Verbindungszeichenfolge finden Sie auf der GitHub-Seite zu send_async.py. Die dort gezeigten Muster lassen sich auch auf das kennwortlose Senden von Ereignissen anwenden.

Empfangen von Ereignissen

In dieser Schnellstartanleitung wird Azure Blob Storage als Prüfpunktspeicher verwendet. Der Prüfpunktspeicher wird verwendet, um Prüfpunkte (d. h. die letzte Leseposition) beizubehalten.

Befolgen Sie diese Empfehlungen, wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden:

  • Verwenden Sie einen separaten Container für jede Consumergruppe. Sie können dasselbe Speicherkonto verwenden, aber verwenden Sie für jede Gruppe einen eigenen Container.
  • Verwenden Sie weder den Container noch das Speicherkonto für andere Zwecke.
  • Das Speicherkonto sollte sich in derselben Region befinden, in der sich die bereitgestellte Anwendung befindet. Wenn die Anwendung lokal ist, versuchen Sie, die nächstgelegene Region auszuwählen.

Stellen Sie auf der Seite Speicherkonto im Azure-Portal im Abschnitt Blobdienst sicher, dass die folgenden Einstellungen deaktiviert sind.

  • Hierarchischer Namespace
  • Vorläufiges Löschen von Blobs
  • Versionsverwaltung

Erstellen eines Azure-Speicherkontos und eines Blobcontainers

Führen Sie die folgenden Schritte aus, um ein Azure-Speicherkonto und einen darin befindlichen Blobcontainer zu erstellen:

  1. Erstellen Sie ein Azure Storage-Konto.
  2. Erstellen Sie einen Blobcontainer.
  3. Authentifizieren Sie sich beim Blobcontainer.

Notieren Sie sich die Verbindungszeichenfolge und den Containernamen zur späteren Verwendung im Empfangscode.

Stellen Sie beim lokalen Entwickeln sicher, dass das Benutzerkonto, das auf Blobdaten zugreift, die richtigen Berechtigungen hat. Sie benötigen die Berechtigung Mitwirkender an Storage-Blobdaten zum Lesen und Schreiben von Blobdaten. Um sich selbst diese Rolle zuweisen zu können, benötigen Sie die Rolle Benutzerzugriffsadministrator oder eine andere Rolle, die die Aktion Microsoft.Authorization/roleAssignments/write enthält. Sie können einem Benutzer Azure RBAC-Rollen über das Azure-Portal, die Azure CLI oder mit Azure PowerShell zuweisen. Weitere Informationen zu den verfügbaren Bereichen für Rollenzuweisungen finden Sie auf der Seite Bereichsübersicht.

In diesem Szenario weisen Sie Ihrem Benutzerkonto Berechtigungen zu, die auf das Speicherkonto zugeschnitten sind, um dem Prinzip der geringsten Rechte zu folgen. Auf diese Weise erhalten Benutzer nur die erforderlichen Mindestberechtigungen, und es entstehen sicherere Produktionsumgebungen.

Im folgenden Beispiel wird Ihrem Benutzerkonto die Rolle Mitwirkender an Storage-Blobdaten zugewiesen, die sowohl Lese- als auch Schreibzugriff auf Blobdaten in Ihrem Speicherkonto ermöglicht.

Wichtig

In den meisten Fällen dauert es eine oder zwei Minute(n), bis die Rollenzuweisung in Azure weitergegeben wird. In seltenen Fällen kann es aber bis zu acht Minuten dauern. Wenn bei der ersten Ausführung Ihres Codes Authentifizierungsfehler auftreten, warten Sie einige Momente, und versuchen Sie es dann erneut.

  1. Suchen Sie im Azure-Portal Ihr Speicherkonto mithilfe der Hauptsuchleiste oder der linken Navigationsleiste.

  2. Wählen Sie auf der Übersichtsseite des Speicherkontos im linken Menü die Option Zugriffssteuerung (IAM) aus.

  3. Wählen Sie auf der Seite Zugriffssteuerung (IAM) die Registerkarte Rollenzuweisungen aus.

  4. Wählen Sie im oberen Menü + Hinzufügen und aus dem dann angezeigten Dropdownmenü die Option Rollenzuweisung hinzufügen aus.

    A screenshot showing how to assign a storage account role.

  5. Über das Suchfeld können Sie die Ergebnisse für die gewünschte Rolle filtern. Suchen Sie in diesem Beispiel nach Mitwirkender an Speicherblobdaten, wählen Sie das entsprechende Ergebnis und dann Weiter aus.

  6. Wählen Sie unter Zugriff zuweisen zu die Option Benutzer, Gruppe oder Dienstprinzipal und dann die Option + Mitglieder auswählen aus.

  7. Suchen Sie im Dialogfeld nach Ihrem Microsoft Entra-Benutzernamen (normalerweise Ihre E-Mail-Adresse benutzer@domäne), und wählen Sie unten im Dialogfeld Auswählen aus.

  8. Wählen Sie Überprüfen und zuweisen aus, um zur letzten Seite zu gelangen, und wählen Sie erneut Überprüfen und zuweisen aus, um den Vorgang abzuschließen.

Installieren der Pakete zum Empfangen von Ereignissen

Für die empfangende Seite muss mindestens ein weiteres Paket installiert werden. In dieser Schnellstartanleitung wird Azure Blob Storage zum Speichern von Prüfpunkten verwendet, damit das Programm die bereits gelesenen Ereignisse nicht erneut liest. In einem Blob werden für empfangene Nachrichten in regelmäßigen Abständen Metadatenprüfpunkte erstellt. Dadurch kann der Empfang von Nachrichten später problemlos an der Stelle fortgesetzt werden, an der Sie aufgehört haben.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Erstellen eines Python-Skripts zum Empfangen von Ereignissen

In diesem Abschnitt erstellen Sie ein Python-Skript, um Ereignisse aus ihrem Event Hub zu empfangen:

  1. Öffnen Sie Ihren bevorzugten Python-Editor, z.B. Visual Studio Code.

  2. Erstellen Sie ein Skript mit dem Namen recv.py.

  3. Fügen Sie den folgenden Code in recv.py ein:

    Ersetzen Sie im Code die folgenden Platzhalter durch echte Werte:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Hinweis

    Beispiele für andere Optionen zum asynchronen Empfangen von Ereignissen von Event Hub mithilfe einer Verbindungszeichenfolge finden Sie auf der GitHub-Seite zu recv_with_checkpoint_store_async.py. Die dort gezeigten Muster lassen sich auch auf das kennwortlose Empfangen von Ereignissen anwenden.

Ausführen der Empfänger-App

Um das Skript auszuführen, öffnen Sie eine Eingabeaufforderung, deren Pfad Python enthält, und führen Sie dann diesen Befehl aus:

python recv.py

Ausführen der Sender-App

Um das Skript auszuführen, öffnen Sie eine Eingabeaufforderung, deren Pfad Python enthält, und führen Sie dann diesen Befehl aus:

python send.py

Die Nachrichten, die an den Event Hub gesendet wurden, sollten im Empfängerfenster angezeigt werden.

Problembehandlung

Wenn im Empfängerfenster keine Ereignisse angezeigt werden oder der Code einen Fehler meldet, probieren Sie die folgenden Tipps zur Problembehandlung aus:

  • Wenn keine Ergebnisse aus recy.py angezeigt werden, führen Sie send.py mehrmals aus.

  • Wenn bei Verwendung des kennwortlosen Codes (mit Anmeldeinformationen) Fehler zu „coroutine“ angezeigt werden, stellen Sie sicher, dass Sie den Import aus azure.identity.aio verwenden.

  • Wenn bei Verwendung des kennwortlosen Codes (mit Anmeldeinformationen) Fehler „Nicht geschlossene Clientsitzung“ angezeigt wird, stellen Sie sicher, dass Sie die Sitzung schließen, wenn Sie fertig sind. Weitere Informationen finden Sie unter Asynchrone Anmeldeinformationen.

  • Wenn beim Zugriff auf den Speicher Autorisierungsfehler bei recv.py angezeigt werden, stellen Sie sicher, dass Sie die Schritte unter Erstellen eines Azure-Speicherkontos und eines Blobcontainers ausgeführt und dem Dienstprinzipal die Rolle Mitwirkender an Storage-Blobdaten zugewiesen haben.

  • Wenn Sie Ereignisse mit unterschiedlichen Partitions-IDs erhalten, wird dieses Ergebnis erwartet. Partitionen sind ein Mechanismus zum Organisieren von Daten, der sich auf die erforderliche Downstreamparallelität in verarbeitenden Anwendungen bezieht. Die Anzahl der Partitionen in einem Event Hub steht in direktem Zusammenhang mit der erwarteten Anzahl von gleichzeitigen Lesern. Weitere Informationen finden Sie unter Weitere Informationen zu Partitionen.

Nächste Schritte

In dieser Schnellstartanleitung haben Sie Ereignisse asynchron gesendet und empfangen. Beispiele zum synchronen Senden und Empfangen von Ereignissen finden Sie auf der GitHub-Seite „sync_samples“.

Alle (synchronen und asynchronen) Beispiele auf GitHub finden Sie in der Azure Event Hubs-Clientbibliothek für Python-Beispiele.