你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure 事件中心 Python 客户端库 - 版本 5.11.5

Azure 事件中心是一种高度可缩放的发布-订阅服务,每秒可引入数百万个事件并将其流式传输到多个使用者。 这样,便可以处理和分析连接的设备和应用程序生成的大量数据。 事件中心收集数据后,可以使用任何实时分析提供程序或批处理/存储适配器来检索、转换和存储数据。 如果想要详细了解Azure 事件中心,可查看:什么是事件中心

使用 Azure 事件中心客户端库可发布和使用 Azure 事件中心事件,还可以:

  • 出于商业智能和诊断目的,发出有关应用程序的遥测数据。
  • 发布有关应用程序状态的信息,相关方可能会需要观察该状态并将其视为采取措施的触发器。
  • 观察业务或其他生态系统内发生的重要操作和交互,使松散耦合的系统无需结合在一起即可相互交互。
  • 接收来自一个或多个发布者的事件,对其进行转换以更好地满足生态系统的需求,然后将转换后的事件发布到新流供使用者观察。

源代码 | 包 (PyPi) | 包 (Conda) | API 参考文档 | 产品文档 | 样品

入门

先决条件

  • Python 3.7 或更高版本。

  • Microsoft Azure 订阅:若要使用 Azure 服务(包括 Azure 事件中心),需要订阅。 如果没有现有的 Azure 帐户,可以在 创建帐户时注册免费试用版或使用 MSDN 订阅者权益。

  • 包含事件中心的事件中心命名空间:若要与Azure 事件中心交互,还需要提供命名空间和事件中心。 如果不熟悉如何创建 Azure 资源,可能需要按照使用 Azure 门户创建事件中心的分步指南进行操作。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建事件中心的详细说明。

安装包

使用 pip 安装适用于 Python 的 Azure 事件中心 客户端库:

$ pip install azure-eventhub

验证客户端

与事件中心的交互从 EventHubConsumerClient 或 EventHubProducerClient 类的实例开始。 需要主机名、SAS/AAD 凭据和事件中心名称或连接字符串来实例化客户端对象。

从 连接字符串 创建客户端:

若要使事件中心客户端库与事件中心交互,最简单的方法是使用连接字符串,该连接字符串是在创建事件中心命名空间时自动创建的。 如果不熟悉 Azure 中的共享访问策略,可能需要按照分步指南获取事件中心连接字符串

  • 方法from_connection_string将窗体Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey>和实体名称连接字符串带到事件中心实例。 可从 Azure 门户获取该连接字符串。

使用 azure-identity 库创建客户端:

或者,可以使用 Credential 对象通过 AAD 对 azure 标识包进行身份验证。

  • 上面链接的示例中演示的此构造函数采用事件中心实例的主机名和实体名称,以及实现 TokenCredential 协议的凭据。 azure-identity 包中提供了协议的实现TokenCredential。 主机名的格式为 <yournamespace.servicebus.windows.net>
  • 若要使用 提供的 azure-identity凭据类型,请安装包: pip install azure-identity
  • 此外,若要使用异步 API,必须先安装异步传输,例如 aiohttppip install aiohttp
  • 使用 Azure Active Directory 时,必须为主体分配一个允许访问事件中心的角色,例如Azure 事件中心数据所有者角色。 有关将 Azure Active Directory 授权与事件中心配合使用的详细信息,请参阅 相关文档

关键概念

  • EventHubProducerClient 是遥测数据、诊断信息、使用情况日志或其他日志数据的源,作为嵌入式设备解决方案、移动设备应用程序、在主机或其他设备上运行的游戏、某些基于客户端或服务器的业务解决方案或网站的一部分。

  • EventHubConsumerClient 从事件中心获取此类信息并对其进行处理。 处理可能涉及聚合、复杂计算和筛选。 也可能涉及以原始或转换方式分发或存储信息。 事件中心使用者通常是具有内置分析功能(如 Azure 流分析、Apache Spark 或 Apache Storm)的强大的大规模平台基础结构部件。

  • 分区是事件中心内保留的有序事件。 Azure 事件中心通过分区的使用者模式提供消息流,在此模式下,每个使用者只读取消息流的特定子集或分区。 当较新的事件到达时,它们将添加到此序列的末尾。 分区数量在创建事件中心时指定,无法更改。

  • 使用者组是整个事件中心的视图。 使用者组使多个消费应用程序都有各自独立的事件流视图,并按自身步调、从自身立场独立读取流。 每个使用者组的分区上最多可以有 5 个并发读取者,但建议给定分区和使用者组配对只有一个活动的使用者。 每个活动的读取者都会从其分区接收所有事件;如果同一个分区有多个读取者,他们将接收重复的事件。

有关更多概念和更深入的讨论,请参阅: 事件中心功能。 此外,AMQP 的概念在 OASIS 高级消息队列协议 (AMQP) 版本 1.0 中进行了充分阐述。

