Python용 Azure 스키마 레지스트리 Avro 인코더 클라이언트 라이브러리 - 버전 1.0.0

Azure Schema Registry는 스키마 스토리지, 버전 관리 및 관리를 제공하는 Azure Event Hubs 호스팅되는 스키마 리포지토리 서비스입니다. 이 패키지는 스키마 레지스트리 스키마 식별자와 Avro로 인코딩된 콘텐츠를 포함하는 페이로드를 인코딩하고 디코딩할 수 있는 Avro 인코더를 제공합니다.

소스 코드 | 패키지(PyPi) | API 참조 설명서 | 샘플 | Changelog

고지 사항

Python 2.7에 대한 Azure SDK Python 패키지 지원은 2022년 1월 1일에 종료되었습니다. 자세한 내용과 질문은 을 참조하세요. https://github.com/Azure/azure-sdk-for-python/issues/20691

시작

패키지 설치

pip를 사용하여 Python용 Azure 스키마 레지스트리 Avro 인코더 클라이언트 라이브러리를 설치합니다.

pip install azure-schemaregistry-avroencoder

필수 조건:

이 패키지를 사용하려면 다음이 있어야 합니다.

클라이언트 인증

스키마 레지스트리 Avro 인코더와의 상호 작용은 스키마 그룹 이름과 스키마 레지스트리 클라이언트 클래스를 사용하는 AvroEncoder 클래스의 인스턴스로 시작합니다. 클라이언트 생성자는 Event Hubs 정규화된 네임스페이스 및 Azure Active Directory 자격 증명을 사용합니다.

  • 스키마 레지스트리 인스턴스의 정규화된 네임스페이스는 형식 <yournamespace>.servicebus.windows.net을 따라야 합니다.

  • TokenCredential 프로토콜을 구현하는 AAD 자격 증명을 생성자에 전달해야 합니다. azure-identity 패키지에서 사용할 수 있는 프로토콜의 TokenCredential 구현이 있습니다. 에서 제공하는 azure-identity자격 증명 형식을 사용하려면 pip를 사용하여 Python용 Azure ID 클라이언트 라이브러리를 설치하세요.

pip install azure-identity
  • 또한 비동기 API를 사용하려면 먼저 aiohttp와 같은 비동기 전송을 설치해야 합니다.
pip install aiohttp

azure-schemaregistry 라이브러리를 사용하여 AvroEncoder를 만듭니다.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
# Namespace should be similar to: '<your-eventhub-namespace>.servicebus.windows.net'
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

주요 개념

AvroEncoder

Avro 이진 인코딩에서 인코딩 및 디코딩할 API와 스키마 ID가 있는 콘텐츠 형식을 제공합니다. SchemaRegistryClient를 사용하여 스키마 콘텐츠에서 스키마 ID를 얻거나 그 반대의 경우도 마찬가지입니다.

지원되는 메시지 모델

와 상호 운용성을 AvroEncoder위해 특정 Azure Messaging SDK 모델 클래스에 지원이 추가되었습니다. 이러한 모델은 네임스페이 MessageType 스 아래에 정의된 프로토콜의 하위 형식입니다 azure.schemaregistry.encoder.avroencoder . 현재 지원되는 모델 클래스는 다음과 같습니다.

  • azure-eventhub>=5.9.0의 경우 azure.eventhub.EventData

메시지 형식

인코딩을 위해 MessageType 프로토콜을 따르는 메시지 형식이 인코더에 제공되면 해당 콘텐츠 및 콘텐츠 형식 속성을 설정합니다. 여기서는 다음을 수행합니다.

  • content: Avro 페이로드(일반적으로 형식별 페이로드)

    • Avro 이진 인코딩
    • 스키마를 포함하고 이 인코더의 목적을 무효화하여 스키마를 메시지 페이로드에서 스키마 레지스트리로 이동하는 NOT Avro 개체 컨테이너 파일입니다.
  • content type: 형식 avro/binary+<schema ID>의 문자열입니다. 여기서:

    • avro/binary 형식 표시기입니다.
    • <schema ID> 는 스키마 레지스트리 서비스의 문자열과 동일한 형식 및 바이트 순서인 GUID의 16진수 표현입니다.

가 메시지 형식으로 전달되면 EventData 개체에 다음 속성이 EventData 설정됩니다.

  • 속성은 body 콘텐츠 값으로 설정됩니다.
  • 속성은 content_type 콘텐츠 형식 값으로 설정됩니다.

메시지 유형이 제공되지 않고 기본적으로 인코더가 다음 받아쓰기를 만듭니다. {"content": <Avro encoded payload>, "content_type": 'avro/binary+<schema ID>' }

예제

다음 섹션에서는 다음을 포함하여 가장 일반적인 스키마 레지스트리 작업 중 일부를 다루는 몇 가지 코드 조각을 제공합니다.

Encoding

메서드를 AvroEncoder.encode 사용하여 지정된 Avro 스키마로 콘텐츠를 인코딩합니다. 메서드는 스키마 레지스트리 서비스에 이전에 등록된 스키마를 사용하고 향후 인코딩 사용을 위해 스키마를 캐시된 상태로 유지합니다. 서비스에 스키마를 미리 등록하지 않고 메서드에 자동으로 등록 encode 하려면 키워드 인수 auto_register=True 를 생성자에 전달 AvroEncoder 해야 합니다.

