使用 Python 將事件傳送至事件中樞或從中接收事件

本快速入門說明如何使用 azure-eventhub Python 套件,來傳送事件至事件中樞和從事件中樞接收事件。

必要條件

如果您對 Azure 事件中樞並不熟悉,在進行此快速入門之前,請先參閱事件中樞概述

若要完成本快速入門,您必須符合下列必要條件:

  • Microsoft Azure 訂用帳戶。 若要使用 Azure 服務 (包括 Azure 事件中樞),您需要訂用帳戶。 如果您沒有現有 Azure 帳戶,可以註冊免費試用
  • Python 3.8 或更新版本,且已安裝並更新 pip。
  • Visual Studio Code (建議) 或任何其他整合式開發環境 (IDE)。
  • 建立事件中樞命名空間和事件中樞。 第一個步驟是使用 Azure 入口網站來建立「事件中樞」命名空間,然後取得您應用程式與事件中樞進行通訊所需的管理認證。 若要建立命名空間和事件中樞,請依照這篇文章中的程序操作。

安裝套件以傳送事件

若要為事件中樞安裝 Python 套件,請開啟在其路徑中有 Python 的命令提示字元。 將目錄變更為您要保留範例的資料夾。

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

向 Azure 驗證應用程式

本快速入門顯示兩種連線到 Azure 事件中樞的方式:無密碼和連接字串。 第一個選項如何使用 Microsoft Entra ID 和顯示角色型存取控制 (RBAC) 中的安全性主體來連線到事件中樞命名空間。 您不需要擔心在程式碼或設定檔或 Azure Key Vault 等安全儲存體中,有硬式編碼連接字串。 第二個選項顯示如何使用連接字串來連線到事件中樞命名空間。 如果您不熟悉 Azure,則連接字串選項可能會更容易遵循。 建議在真實世界應用程式和實際執行環境中使用無密碼選項。 如需詳細資訊,請參閱驗證與授權。 您也可以在概觀頁面上,深入了解無密碼驗證。

將角色指派給 Microsoft Entra 使用者

在本機開發時,請確定連線到 Azure 事件中樞的使用者帳戶具有正確的權限。 您需要 Azure 事件中樞資料擁有者角色,才能傳送和接收訊息。 若要將此角色指派給您自己,您需要使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write 動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上,深入了解角色指派的可用範圍。

下列範例會將 Azure Event Hubs Data Owner 角色指派給您的使用者帳戶,該角色提供對 Azure 事件中樞資源的完整存取權。 在實際案例中,遵循最低權限原則,只為使用者提供更安全實際執行環境所需的最低權限。

Azure 事件中樞的 Azure 內建角色

對於 Azure 事件中樞來說,透過 Azure 入口網站和 Azure 資源管理 API 來管理的命名空間和所有相關資源,皆已使用 Azure RBAC 模型來加以保護。 Azure 提供下列 Azure 內建角色,以授權存取事件中樞命名空間:

如果您想要建立自訂角色,請參閱事件中樞作業所需的權限

重要

在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘。 在罕見的情況下,可能需要高達八分鐘的時間。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。

  1. 在 Azure 入口網站中,使用主要搜尋列或左側導覽找出您的事件中樞命名空間。

  2. 在概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]

  3. 在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。

  4. 從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]

    A screenshot showing how to assign a role.

  5. 使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋 Azure Event Hubs Data Owner 並選取相符的結果。 接著,選擇 [下一步]

  6. 在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]

  7. 在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]

  8. 選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。

傳送事件

在本節中,建立 Python 指令碼來將事件傳送至您稍早建立的事件中樞。

  1. 開啟您慣用的 Python 編輯器,例如 Visual Studio 程式碼

  2. 建立名為 send.py 的指令碼。 此指令碼會將一批事件傳送至您稍早建立的事件中樞。

  3. 將下列程式碼貼到 send.py 中:

    在程式碼中,請使用實際值來取代下列預留位置:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    注意

    如需使用連接字串以異步方式將事件傳送至事件中樞的其他選項範例,請參閱 GitHub send_async.py page 頁面。 顯示的模式也適用於傳送無密碼事件。

接收事件

本快速入門會使用 Azure Blob 儲存體作為檢查點存放區。 檢查點存放區的功用是保存檢查點 (也就是最後一個讀取位置)。

使用 Azure Blob 儲存體作為檢查點存放區時,請遵循下列建議:

  • 針對每個取用者群組使用不同的容器。 您可以使用相同的儲存體帳戶,但每個群組各使用一個容器。
  • 請勿將容器用於其他任何項目,也不會將儲存體帳戶用於其他任何項目。
  • 儲存體帳戶應位於與已部署應用程式所在的相同區域中。 如果應用程式是內部部署,請嘗試選擇最接近的區域。

在 Azure 入口網站的 [儲存體帳戶] 頁面上,於 [Blob 服務] 區段中,確定已停用下列設定。

  • 階層式命名空間
  • Blob 虛刪除
  • 版本控制

建立 Azure 儲存體帳戶和 Blob 容器

若要建立 Azure 儲存體帳戶及其中的 Blob 容器,請執行下列步驟:

  1. 建立 Azure 儲存體帳戶
  2. 建立 Blob 容器
  3. 向 Blob 容器進行驗證。

