Posílání událostí a přijímání událostí z Center událostí pomocí Pythonu (Azure-eventhub)
V tomto rychlém startu se dozvíte, jak odesílat události do centra událostí a přijímat z něj události pomocí balíčku Pythonu pro Azure-eventhub .
Požadavky
Pokud s Azure Event Hubs teprve začínáte, přečtěte si téma přehled Event Hubs před provedením tohoto rychlého startu.
K dokončení tohoto rychlého startu potřebujete následující požadavky:
Microsoft Azure předplatné. Pokud chcete používat služby Azure, včetně Azure Event Hubs, potřebujete předplatné. Pokud nemáte existující účet Azure, můžete si zaregistrovat bezplatnou zkušební verzi nebo využít výhody pro předplatitele MSDN při vytváření účtu.
Python 2,7 nebo 3,6 nebo novější, s nainstalovaným a aktualizovaným PIP
Balíček Pythonu pro Event Hubs.
Chcete-li nainstalovat balíček, spusťte tento příkaz na příkazovém řádku, který má Python v cestě:
pip install azure-eventhubNainstalujte následující balíček pro příjem událostí pomocí úložiště objektů BLOB v Azure jako úložiště kontrolního bodu:
pip install azure-eventhub-checkpointstoreblob-aioVytvoří obor názvů Event Hubs a centrum událostí. Prvním krokem je použití Azure Portal k vytvoření oboru názvů typu Event Hubs a získání přihlašovacích údajů pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku. Pak Získejte připojovací řetězec pro obor názvů Event Hubs podle pokynů uvedených v článku získání připojovacího řetězce. Připojovací řetězec použijete později v tomto rychlém startu.
Odesílání událostí
V této části vytvoříte skript v jazyce Python, který bude odesílat události do centra událostí, které jste vytvořili dříve.
otevřete oblíbený editor pythonu, například Visual Studio Code.
Vytvořte skript s názvem Send.py. Tento skript pošle dávku událostí do centra událostí, které jste vytvořili dříve.
Vložte následující kód do Send.py:
import asyncio from azure.eventhub.aio import EventHubProducerClient from azure.eventhub import EventData async def run(): # Create a producer client to send messages to the event hub. # Specify a connection string to your event hubs namespace and # the event hub name. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME") async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData('First event ')) event_data_batch.add(EventData('Second event')) event_data_batch.add(EventData('Third event')) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) loop = asyncio.get_event_loop() loop.run_until_complete(run())Poznámka
úplný zdrojový kód, včetně informativních komentářů, přejdete na stránku GitHub send_async. py.
Příjem událostí
V tomto rychlém startu se jako úložiště kontrolního bodu používá úložiště objektů blob Azure. Úložiště kontrolního bodu se používá k trvalému kontrolnímu bodu (to znamená poslední pozice pro čtení).
Upozornění
pokud spustíte tento kód v Azure Stackovém centru, dojde k chybám za běhu, pokud necílíte na konkrétní verzi rozhraní API Storage. důvodem je, že sada Event Hubs SDK používá nejnovější dostupné rozhraní API Azure Storage dostupné v Azure, které nemusí být k dispozici na vaší platformě služby Azure Stack Hub. centrum Azure Stack může podporovat jinou verzi sady Storage SDK objektů Blob, než jaká jsou běžně dostupná v Azure. pokud používáte Blog blogu Azure Storage jako úložiště kontrolního bodu, podívejte se na podporovanou verzi rozhraní API Azure Storage pro sestavení centra Azure Stack a cílete na verzi v kódu.
pokud například používáte v Azure Stack centra verze 2005, nejvyšší dostupná verze služby Storage je verze 2019-02-02. Ve výchozím nastavení používá Klientská knihovna Event Hubs SDK nejvyšší dostupnou verzi v Azure (2019-07-07 v době vydání sady SDK). v takovém případě, kromě následujících kroků v této části, budete také muset přidat kód pro cílení na rozhraní API služby Storage service verze 2019-02-02. příklad, jak cílit na konkrétní verzi rozhraní Storage API, najdete v článku synchronní a asynchronní ukázky v GitHub.
Vytvoření účtu služby Azure Storage a kontejneru objektů BLOB
Pomocí následujících kroků vytvořte účet úložiště Azure a kontejner objektů BLOB.
- Vytvoření účtu Azure Storage
- Vytvoření kontejneru objektů blob
- Získání připojovacího řetězce k účtu úložiště
Nezapomeňte si poznamenejte připojovací řetězec a název kontejneru pro pozdější použití v kódu pro příjem.
Vytvoření skriptu v jazyce Python pro příjem událostí
V této části vytvoříte skript v jazyce Python pro příjem událostí z centra událostí:
otevřete oblíbený editor pythonu, například Visual Studio Code.
Vytvořte skript s názvem recv.py.
Vložte následující kód do recv.py:
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore async def on_event(partition_context, event): # Print the event data. print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id)) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME") # Create a consumer client for the event hub. client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store) async with client: # Call the receive method. Read from the beginning of the partition (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") if __name__ == '__main__': loop = asyncio.get_event_loop() # Run the main method. loop.run_until_complete(main())Poznámka
úplný zdrojový kód včetně dalších informativních komentářů najdete na stránce GitHub recv_with_checkpoint_store_async. py.
Spuštění aplikace příjemce
Chcete-li spustit skript, otevřete příkazový řádek, který má v cestě Python, a pak spusťte tento příkaz:
python recv.py
Spuštění aplikace odesílatele
Chcete-li spustit skript, otevřete příkazový řádek, který má v cestě Python, a pak spusťte tento příkaz:
python send.py
V okně přijímače by se měly zobrazit zprávy, které byly odeslány do centra událostí.
Další kroky
V tomto rychlém startu jste události odeslali a přijali asynchronně. pokud se chcete dozvědět, jak odesílat a přijímat události synchronně, navštivte stránku GitHub sync_samples.
u všech ukázek (synchronních i asynchronních) v GitHub jděte do části Azure Event Hubs client library for Python samples.