import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventData

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
name = "example.avro.User"
format = "Avro"

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
schema_registry_client.register_schema(group_name, name, definition, format)
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

with encoder:
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)

    # OR

    message_content_dict = encoder.encode(dict_content, schema=definition)
    event_data = EventData.from_message_content(message_content_dict["content"], message_content_dict["content_type"])

디코딩

메서드를 AvroEncoder.decode 사용하여 Avro로 인코딩된 콘텐츠를 다음 중 하나로 디코딩합니다.

  • MessageType 프로토콜의 하위 형식인 메시지 개체를 전달합니다.
  • content(형식 바이트) 및 content_type (형식 문자열)을 사용하여 받아쓰기를 전달합니다. 메서드는 스키마 레지스트리 서비스에서 스키마를 자동으로 검색하고 향후 디코딩 사용을 위해 스키마를 캐시된 상태를 유지합니다.
import os
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = "<your-group-name>"

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
encoder = AvroEncoder(client=schema_registry_client)

with encoder:
    # event_data is an EventData object with Avro encoded body
    dict_content = {"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
    event_data = encoder.encode(dict_content, schema=definition, message_type=EventData)
    decoded_content = encoder.decode(event_data)

    # OR 

    encoded_bytes = b'<content_encoded_by_azure_schema_registry_avro_encoder>'
    content_type = 'avro/binary+<schema_id_of_corresponding_schema>'
    content_dict = {"content": encoded_bytes, "content_type": content_type}
    decoded_content = encoder.decode(content_dict)

Event Hubs 전송 통합

Event Hubs와 통합하여 Avro로 인코딩된 콘텐츠 및 body 해당 content_type로 설정된 개체를 보냅니 EventData 다.

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

Event Hubs 수신 통합

Event Hubs와 통합하여 개체를 EventData 수신하고 Avro로 인코딩된 body 값을 디코딩합니다.

import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    consumer_group='$Default',
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    decoded_content = avro_encoder.decode(event)

with eventhub_consumer, avro_encoder:
    eventhub_consumer.receive(on_event=on_event, starting_position="-1")

문제 해결

일반

스키마 레지스트리 Avro 인코더는 스키마 레지스트리 서비스와 통신할 때 오류가 발생하는 경우 Azure Core 에 정의된 예외를 발생합니다. 잘못된 콘텐츠/콘텐츠 형식 및 잘못된 스키마와 관련된 오류는 각각 및 azure.schemaregistry.encoder.avroencoder.InvalidSchemaErrorazure.schemaregistry.encoder.avroencoder.InvalidContentError 발생합니다. 여기서 __cause__ 는 Apache Avro 라이브러리에서 발생한 기본 예외를 포함합니다.

로깅

이 라이브러리는 로깅에 표준 로깅 라이브러리를 사용합니다. HTTP 세션(URL, 헤더 등)에 대한 기본 정보는 INFO 수준에서 기록됩니다.

요청/응답 본문 및 수정되지 않은 헤더를 포함한 자세한 DEBUG 수준 로깅은 인수가 있는 클라이언트 logging_enable 에서 사용하도록 설정할 수 있습니다.

import sys
import os
import logging
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

# Create a logger for the SDK
logger = logging.getLogger('azure.schemaregistry')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

fully_qualified_namespace = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE']
group_name = os.environ['SCHEMAREGISTRY_GROUP']
credential = DefaultAzureCredential()
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, credential, logging_enable=True)
# This client will log detailed information about its HTTP sessions, at DEBUG level
encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)

마찬가지로 logging_enable은 클라이언트에 대해 상세 로깅을 사용하지 않는 경우에도 한 작업에만 사용하게 설정할 수 있습니다.

encoder.encode(dict_content, schema=definition, logging_enable=True)

다음 단계

추가 샘플 코드

일반적인 Azure 스키마 레지스트리 Avro 인코더 시나리오를 보여주는 추가 예제는 샘플 디렉터리에 있습니다.

참여

이 프로젝트에 대한 기여와 제안을 환영합니다. 대부분의 경우 기여하려면 권한을 부여하며 실제로 기여를 사용할 권한을 당사에 부여한다고 선언하는 CLA(기여자 라이선스 계약)에 동의해야 합니다. 자세한 내용은 https://cla.microsoft.com 을 참조하세요.

끌어오기 요청을 제출하면 CLA-bot은 CLA를 제공하고 PR을 적절하게 데코레이팅해야 하는지 여부를 자동으로 결정합니다(예: 레이블, 설명). 봇에서 제공하는 지침을 따르기만 하면 됩니다. 이 작업은 CLA를 사용하여 모든 리포지토리에서 한 번만 수행하면 됩니다.

이 프로젝트에는 Microsoft Open Source Code of Conduct(Microsoft 오픈 소스 준수 사항)가 적용됩니다. 자세한 내용은 Code of Conduct FAQ(규정 FAQ)를 참조하세요. 또는 추가 질문이나 의견은 opencode@microsoft.com으로 문의하세요.