Partager via


Bibliothèque de client Avro Serializer pour Python - version 1.0.0b4 d’Azure Schema Registry

Azure Schema Registry est un service de référentiel de schémas hébergé par Azure Event Hubs, qui fournit le stockage des schémas, le contrôle de version et la gestion. Ce package fournit un sérialiseur Avro capable de sérialiser et de désérialiser des charges utiles contenant des identificateurs de schéma du registre de schémas et des données encodées par Avro.

| Code sourcePackage (PyPi) | Documentation de référence sur les | API Échantillons | Changelog

Clause d’exclusion de responsabilité

La prise en charge des packages Python du SDK Azure pour Python 2.7 prend fin le 1er janvier 2022. Pour obtenir plus d’informations et poser des questions, reportez-vous à https://github.com/Azure/azure-sdk-for-python/issues/20691

Prise en main

Installer le package

Installez la bibliothèque de client Azure Schema Registry Avro Serializer et la bibliothèque de client Azure Identity pour Python avec pip :

pip install azure-schemaregistry-avroserializer azure-identity

Configuration requise :

Pour utiliser ce package, vous devez disposer des conditions suivantes :

Authentifier le client

L’interaction avec le sérialiseur Avro Du registre de schémas commence par une instance de la classe AvroSerializer, qui prend le nom du groupe de schémas et la classe cliente Schema Registry . Le constructeur client prend l’espace de noms complet Event Hubs et les informations d’identification Azure Active Directory :

  • L’espace de noms complet de l’instance de Registre de schémas doit suivre le format : <yournamespace>.servicebus.windows.net.

  • Les informations d’identification AAD qui implémentent le protocole TokenCredential doivent être passées au constructeur. Des implémentations du TokenCredential protocole sont disponibles dans le package azure-identity. Pour utiliser les types d’informations d’identification fournis par azure-identity, installez la bibliothèque de client Azure Identity pour Python avec pip :

pip install azure-identity
  • En outre, pour utiliser l’API asynchrone prise en charge sur Python 3.6+, vous devez d’abord installer un transport asynchrone, tel que aiohttp :
pip install aiohttp

Créez AvroSerializer à l’aide de la bibliothèque azure-schemaregistry :

from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = '<< FULLY QUALIFIED NAMESPACE OF THE SCHEMA REGISTRY >>'
group_name = '<< GROUP NAME OF THE SCHEMA >>'
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

Concepts clés

AvroSerializer

Fournit une API pour sérialiser et désérialiser à partir d’Avro Binary Encoding, ainsi qu’un en-tête avec l’ID de schéma. Utilise SchemaRegistryClient pour obtenir des ID de schéma à partir du contenu du schéma ou vice versa.

Format de message

Le même format est utilisé par les sérialiseurs de registre de schémas dans les langages du Kit de développement logiciel (SDK) Azure.

Les messages sont encodés comme suit :

  • 4 octets : Indicateur de format

    • Actuellement, toujours zéro pour indiquer le format ci-dessous.
  • 32 octets : ID de schéma

    • Représentation hexadécimale UTF-8 du GUID.
    • 32 chiffres hexadécimaux, pas de traits d’union.
    • Même format et ordre d’octets que la chaîne du service Registre de schémas.
  • Octets restants : charge utile Avro (en général, charge utile spécifique au format)

    • Encodage binaire Avro
    • NOT Avro Object Container File, qui inclut le schéma et va à l’encontre de l’objectif de ce serialzer de déplacer le schéma hors de la charge utile de message et dans le registre de schémas.

Exemples

Les sections suivantes fournissent plusieurs extraits de code couvrant certaines des tâches les plus courantes du Registre de schémas, notamment :

Sérialisation

Utilisez AvroSerializer.serialize la méthode pour sérialiser des données de dict avec le schéma avro donné. La méthode utilise un schéma précédemment inscrit auprès du service Registre de schémas et conserve le schéma mis en cache pour une utilisation ultérieure de la sérialisation. Il est également possible d’éviter de préinscrire le schéma auprès du service et de l’inscrire automatiquement avec la serialize méthode en instanciant avec AvroSerializer l’argument auto_register_schemas=Truede mot clé .

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_register_client.register(group_name, name, definition, format)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    dict_data = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    encoded_bytes = serializer.serialize(dict_data, schema=definition)

Désérialisation

Utilisez AvroSerializer.deserialize la méthode pour désérialiser des octets bruts dans des données de dictée. La méthode récupère automatiquement le schéma à partir du service de registre de schémas et conserve le schéma mis en cache pour une utilisation ultérieure de la désérialisation.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

with serializer:
    encoded_bytes = b'<data_encoded_by_azure_schema_registry_avro_serializer>'
    decoded_data = serializer.deserialize(encoded_bytes)

Intégration d’envoi d’Event Hubs

Intégration à Event Hubs pour envoyer des données avro dict sérialisées en tant que corps d’EventData.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name, auto_register_schemas=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
    event_data_batch = eventhub_producer.create_batch()
    dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    payload_bytes = avro_serializer.serialize(dict_data, schema=definition)
    event_data_batch.add(EventData(body=payload_bytes))
    eventhub_producer.send_batch(event_data_batch)

Intégration de réception d’Event Hubs

Intégration à Event Hubs pour recevoir EventData et désérialiser des octets bruts dans des données de dictée avro.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_serializer = AvroSerializer(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    bytes_payload = b"".join(b for b in event.body)
    deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

Dépannage

Général

Azure Schema Registry Avro Serializer déclenche des exceptions définies dans Azure Core.

Journalisation

Cette bibliothèque utilise la bibliothèque de journalisation standard pour la journalisation. Les informations de base sur les sessions HTTP (URL, en-têtes, etc.) sont enregistrées au niveau INFO.

La journalisation détaillée au niveau DEBUG, y compris les corps de requête/réponse et les en-têtes non expurgés, peut être activée sur un client avec l’argument logging_enable :

import sys
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import AvroSerializer
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient("<your-fully_qualified_namespace>", credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
serializer = AvroSerializer(client=schema_registry_client, group_name="<your-group-name>")

De la même façon, logging_enable peut activer la journalisation détaillée pour une seule opération, même quand elle n’est pas activée pour le client :

serializer.serialize(dict_data, schema=schema_definition, logging_enable=True)

Étapes suivantes

Autres exemples de code

Vous trouverez d’autres exemples dans le répertoire d’exemples illustrant les scénarios courants du registre de schémas Azure Avro Serializer.

Contribution

Ce projet accepte les contributions et les suggestions. La plupart des contributions vous demandent d’accepter un contrat de licence de contribution (CLA) déclarant que vous avez le droit de nous accorder, et que vous nous accordez réellement, les droits d’utilisation de votre contribution. Pour plus d’informations, visitez https://cla.microsoft.com.

Quand vous envoyez une demande de tirage (pull request), un bot CLA détermine automatiquement si vous devez fournir un contrat CLA et agrémenter la demande de tirage de façon appropriée (par exemple, avec une étiquette ou un commentaire). Suivez simplement les instructions fournies par le bot. Vous ne devez effectuer cette opération qu’une seule fois sur tous les dépôts utilisant notre contrat CLA.

Ce projet a adopté le Code de conduite Open Source de Microsoft. Pour plus d’informations, consultez les Questions fréquentes (FAQ) sur le code de conduite ou envoyez vos questions ou vos commentaires à opencode@microsoft.com.