Skicka händelser till eller ta emot händelser från händelsehubbar med hjälp av Python (azure-eventhub)
Den här snabbstarten visar hur du skickar händelser till och tar emot händelser från en händelsehubb med hjälp av Python-paketet azure-eventhub.
Förutsättningar
Om du inte har Azure Event Hubs bör du Event Hubs översikt innan du gör den här snabbstarten.
För att slutföra den här snabbstarten, behöver du följande förhandskrav:
Microsoft Azure prenumeration. Om du vill använda Azure-Azure Event Hubs måste du ha en prenumeration. Om du inte har ett befintligt Azure-konto kan du registrera dig för en kostnadsfri utvärderingsversion eller använda dina MSDN-prenumerantförmåner när du skapar ett konto.
Python 2.7 eller 3.6 eller senare, med PIP installerat och uppdaterat.
Python-paketet för Event Hubs.
Installera paketet genom att köra det här kommandot i en kommandotolk som har Python i sin sökväg:
pip install azure-eventhubInstallera följande paket för att ta emot händelserna med hjälp av Azure Blob Storage som kontrollpunktslager:
pip install azure-eventhub-checkpointstoreblob-aioSkapa en Event Hubs och en händelsehubb. Det första steget är att använda Azure Portal för att skapa ett namnområde av typen Event Hubs och hämta de autentiseringsuppgifter för hantering som programmet behöver för att kommunicera med händelsehubben. Om du behöver skapa ett namnområde och en händelsehubb följer du anvisningarna i den här artikeln. Hämta sedan anslutningssträngen för Event Hubs namnområdet genom att följa anvisningarna i artikeln: Hämta anslutningssträngen. Du använder anslutningssträngen senare i den här snabbstarten.
Skicka händelser
I det här avsnittet skapar du ett Python-skript för att skicka händelser till den händelsehubb som du skapade tidigare.
Öppna din Python-favoritredigerare, till exempel Visual Studio Code.
Skapa ett skript med namnet send.py. Det här skriptet skickar en batch med händelser till händelsehubben som du skapade tidigare.
Klistra in följande kod i send.py:
import asyncio from azure.eventhub.aio import EventHubProducerClient from azure.eventhub import EventData async def run(): # Create a producer client to send messages to the event hub. # Specify a connection string to your event hubs namespace and # the event hub name. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME") async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData('First event ')) event_data_batch.add(EventData('Second event')) event_data_batch.add(EventData('Third event')) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) loop = asyncio.get_event_loop() loop.run_until_complete(run())Anteckning
Den fullständiga källkoden, inklusive informationskommentarer, finns på sidan GitHub send_async.py.
Ta emot händelser
I den här snabbstarten används Azure Blob Storage som kontrollpunktslager. Kontrollpunktslagret används för att bevara kontrollpunkter (det vill säga de senaste läspositionerna).
Varning
Om du kör den här koden Azure Stack Hub får du körningsfel om du inte riktar in dig på en Storage API-version. Det beror på att Event Hubs SDK använder det senaste tillgängliga AZURE STORAGE-API:et som är tillgängligt i Azure och som kanske inte är tillgängligt på din Azure Stack Hub plattform. Azure Stack Hub kan ha stöd för en annan version av Storage Blob SDK än de som vanligtvis är tillgängliga i Azure. Om du använder Azure Blog Storage som ett kontrollpunktslager kontrollerar du vilken Azure Storage API-version som stöds för din Azure Stack Hub-version och riktar in dig på den versionen i koden.
Om du till exempel kör Azure Stack Hub version 2005 är den högsta tillgängliga versionen för Storage-tjänsten version 2019-02-02. Som standard använder Event Hubs SDK-klientbiblioteket den högsta tillgängliga versionen i Azure (2019-07-07 vid tidpunkten för SDK-versionen). I det här fallet måste du, förutom att följa stegen i det här avsnittet, även lägga till kod för api-versionen 2019-02-02 för Storage-tjänstens API. Ett exempel på hur du riktar in dig på en Storage API-version finns i de synkrona och asynkrona exemplen på GitHub.
Skapa ett Azure Storage-konto och en blobcontainer
Skapa ett Azure Storage-konto och en blobcontainer i det genom att göra följande:
Se till att registrera anslutningssträngen och containernamnet för senare användning i mottagningskoden.
Skapa ett Python-skript för att ta emot händelser
I det här avsnittet skapar du ett Python-skript för att ta emot händelser från din händelsehubb:
Öppna din Python-favoritredigerare, till exempel Visual Studio Code.
Skapa ett skript med namnet recv.py.
Klistra in följande kod i recv.py:
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore async def on_event(partition_context, event): # Print the event data. print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id)) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME") # Create a consumer client for the event hub. client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store) async with client: # Call the receive method. Read from the beginning of the partition (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") if __name__ == '__main__': loop = asyncio.get_event_loop() # Run the main method. loop.run_until_complete(main())Anteckning
Den fullständiga källkoden, inklusive ytterligare informationskommentarer, finns på sidan GitHub recv_with_checkpoint_store_async.py.
Kör mottagarappen
Om du vill köra skriptet öppnar du en kommandotolk med Python i sökvägen och kör sedan följande kommando:
python recv.py
Kör avsändarappen
Om du vill köra skriptet öppnar du en kommandotolk med Python i sökvägen och kör sedan följande kommando:
python send.py
Mottagarfönstret ska visa de meddelanden som skickades till händelsehubben.
Nästa steg
I den här snabbstarten har du skickat och tagit emot händelser asynkront. Information om hur du skickar och tar emot händelser synkront finns på GitHub sync_samples sidan.
För alla exempel (både synkrona och asynkrona) på GitHub går du till Azure Event Hubs för Python-exempel.