請務必記下連接字串和容器名稱,以便稍後在接收程式碼中使用。

在本機開發時,請確定存取 Blob 資料的使用者帳戶具有正確的權限。 您需要儲存體 Blob 資料參與者才能讀取和寫入 Blob 資料。 若要指派此角色給您自己,您需要被指派使用者存取管理員角色,或另一個包含 Microsoft.Authorization/roleAssignments/write 動作的角色。 您可以使用 Azure 入口網站、Azure CLI 或 Azure PowerShell,將 Azure RBAC 角色指派給使用者。 您可以在範圍概觀頁面上深入了解角色指派的可用範圍。

在此案例中,您會將權限指派給使用者帳戶 (以儲存體帳戶為範圍),以遵循最低權限原則。 此做法只為使用者提供所需的最低權限,並建立更安全的實際執行環境。

下列範例將儲存體 Blob 資料參與者角色指派給使用者帳戶,以針對儲存體帳戶中的 Blob 資料提供讀取和寫入存取權。

重要

在大部分情況下,角色指派在 Azure 中傳播只需要一兩分鐘,但在罕見情況下,可能需要長達八分鐘。 如果您第一次執行程式碼時收到驗證錯誤,請稍候片刻再試一次。

  1. 在 Azure 入口網站中,使用主要搜尋列或左側導覽找出您的儲存體帳戶。

  2. 在儲存體帳戶概觀頁面上,從左側功能表中選取 [存取控制 (IAM)]

  3. 在 [存取控制 (IAM)] 頁面上,選取 [角色指派] 索引標籤。

  4. 從頂端功能表選取 [+ 新增],然後從產生的下拉功能表中選取 [新增角色指派]

    A screenshot showing how to assign a storage account role.

  5. 使用搜尋方塊,從結果篩選出所需的角色。 在此範例中,搜尋「儲存體 Blob 資料參與者」,選取相符的結果,然後選擇 [下一步]

  6. 在 [存取權指派對象為] 下,選取 [使用者、群組或服務主體],然後選擇 [+ 選取成員]

  7. 在對話方塊中,搜尋 Microsoft Entra 使用者名稱 (通常是您的 user@domain 電子郵件地址),然後在對話方塊底部選擇 [選取]

  8. 選取 [檢閱 + 指派] 以移至最終頁面,然後再次選取 [檢閱 + 指派] 以完成此程序。

安裝套件以接收事件

針對接收端,您需要安裝一個或多個套件。 在本快速入門中,您會使用 Azure Blob 儲存體來保存檢查點,讓程式不用再讀取已讀取過的事件。 該儲存體會在 Blob 中定期對接收的訊息執行中繼資料檢查點檢查。 此方法可在稍後輕鬆地從您離開的地方繼續接收訊息。

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

建立用來接收事件的 Python 指令碼

您會在本節中建立 Python 指令碼,以從事件中樞接收事件:

  1. 開啟您慣用的 Python 編輯器,例如 Visual Studio 程式碼

  2. 建立名為 recv.py 的指令碼。

  3. 請將下列程式碼貼到 recv.py 中:

    在程式碼中,請使用實際值來取代下列預留位置:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    注意

    如需使用連接字串從事件中樞異步接收事件的其他選項範例,請參閱 GitHub recv_with_checkpoint_store_async.py page 頁面。 顯示的模式也適用於接收無密碼事件。

執行接收者應用程式

若要執行指令碼,請開啟在其路徑中有 Python 的命令提示字元,並執行下列命令︰

python recv.py

執行傳送者應用程式

若要執行指令碼,請開啟在其路徑中有 Python 的命令提示字元,並執行下列命令︰

python send.py

接收者視窗應該會顯示已傳送到事件中樞的訊息。

疑難排解

如果您在接收者視窗中沒有看到事件,或程式碼回報錯誤,請嘗試下列疑難排解提示:

  • 如果您沒有看到來自 recy.py 的結果,請執行數次 send.py

  • 如果您在使用無密碼程式碼時看到有關「協同程式」的錯誤(搭配認證),請確定您使用的是從 azure.identity.aio 匯入。

  • 如果您看到具有無密碼程式碼的「未封閉用戶端工作階段」(具有認證),請務必在完成時關閉認證。 如需詳細資訊,請參閱非同步認證

  • 如果您在存取儲存體時看到 recv.py 授權錯誤,請確定您已遵循「建立 Azure 儲存體帳戶和 Blob 容器」中的步驟,並將儲存體 Blob 資料參與者角色指派給服務主體。

  • 如果您收到具有不同資料分割標識符的事件,則預期會產生此結果。 資料分割是一種資料組織機制,與取用端應用程式所需的下游平行處理原則有關。 事件中樞內的資料分割數目,與您預期有的並行讀取器數目直接相關。 如需詳細資訊,請參閱 深入瞭解分割區

下一步

在本快速入門中,您已透過非同步方式傳送和接收事件。 若要了解如何同步傳送和接收事件,請移至 GitHub sync_samples 頁面

如需 GitHub 上的所有範例 (同步和非同步) 的資訊,請移至適用於 Python 的 Azure 事件中樞用戶端程式庫範例