Поделиться через


клиентская библиотека Центры событий Azure для Python версии 5.11.5

Центры событий Azure — это высокомасштабируемая служба публикации и подписки, которая может принимать миллионы событий в секунду и передавать их нескольким потребителям. Это позволяет обрабатывать и анализировать большие объемы данных, создаваемых подключенными устройствами и приложениями. После сбора данных Центры событий можно получать, преобразовывать и хранить их с помощью любого поставщика аналитики в режиме реального времени или адаптеров пакетной обработки и хранения. Если вы хотите узнать больше о Центры событий Azure, ознакомьтесь с разделом Что такое Центры событий?

Клиентская библиотека Центров событий Azure позволяет публиковать и потреблять события Центров событий Azure и может использоваться в следующих целях:

  • выдача данных телеметрии о приложении для бизнес-аналитики и диагностики;
  • публикация фактов о состоянии вашего приложения, которые заинтересованные стороны могут наблюдать и использовать в качестве триггера для выполнения действий;
  • наблюдение за важными операциями и взаимодействиями внутри бизнес-среды или другой экосистемы, что позволяет слабо связанным системам взаимодействовать без необходимости привязывать их друг к другу;
  • получение событий от одного или нескольких издателей, их преобразование в соответствии с потребностями экосистемы и последующая публикация преобразованных событий в новом потоке, где их могут наблюдать потребители.

Исходный код | Пакет (PyPi) | Пакет (Conda) | Справочная документация по | APIДокументация по продукту | Образцы

Начало работы

Предварительные требования

  • Python 3.7 или более поздней версии.

  • Подписка Microsoft Azure: Чтобы использовать службы Azure, включая Центры событий Azure, вам потребуется подписка. Если у вас нет учетной записи Azure, вы можете зарегистрироваться для получения бесплатной пробной версии или использовать преимущества подписчика MSDN при создании учетной записи.

  • Пространство имен Центров событий с концентратором событий: Для взаимодействия с Центры событий Azure также необходимо иметь пространство имен и концентратор событий. Если вы не знакомы с созданием ресурсов Azure, вы можете воспользоваться пошаговым руководством по созданию концентратора событий с помощью портал Azure. Здесь также можно найти подробные инструкции по использованию Azure CLI, Azure PowerShell или шаблонов Azure Resource Manager (ARM) для создания концентратора событий.

Установка пакета

Установите клиентская библиотека Центры событий Azure для Python с помощью pip:

$ pip install azure-eventhub

Аутентификация клиента

Взаимодействие с Центрами событий начинается с экземпляра класса EventHubConsumerClient или EventHubProducerClient. Для создания экземпляра клиентского объекта требуется имя узла, учетные данные SAS/AAD и имя концентратора событий или строка подключения.

Создайте клиент из строка подключения:

Чтобы клиентская библиотека Центров событий взаимодействовала с концентратором событий, проще всего использовать строка подключения, которая создается автоматически при создании пространства имен Центров событий. Если вы не знакомы с политиками общего доступа в Azure, вы можете следовать пошаговому руководству, чтобы получить строка подключения Центров событий.

  • Метод from_connection_string принимает строка подключения формы Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey> и имени сущности в экземпляр концентратора событий. Вы можете получить строка подключения из портал Azure.

Создайте клиент с помощью библиотеки azure-identity:

Кроме того, можно использовать объект Credential для проверки подлинности с помощью AAD с пакетом azure-identity.

  • Этот конструктор, показанный в приведенном выше примере, принимает имя узла и имя сущности экземпляра концентратора событий и учетные данные, реализующие протокол TokenCredential . Существуют реализации протокола, доступные TokenCredential в пакете azure-identity. Имя узла имеет формат <yournamespace.servicebus.windows.net>.
  • Чтобы использовать типы учетных данных, предоставляемые azure-identity, установите пакет : pip install azure-identity
  • Кроме того, чтобы использовать асинхронный API, необходимо сначала установить асинхронный транспорт, например aiohttp: pip install aiohttp
  • При использовании Azure Active Directory субъекту должна быть назначена роль, которая разрешает доступ к Центрам событий, например роль владельца данных Центры событий Azure. Дополнительные сведения об использовании авторизации Azure Active Directory с Центрами событий см. в соответствующей документации.

