Share via


Biblioteca cliente do Azure Schema Registry Avro Serializer para Python - versão 1.0.0b4

Azure Schema Registry é um serviço de repositório de esquemas hospedado por Hubs de Eventos do Azure, fornecendo armazenamento de esquemas, versão e gestão. Este pacote fornece um serializador Avro capaz de serializar e deserizar cargas que contenham identificadores de esquemas de schema e dados codificados pela Avro.

Código fonte | Pacote (PyPi) | Documentação de | referência da API Amostras | Alterlog

Exclusão de Responsabilidade

O suporte de pacotes Azure SDK Python para Python 2.7 termina a 01 de janeiro de 2022. Para mais informações e perguntas, consulte https://github.com/Azure/azure-sdk-for-python/issues/20691

Introdução

Instale o pacote

Instale a biblioteca de clientes Azure Schema Registry Avro Serializer e a biblioteca de clientes Azure Identity para Python com pip:

pip install azure-schemaregistry-avroserializer azure-identity

Pré-requisitos:

Para utilizar este pacote, você deve ter:

Autenticar o cliente

A interação com o Schema Registry Avro Serializer começa com uma instância da classe AvroSerializer, que tem o nome de grupo de schema e a classe Schema Registry Client . O construtor cliente leva o espaço de nome totalmente qualificado e a credencial de Diretório Ativo Azure:

  • O espaço de nome totalmente qualificado da instância do registo de Schema deve seguir o formato: <yournamespace>.servicebus.windows.net.

  • Uma credencial AAD que implementa o protocolo TokenCredential deve ser passada ao construtor. Existem implementações do TokenCredential protocolo disponível no pacote de identidade azul. Para utilizar os tipos de credenciais fornecidos por, por azure-identityfavor instale a biblioteca cliente Azure Identity para Python com pip:

pip install azure-identity
  • Além disso, para utilizar a API async suportada no Python 3.6+, deve primeiro instalar um transporte de async, como aiohttp:
pip install aiohttp

Criar o AvroSerializer utilizando a biblioteca de esquemas azure::

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

Conceitos-chave

AvroSerializador

Fornece a API para serializar e deserializar da Avro Binary Encoding mais um cabeçalho com identificação de esquema. Usa SchemaRegistryClient para obter IDs de esquema do conteúdo do esquema ou vice-versa.

Formato de mensagem

O mesmo formato é utilizado por serializadores de registo de esquemas em línguas Azure SDK.

As mensagens são codificadas da seguinte forma:

  • 4 bytes: Indicador de Formato

    • Atualmente sempre zero para indicar o formato abaixo.
  • 32 bytes: Schema ID

    • UTF-8 representação hexadecimal da GUID.
    • 32 dígitos de hexáxinho, sem hífenes.
    • O mesmo formato e encomenda de byte como string do serviço de registo de Schema.
  • Bytes restantes: Carga útil avro (em geral, carga útil específica do formato)

    • Avro Binary Encoding
    • NÃO Arquivo de Contentores de Objetos Avro, que inclui o esquema e derrota o propósito deste serialzer de mover o esquema para fora da carga útil da mensagem e para o registo de esquemas.

Exemplos

As seguintes secções fornecem vários fragmentos de código que cobrem algumas das tarefas mais comuns do Registo de Schema, incluindo:

Serialização

Utilize AvroSerializer.serialize o método para serializar dados dict com o esquema avro dado. O método utilizaria um esquema previamente registado no serviço de registo de Schema e manteria o esquema em cache para futura utilização da serialização. Também é possível evitar o pré-registo do esquema ao serviço e registar-se automaticamente com o serialize método, através da AvroSerializer instantânea com o argumento auto_register_schemas=Trueda palavra-chave .

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

Deserialização

Utilize AvroSerializer.deserialize o método para deserializar bytes crus em dados dict. O método recupera automaticamente o esquema do Serviço de Registo de Schema e mantém o esquema em cache para futura utilização da deserialização.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

Centros de Eventos Enviando Integração

Integração com os Centros de Eventos para enviar dados avro dict serializados como o corpo do EventData.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

Centros de Eventos Que Recebem Integração

Integração com os Centros de Eventos para receber EventData e deserizar bytes crus em dados avro dict.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Resolução de problemas

Geral

Azure Schema Registry Avro Serializer levanta exceções definidas em Azure Core.

Registo

Esta biblioteca utiliza a biblioteca de registos padrão para registar registos. Informações básicas sobre sessões HTTP (URLs, cabeçalhos, etc.) são registadas ao nível info.

A registo detalhado do nível DEBUG, incluindo os órgãos de pedido/resposta e os cabeçalhos não redigidos, pode ser ativado num cliente com o logging_enable argumento:

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

Da mesma forma, logging_enable pode permitir a registo detalhado para uma única operação, mesmo quando não está habilitado para o cliente:

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

Passos seguintes

Mais código de amostra

Por favor, encontre mais exemplos no diretório de amostras que demonstre cenários comuns de Azure Schema Registry Avro Serializer.

Contribuir

Agradecemos todas as contribuições e sugestões para este projeto. A maioria das contribuições requerem que celebre um Contrato de Licença de Contribuição (CLA) no qual se declare que tem o direito de conceder e que, na verdade, concede-nos os direitos para utilizar a sua contribuição. Para mais detalhes, visite https://cla.microsoft.com.

Quando submete um pedido Pull, um bot do CLA determina automaticamente se tem de fornecer um CLA e decorar o PR de forma adequada (por exemplo, etiqueta, comentário). Só tem de seguir as instruções fornecidas pelo bot. Apenas terá de fazer isto uma vez em todos os repositórios com o nosso CLA.

Este projeto adotou o Microsoft Open Source Code of Conduct (Código de Conduta do Microsoft Open Source). Para mais informações consulte o Código de Conduta FAQ ou contacte opencode@microsoft.com com quaisquer perguntas ou comentários adicionais.