你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Python 的 Azure 架构注册表 Avro 编码器客户端库 - 版本 1.0.0

Azure 架构注册表是由 Azure 事件中心 托管的架构存储库服务,提供架构存储、版本控制和管理。 此包提供一个 Avro 编码器,能够对包含架构注册表架构标识符和 Avro 编码内容的有效负载进行编码和解码。

源代码 | 包 (PyPi) | API 参考文档 | 样品 | 更改日志

免责声明

对 Python 2.7 的 Azure SDK Python 包支持已于 2022 年 1 月 1 日结束。 有关详细信息和问题,请参阅 https://github.com/Azure/azure-sdk-for-python/issues/20691

入门

安装包

使用 pip 安装适用于 Python 的 Azure 架构注册表 Avro 编码器客户端库:

pip install azure-schemaregistry-avroencoder

先决条件:

若要使用此包,必须具有:

验证客户端

与架构注册表 Avro Encoder 的交互从 AvroEncoder 类的实例开始,该实例采用架构组名称和 架构注册表客户端 类。 客户端构造函数采用事件中心完全限定的命名空间和 Azure Active Directory 凭据:

  • 架构注册表实例的完全限定命名空间应采用以下格式: <yournamespace>.servicebus.windows.net

  • 应将实现 TokenCredential 协议的 AAD 凭据传递给构造函数。 azure-identity 包中提供了协议的实现TokenCredential。 若要使用 提供的 azure-identity凭据类型,请使用 pip 安装适用于 Python 的 Azure 标识客户端库:

pip install azure-identity
  • 此外,若要使用异步 API,必须先安装异步传输,例如 aiohttp
pip install aiohttp

使用 azure-schemaregistry 库创建 AvroEncoder:

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

关键概念

AvroEncoder

提供 API,用于对 Avro 二进制编码进行编码和解码,以及具有架构 ID 的内容类型。 使用 SchemaRegistryClient 从架构内容获取架构 ID,反之亦然。

支持的消息模型

已向某些 Azure 消息传递 SDK 模型类添加了支持,以便与 AvroEncoder进行互操作性。 这些模型是在 命名空间下定义的协议的MessageTypeazure.schemaregistry.encoder.avroencoder子类型。 目前,支持的模型类包括:

  • azure-eventhub>=5.9.0,表示集 azure.eventhub.EventData

消息格式

如果将遵循 MessageType 协议的消息类型提供给编码器进行编码,它将设置相应的内容和内容类型属性,其中:

  • content:Avro 有效负载通常 (,特定于格式的有效负载)

    • Avro 二进制编码
    • NOT Avro 对象容器文件,其中包含架构,并破坏此编码器将架构从消息有效负载移出并移入架构注册表的目的。
  • content type:格式 avro/binary+<schema ID>的字符串,其中:

    • avro/binary 是格式指示器
    • <schema ID> 是 GUID 的十六进制表示形式,格式和字节顺序与架构注册表服务中的字符串相同。

如果 EventData 作为消息类型传入,将在 对象上 EventData 设置以下属性:

  • 属性 body 将设置为内容值。
  • 属性 content_type 将设置为内容类型值。

如果未提供消息类型,默认情况下,编码器将创建以下 dict: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

示例

以下部分提供了几个代码片段,涵盖了一些最常见的架构注册表任务,包括:

编码

AvroEncoder.encode使用 方法使用给定的 Avro 架构对内容进行编码。 方法将使用以前注册到架构注册表服务的架构,并保持架构缓存以供将来编码使用。 为了避免将架构预先注册到服务并使用 方法自动注册它 encode ,应将 keyword 参数 auto_register=True 传递给 AvroEncoder 构造函数。

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

解码

AvroEncoder.decode使用 方法通过以下任一方法解码 Avro 编码的内容:

  • 传入消息对象,该消息对象是 MessageType 协议的子类型。
  • 传入具有键 content 的 dict (类型字节) 和 content_type (类型字符串) 。 方法自动从架构注册表服务检索架构,并保留架构缓存以供将来解码使用。
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

事件中心发送集成

事件中心 集成,以发送 EventData 对象,该 body 对象设置为 Avro 编码的内容和相应的 content_type

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

事件中心接收集成

事件中心 集成以接收 EventData 对象并解码 Avro 编码 body 的值。

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

疑难解答

常规

如果在与架构注册表服务通信时遇到错误,Azure 架构注册表 Avro 编码器将引发 Azure Core 中定义的异常。 与无效内容/内容类型和无效架构相关的错误将分别作为 azure.schemaregistry.encoder.avroencoder.InvalidContentErrorazure.schemaregistry.encoder.avroencoder.InvalidSchemaError引发,其中 __cause__ 将包含 Apache Avro 库引发的基础异常。

日志记录

此库使用标准 日志记录 库进行日志记录。 有关 HTTP 会话 (URL、标头等的基本信息,) 在 INFO 级别记录。

可以使用 参数在客户端 logging_enable 上启用详细的调试级别日志记录,包括请求/响应正文和未处理标头:

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

同样,即使没有为客户端启用详细日志记录,logging_enable 也可以为单个操作启用:

encoder.encode(dict_content, schema=definition, logging_enable=True)

后续步骤

更多示例代码

演示常见 Azure 架构注册表 Avro 编码器方案的更多示例位于 示例 目录中。

贡献

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 https://cla.microsoft.com

提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。