你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Python 的 Azure EventHubs 检查点存储客户端库 - 版本 1.1.4

使用存储 Blob

Azure EventHubs 检查点存储用于在处理来自 Azure 事件中心 的事件时存储检查点。 此检查点存储包用作 的插件包 EventHubConsumerClient。 它使用 Azure 存储 Blob 作为持久存储来维护检查点和分区所有权信息。

请注意,这是一个同步库,对于 Azure EventHubs 检查点存储客户端库的异步版本,请参阅 azure-eventhub-checkpointstoreblob-aio

源代码 | 包 (PyPi) | API 参考文档 | Azure Eventhubs 文档 | Azure 存储文档

入门

先决条件

  • Python2.7、Python 3.6 或更高版本。

  • Microsoft Azure 订阅:若要使用 Azure 服务(包括 Azure 事件中心),需要订阅。 如果没有现有的 Azure 帐户,可以在 创建帐户时注册免费试用版或使用 MSDN 订阅者权益。

  • 包含事件中心的事件中心命名空间:若要与Azure 事件中心交互,还需要提供命名空间和事件中心。 如果不熟悉如何创建 Azure 资源,可能需要按照使用 Azure 门户创建事件中心的分步指南进行操作。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建事件中心的详细说明。

  • Azure 存储帐户:需要有一个 Azure 存储帐户并创建一个Azure Blob 存储块容器,以使用 Blob 存储检查点数据。 可以按照 创建 Azure 块 Blob 存储帐户的指南进行操作。

安装包

$ pip install azure-eventhub-checkpointstoreblob

关键概念

检查点

检查点是读取者在分区事件序列中标记或提交其位置时执行的过程。 检查点操作由使用者负责,并在使用者组中的每个分区上进行。 这种责任意味着,对于每个使用者组而言,每个分区读取者必须跟踪它在事件流中的当前位置,当它认为数据流已完成时,可以通知服务。 如果读取者与分区断开连接,当它重新连接时,将开始读取前面由该使用者组中该分区的最后一个读取者提交的检查点。 当读取者建立连接时,它会将此偏移量传递给事件中心,以指定要从其开始读取数据的位置。 这样,用户便可以使用检查点将事件标记为已由下游应用程序“完成”,并且在不同计算机上运行的读取者之间发生故障转移时,还可以提供弹性。 若要返回到较旧的数据,可以在此检查点过程中指定较低的偏移量。 借助此机制,检查点可以实现故障转移复原和事件流重放。

&偏移序列号

两个偏移 & 序列号都表示事件在分区中的位置。 可以将它们视为客户端游标。 偏移量是事件的字节编号。 偏移量/序列号使事件使用者 (读取者) 可以指定事件流中要从中开始读取事件的点。 可以指定时间戳,以便接收仅在给定时间戳之后排队的事件。 使用者负责在事件中心服务的外部存储其自身的偏移量值。 在分区中,每个事件都包含偏移量、序列号和排队时的时间戳。

示例

创建 EventHubConsumerClient

创建 EventHubConsumerClient 的最简单方法是使用连接字符串。

from azure.eventhub import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

有关创建 EventHubConsumerClient的其他方法,请参阅 EventHubs 库 了解更多详细信息。

使用 BlobCheckpointStore to do 检查点使用事件


from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'


def on_event(partition_context, event):
    # Put your code here.
    partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    with client:
        client.receive(on_event)

if __name__ == '__main__':
    main()

与不同版本的 Azure 存储服务 API 一起使用BlobCheckpointStore

某些环境具有不同版本的 Azure 存储服务 API。 BlobCheckpointStore 默认情况下,使用存储服务 API 版本 2019-07-07。 若要针对其他版本使用它,请指定 api_version 创建 BlobCheckpointStore 对象时。

疑难解答

常规

启用日志记录有助于解决问题。

日志记录

  • 启用 azure.eventhub.extensions.checkpointstoreblob 记录器以从库收集跟踪。
  • 启用 azure.eventhub 记录器以从 azure-eventhub 主库收集跟踪。
  • 启用 azure.eventhub.extensions.checkpointstoreblob._vendor.storage 记录器以从 Azure 存储 Blob 库收集跟踪。
  • 启用 uamqp 记录器以从基础 uAMQP 库收集跟踪。
  • 在创建客户端时通过设置 logging_enable=True 启用 AMQP 帧级跟踪。

后续步骤

更多示例代码

开始使用我们的 EventHubs 检查点存储示例

文档

此处提供了参考文档。

提供反馈

如果遇到任何 bug 或有建议,请在项目的“ 问题 ”部分中提交问题。

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。

曝光数