Клиентская библиотека Avro Encoder реестра схем Azure для Python версии 1.0.0
Реестр схем Azure — это служба репозитория схем, размещенная Центры событий Azure, обеспечивающая хранение схем, управление версиями и управление ими. Этот пакет предоставляет кодировщик Avro, способный кодировать и декодировать полезные данные, содержащие идентификаторы схемы реестра схем и содержимое в кодировке Avro.
Исходный код | Пакет (PyPi) | Справочная документация по | API Образцы | Changelog
Заявление об отказе
Поддержка пакетов Python для Пакета Sdk Azure для Python 2.7 закончилась 1 января 2022 г. Дополнительные сведения и вопросы см. на https://github.com/Azure/azure-sdk-for-python/issues/20691
Начало работы
Установка пакета
Установите клиентную библиотеку Avro Encoder реестра схем Azure для Python с помощью pip:
pip install azure-schemaregistry-avroencoder
Предварительные требования:
Для использования этого пакета необходимо:
- Подписка Azure — создайте бесплатную учетную запись.
- Реестр - схем Azure Ниже приведено краткое руководство по созданию группы реестра схем с помощью портал Azure.
- Python 3.6 или более поздней версии — установка Python
Аутентификация клиента
Взаимодействие с Avro Encoder реестра схем начинается с экземпляра класса AvroEncoder, который принимает имя группы схемы и класс клиента реестра схем . Конструктор клиента принимает полное пространство имен Центров событий и учетные данные Azure Active Directory:
Полное пространство имен экземпляра Реестра схем должно иметь формат :
<yournamespace>.servicebus.windows.net
.Конструктору следует передать учетные данные AAD, реализующие протокол TokenCredential . Существуют реализации протокола, доступные
TokenCredential
в пакете azure-identity. Чтобы использовать типы учетных данных, предоставляемыеazure-identity
, установите клиентская библиотека удостоверений Azure для Python с помощью pip:
pip install azure-identity
- Кроме того, чтобы использовать асинхронный API, необходимо сначала установить асинхронный транспорт, например aiohttp:
pip install aiohttp
Создайте AvroEncoder с помощью библиотеки azure-schemaregistry:
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Основные понятия
AvroEncoder
Предоставляет API для кодирования и декодирования из Avro Binary Encoding, а также тип контента с идентификатором схемы. Использует SchemaRegistryClient для получения идентификаторов схемы из содержимого схемы или наоборот.
Поддерживаемые модели сообщений
Добавлена поддержка определенных классов моделей пакета SDK для обмена сообщениями Azure для взаимодействия с AvroEncoder
. Эти модели являются подтипами протокола, определенного MessageType
в azure.schemaregistry.encoder.avroencoder
пространстве имен. В настоящее время поддерживаются следующие классы моделей:
azure.eventhub.EventData
дляazure-eventhub>=5.9.0
;
Формат сообщений
Если кодировщику предоставляется тип сообщения, следующий протоколу MessageType для кодирования, он задаст соответствующие свойства содержимого и типа контента, где:
content
: полезные данные Avro (в общем, полезные данные для конкретного формата)- Двоичная кодировка Avro
- NOT Файл контейнера объектов Avro, который включает схему и не позволяет данному кодировщику переместить схему из полезных данных сообщения в реестр схем.
content type
: строка форматаavro/binary+<schema ID>
, где:avro/binary
является индикатором формата<schema ID>
— шестнадцатеричное представление GUID в том же формате и порядке байтов, что и строка из службы реестра схем.
Если EventData
передается в качестве типа сообщения, для объекта будут заданы EventData
следующие свойства:
- Свойству
body
будет присвоено значение содержимого. - Свойству
content_type
будет присвоено значение типа контента.
Если тип сообщения не указан и по умолчанию кодировщик создаст следующий диктовку: {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }
Примеры
В следующих разделах представлено несколько фрагментов кода, охватывающих некоторые из наиболее распространенных задач реестра схем, в том числе:
- Кодирование
- Декодирование
- Интеграция отправки Центров событий
- Интеграция центров событий, получающих интеграцию
Кодирование
Используйте метод для AvroEncoder.encode
кодирования содержимого с помощью заданной схемы Avro.
Метод будет использовать схему, ранее зарегистрированную в службе реестра схем, и сохранить схему в кэше для использования кодирования в будущем. Чтобы избежать предварительной регистрации схемы в службе и автоматической регистрации ее в методе encode
, аргумент auto_register=True
ключевого слова должен быть передан конструктору AvroEncoder
.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
with encoder:
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
# OR
message_content_dict = encoder.encode(dict_content, schema=definition)
event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])
Декодирование
Используйте метод для AvroEncoder.decode
декодирования содержимого в кодировке Avro с помощью:
- Передача объекта сообщения, который является подтипом протокола MessageType.
- Передача дикта с ключами
content
(тип байтов) иcontent_type
(строка типа). Метод автоматически извлекает схему из службы реестра схем и сохраняет схему в кэше для последующего использования декодирования.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)
with encoder:
# event_data is an EventData object with Avro encoded body
dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
decoded_content = encoder.decode(event_data)
# OR
encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
content_dict = {"content": encoded_bytes, "content_type": content_type}
decoded_content = encoder.decode(content_dict)
Интеграция отправки Центров событий
Интеграция с Центрами событий для отправкиEventData
объекта с содержимым в body
кодировке Avro и соответствующим content_type
.
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
definition = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)
with eventhub_producer, avro_encoder:
event_data_batch = eventhub_producer.create_batch()
dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
event_data_batch.add(event_data)
eventhub_producer.send_batch(event_data_batch)
Интеграция центров событий, получающих интеграцию
Интеграция с Центрами событий для получения EventData
объекта и декодирования значения в кодировке body
Avro.
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)
def on_event(partition_context, event):
decoded_content = avro_encoder.decode(event)
with eventhub_consumer, avro_encoder:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
Устранение неполадок
Общие сведения
Avro Encoder реестра схем Azure вызывает исключения, определенные в Azure Core , если при взаимодействии со службой реестра схем возникают ошибки. Ошибки, связанные с недопустимыми типами контента и недопустимыми схемами, будут вызываться как azure.schemaregistry.encoder.avroencoder.InvalidContentError
и azure.schemaregistry.encoder.avroencoder.InvalidSchemaError
соответственно, где __cause__
будут содержать базовое исключение, вызванное библиотекой Apache Avro.
Ведение журнала
Эта библиотека использует стандартную библиотеку ведения журнала для ведения журнала. Основные сведения о сеансах HTTP (URL-адреса, заголовки и т. д.) регистрируются на уровне INFO.
Подробное ведение журнала на уровне DEBUG, включая тексты запросов и ответов и нередактированные заголовки, можно включить на клиенте с помощью аргумента logging_enable
:
import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)
# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)
Аналогичным образом с помощью параметра logging_enable
можно включить подробное журналирование для отдельной операции (даже если этот режим не включен в клиенте):
encoder.encode(dict_content, schema=definition, logging_enable=True)
Дальнейшие действия
Больше примеров кода
Дополнительные примеры, демонстрирующие распространенные сценарии Avro Encoder реестра схем Azure, находятся в каталоге примеров .
Участие
На этом проекте приветствуются публикации и предложения. Для участия в большинстве процессов по разработке документации необходимо принять лицензионное соглашение участника (CLA), в котором указывается, что вы предоставляете нам права на использование ваших публикаций. Для получения подробных сведений посетите веб-страницу https://cla.microsoft.com.
При отправке запроса на включение внесенных изменений CLA-бот автоматически определит необходимость предоставления соглашения CLA и соответствующего оформления запроса на включение внесенных изменений (например, добавление метки, комментария). Просто следуйте инструкциям бота. Будет достаточно выполнить их один раз для всех репозиториев, поддерживающих соглашение CLA.
В рамках этого проекта действуют правила поведения в отношении продуктов с открытым исходным кодом Майкрософт. Дополнительные сведения см. в разделе часто задаваемых вопросов о правилах поведения или обратитесь к opencode@microsoft.com с любыми дополнительными вопросами или комментариями.
Azure SDK for Python
Обратная связь
https://aka.ms/ContentUserFeedback.
Ожидается в ближайшее время: в течение 2024 года мы постепенно откажемся от GitHub Issues как механизма обратной связи для контента и заменим его новой системой обратной связи. Дополнительные сведения см. в разделеОтправить и просмотреть отзыв по