线程安全

我们不保证 EventHubProducerClient 或 EventHubConsumerClient 是线程安全的。 不建议跨线程重用这些实例。 由正在运行的应用程序以线程安全的方式使用这些类。

数据模型类型 EventDataBatch 不是线程安全的。 它不应跨线程共享,也不应与客户端方法同时使用。

示例

以下部分提供了几个代码片段,涵盖了一些最常见的事件中心任务,包括:

检查事件中心

获取事件中心的分区 ID。

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

将事件发布到事件中心

create_batch使用 上的 EventHubProducerClient 方法创建对象EventDataBatch,然后可以使用 方法发送send_batch该对象。 可以使用 add 方法将事件添加到 ,EventDataBatch直到达到最大批大小限制(以字节为单位)。

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

使用事件中心的事件

可通过多种方式使用 EventHub 中的事件。 若要在收到事件时仅触发回调, EventHubConsumerClient.receive 将按如下所示使用 方法:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

分批使用事件中心的事件

上述示例在收到每条消息时触发回调,而下面的示例对一批事件触发回调,一次尝试接收一个数字。

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

以异步方式将事件发布到事件中心

create_batch使用 上的 EventHubProducer 方法创建对象EventDataBatch,然后可以使用 方法发送send_batch该对象。 可以使用 add 方法将事件添加到 ,EventDataBatch直到达到最大批大小限制(以字节为单位)。

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

异步使用事件中心的事件

此 SDK 支持基于同步和异步的代码。 若要按上述示例所示接收,但在 aio 中,需要满足以下条件:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

以异步方式批量使用事件中心的事件

aio 也支持所有同步函数。 如上面演示的同步批处理接收,可以在 asyncio 中完成相同的操作,如下所示:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

使用事件并使用检查点存储保存检查点

EventHubConsumerClient 是一种高级构造,它允许一次从多个分区接收事件,并使用同一事件中心和使用者组与其他使用者进行负载均衡。

这也允许用户在使用检查点处理事件时跟踪进度。

检查点用于表示用户从事件中心实例中的使用者组的特定分区中最后一个成功处理的事件。 EventHubConsumerClient使用 的CheckpointStore实例来更新检查点并存储负载均衡算法所需的相关信息。

搜索带有 前缀 azure-eventhub-checkpointstore 的 pypi 以查找支持此的包,并使用此类包中的 CheckpointStore 实现。 请注意,同时提供同步库和异步库。

在下面的示例中,我们将创建 的 EventHubConsumerClient 实例并使用 BlobCheckpointStore。 需要 创建一个 Azure 存储帐户 和一个 Blob 容器 来运行代码。

Azure Blob 存储检查点存储异步Azure Blob 存储检查点存储同步是我们提供的将Azure Blob 存储作为持久存储的实现之一CheckpointStore

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

使用 EventHubConsumerClient 处理IoT 中心

也可以使用 EventHubConsumerClient 来处理IoT 中心。 这对于从链接的 EventHub 接收IoT 中心的遥测数据很有用。 关联的连接字符串将没有发送声明,因此无法发送事件。

请注意,连接字符串需要适用于与事件中心兼容的终结点,例如“Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name”

可通过两种方式获取与事件中心兼容的终结点:

  • 在 Azure 门户中手动获取IoT 中心的“内置终结点”并从中接收。
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

故障排除

有关如何诊断各种故障方案的详细信息,azure-eventhubs请参阅故障排除指南

后续步骤

更多示例代码

请查看 示例 目录,获取有关如何使用此库向/从事件中心发送和接收事件的详细示例。

文档

此处提供了参考文档。

架构注册表和 Avro 编码器

EventHubs SDK 与 架构注册表 服务和 Avro 很好地集成。 有关详细信息,请参阅 架构注册表 SDK架构注册表 Avro 编码器 SDK

纯 Python AMQP 传输和向后兼容性支持

Azure 事件中心客户端库现在基于纯 Python AMQP 实现。 uAMQP 已删除为必需的依赖项。

使用 uAMQP 作为基础传输:

  1. 使用 pip 安装 uamqp
$ pip install uamqp 
  1. 在客户端构造期间传递 uamqp_transport=True
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

注意:message已弃用之前公开 的 uamqp.Message上的 EventData/EventDataBatch属性。 引入了 返回 EventData.message/EventDataBatch.message 的“旧版”对象,以帮助促进转换。

从源生成 uAMQP 滚轮

如果 uAMQP 打算用作 的基础 AMQP 协议实现 azure-eventhub,则可以为大多数主要操作系统找到 uAMQP 轮。

如果打算使用 uAMQP ,并且正在未提供 uAMQP 滚轮的平台上运行,请按照 uAMQP 安装 指南从源进行安装。

提供反馈

如果遇到任何 bug 或有建议,请在项目的“ 问题 ”部分中提出问题。

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。

曝光数