Основные понятия

  • EventHubProducerClient — это источник данных телеметрии, диагностика информации, журналов использования или других данных журнала в составе внедренного решения устройства, мобильного приложения устройства, названия игры, работающей на консоли или другом устройстве, какого-либо клиентского или серверного бизнес-решения или веб-сайта.

  • EventHubConsumerClient получает такие сведения из концентратора событий и обрабатывает их. Обработка может включать агрегирование, сложные вычисления и фильтрацию. Обработка может также включать распространение или хранение информации в исходном или преобразованном виде. Потребители концентраторов событий часто являются надежными и масштабируемыми частями инфраструктурами платформы со встроенными возможностями аналитики, например Azure Stream Analytics, Apache Spark или Apache Storm.

  • Секция — это упорядоченная последовательность событий, которая хранится в концентраторе событий. Служба "Центры событий Azure" обеспечивает потоковую передачу сообщений посредством шаблона секционированных потребителей, в котором каждый потребитель считывает только определенное подмножество (секцию) потока сообщений. По мере поступления новых событий они добавляются в конец этой последовательности. Количество секций указывается во время создания концентратора событий и не может быть изменено.

  • Группа потребителей — это представление всего концентратора событий. Группы потребителей предоставляют каждому из нескольких приложений возможность иметь отдельное представление потока событий, а также считывать поток независимо друг от друга, в собственном темпе и из собственного положения. В одной группе потребителей может быть не более 5 одновременных читателей. Однако рекомендуется только один активный потребитель для каждой пары из секции и группы потребителей. Каждый активный читатель получает все события из своего раздела. Если в одном разделе несколько читателей, они получат дублирующиеся события.

Дополнительные понятия и более подробное обсуждение см. в разделе Функции Центров событий. Кроме того, концепции AMQP хорошо описаны в статье OASIS Advanced Messaging Queuing Protocol (AMQP) версии 1.0.

Потокобезопасность

Мы не гарантируем, что EventHubProducerClient или EventHubConsumerClient являются потокобезопасны. Мы не рекомендуем повторно использовать эти экземпляры в потоках. Выполняющееся приложение может использовать эти классы потокобезопасным способом.

Тип EventDataBatch модели данных не является потокобезопасной. Он не должен использоваться совместно между потоками и использоваться одновременно с клиентскими методами.

Примеры

В следующих разделах представлено несколько фрагментов кода, охватывающих некоторые из наиболее распространенных задач Центров событий, в том числе:

Проверка концентратора событий

Получение идентификаторов секций концентратора событий.

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

Публикация событий в концентраторе событий

Используйте метод в create_batch , чтобы создать EventDataBatch объект, который затем можно отправить с помощью send_batchEventHubProducerClient метода . События могут добавляться в EventDataBatch с помощью add метода , пока не будет достигнут максимальный размер пакета в байтах.

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

Использование событий из концентратора событий

Существует несколько способов получения событий из EventHub. Чтобы просто активировать обратный вызов при получении события, EventHubConsumerClient.receive будет использоваться метод следующим образом:

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

Использование событий из концентратора событий в пакетах

В то время как приведенный выше пример активирует обратный вызов для каждого сообщения по мере его получения, следующий пример активирует обратный вызов для пакета событий, пытаясь получить число за раз.

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

Асинхронная публикация событий в концентраторе событий

Используйте метод в create_batch , чтобы создать EventDataBatch объект, который затем можно отправить с помощью send_batchEventHubProducer метода . События могут добавляться в EventDataBatch с помощью add метода , пока не будет достигнут максимальный размер пакета в байтах.

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

Асинхронное использование событий из концентратора событий

Этот пакет SDK поддерживает синхронный и асинхронный код. Чтобы получить, как показано в приведенных выше примерах, но в aio, потребуется следующее:

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

Асинхронное использование событий из концентратора событий в пакетах

Все синхронные функции также поддерживаются в aio. Как показано выше для синхронного пакетного получения, в asyncio можно выполнить то же самое, как показано ниже.

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

Использование событий и сохранение контрольных точек с помощью хранилища контрольных точек

EventHubConsumerClient — это высокоуровневая конструкция, которая позволяет получать события из нескольких секций одновременно и распределять нагрузку с другими потребителями, используя один концентратор событий и группу потребителей.

Это также позволяет пользователю отслеживать ход обработки событий с помощью контрольных точек.

Контрольная точка представляет последнее успешно обработанное событие пользователем из определенной секции группы потребителей в экземпляре концентратора событий. использует EventHubConsumerClient экземпляр CheckpointStore для обновления контрольных точек и хранения соответствующих сведений, необходимых для алгоритма балансировки нагрузки.

Выполните поиск в pypi с префиксом azure-eventhub-checkpointstore , чтобы найти пакеты, которые поддерживают эту функцию, и использовать реализацию CheckpointStore из одного такого пакета. Обратите внимание, что предоставляются как синхронные, так и асинхронные библиотеки.

В приведенном ниже примере мы создадим экземпляр EventHubConsumerClient и используем BlobCheckpointStore. Для выполнения кода необходимо создать учетную запись хранения Azure и контейнер BLOB-объектов .

Хранилище BLOB-объектов Azure асинхронное хранилище контрольных точек и синхронизация хранилища контрольных точек Хранилище BLOB-объектов Azure — это одна из CheckpointStore предоставляемых нами реализаций, которая применяется Хранилище BLOB-объектов Azure в качестве постоянного хранилища.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Использование EventHubConsumerClient для работы с Центр Интернета вещей

Вы также можете использовать EventHubConsumerClient для работы с Центр Интернета вещей. Это полезно для получения данных телеметрии Центр Интернета вещей из связанного EventHub. Связанный строка подключения не будет иметь утверждений отправки, поэтому отправка событий невозможна.

Обратите внимание, что строка подключения должна быть для конечной точки, совместимой с концентратором событий, например Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/; SharedAccessKeyName=my-SA-name; SharedAccessKey=my-SA-key; EntityPath=my-iot-hub-name"

Получить конечную точку, совместимую с Центрами событий, можно двумя способами.

  • Вручную получите "Встроенные конечные точки" Центр Интернета вещей на портале Azure и получите от него.
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

Устранение неполадок

Дополнительные сведения о диагностике azure-eventhubs различных сценариев сбоя см. в руководстве по устранению неполадок.

Дальнейшие действия

Больше примеров кода

Подробные примеры использования этой библиотеки для отправки и получения событий в Центрах событий и из нее см. в каталоге примеров .

Документация

Справочная документация доступна здесь.

Реестр схем и кодировщик Avro

Пакет SDK EventHubs хорошо интегрируется со службой реестра схем и Avro. Дополнительные сведения см. в разделах Пакет SDK для реестра схем и Пакет SDK Avro Encoder для реестра схем.

Поддержка чистого транспорта и обратной совместимости PYTHON AMQP

Клиентская библиотека Центры событий Azure теперь основана на чистой реализации AMQP python. uAMQP удалена как необходимая зависимость.

Чтобы использовать в качестве базового транспорта, выполните следующие действия uAMQP .

  1. Установите uamqp с помощью pip.
$ pip install uamqp 
  1. Передача uamqp_transport=True во время построения клиента.
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

Примечание. Атрибут message вEventDataBatchEventData/ , который ранее предоставлял uamqp.Message, является устаревшим. Для упрощения перехода были введены объекты прежних версий, возвращаемые EventData.message/EventDataBatch.message .

Создание колесика uAMQP из источника

Если uAMQP предназначена для использования в качестве базовой реализации протокола AMQP для azure-eventhub, диски uAMQP можно найти для большинства основных операционных систем.

Если вы планируете использовать uAMQP и работаете на платформе, для которой не предоставлены диски uAMQP, следуйте указаниям по установке uAMQP для установки из источника.

Отзывы

Если вы столкнулись с ошибками или у вас есть предложения, сообщите о проблеме в разделе Проблемы проекта.

Участие

На этом проекте приветствуются публикации и предложения. Для участия в большинстве процессов по разработке документации необходимо принять лицензионное соглашение участника (CLA), в котором указывается, что вы предоставляете нам права на использование ваших публикаций. Для получения подробных сведений посетите веб-страницу https://cla.microsoft.com.

При отправке запроса на включение внесенных изменений CLA-бот автоматически определит необходимость предоставления соглашения CLA и соответствующего оформления запроса на включение внесенных изменений (например, добавление метки, комментария). Просто следуйте инструкциям бота. Будет достаточно выполнить их один раз для всех репозиториев, поддерживающих соглашение CLA.

В рамках этого проекта действуют правила поведения в отношении продуктов с открытым исходным кодом Майкрософт. Дополнительные сведения см. в разделе часто задаваемых вопросов о правилах поведения или обратитесь к opencode@microsoft.com с любыми дополнительными вопросами или комментариями.

Просмотры