Руководством по программированию .NET для концентраторов событий Azure (устаревший пакет Microsoft. Azure. EventHubs)

В данной статье обсуждаются некоторые распространенные сценарии написания кодов с помощью Центров событий Azure. Предполагается, что вы уже имеете представление о Центрах событий. Общие сведения о Центрах событий см. в статье Общие сведения о Центрах событий Azure.

Предупреждение

Это краткое справочное по для старого пакета Microsoft. Azure. EventHubs . Мы рекомендуем перенести код, чтобы использовать последний пакет Azure. Messaging. EventHubs .

Издатели событий

Отправка событий в концентратор событий осуществляется с использованием HTTP POST или через подключение AMQP 1.0. Выбор способа и времени зависит от определенного сценария, к которому выполняется обращение. Подключения AMQP 1.0 измеряются как подключения через посредника по служебной шине. Они больше всего подходят для сценариев с большими объемами сообщений и менее строгими требованиями к задержке, так как такие подключения обеспечивают постоянный канал обмена сообщениями.

При использовании управляемых API .NET основными конструктивными элементами для публикации данных в Центрах событий являются классы EventHubClient и EventData. EventHubClient предоставляет канал связи AMQP, по которому события отправляются в концентратор событий. Класс EventData представляет собой событие и используется для публикации сообщений в концентраторе событий. Этот класс включает тело, некоторые метаданные (свойства) и сведения о заголовке (Системпропертиес) о событии. По мере перемещения объекта EventData через концентратор событий к объекту добавляются другие свойства.

Начало работы

Классы .NET, поддерживающие Центры событий, входят в пакет NuGet Microsoft.Azure.EventHubs. Центр событий можно установить с помощью обозревателя решений Visual Studio или консоли диспетчера пакетов в Visual Studio. Для этого выполните следующую команду в окне консоли диспетчера пакетов :

Install-Package Microsoft.Azure.EventHubs

Создание концентратора событий

Вы можете использовать портал Azure, службу Azure PowerShell или Azure CLI для создания Центров событий. Подробности см. в статье Создание пространства имен концентраторов событий и концентратора событий с помощью портала Azure.

Создание клиента Центров событий

Основным классом для взаимодействия с концентраторами событий является класс Microsoft.Azure.EventHubs.EventHubClient. Можно создать экземпляр этого класса, используя метод CreateFromConnectionString, как показано в следующем примере:

private const string EventHubConnectionString = "Event Hubs namespace connection string";
private const string EventHubName = "event hub name";

var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
{
    EntityPath = EventHubName

};
eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

Отправка событий в концентратор событий

Для отправки событий в концентратор событий создается экземпляр EventHubClient, который отправляется асинхронно с помощью метода SendAsync. Этот метод принимает один параметр экземпляра EventData и синхронно отправляет его в концентратор событий.

Сериализация событий

Класс EventData имеет два перегруженных конструктора, которые принимают различные параметры, байты или массив байтов, которые представляют полезные данные событий. При использовании JSON совместно с EventDataможно применить метод Encoding.UTF8.GetBytes() для получения массива байтов для строки в кодировке JSON. Например:

for (var i = 0; i < numMessagesToSend; i++)
{
    var message = $"Message {i}";
    Console.WriteLine($"Sending message: {message}");
    await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
}

Ключ раздела

Примечание

Если вы не знакомы с секциями, см. эту статью.

При отправке данных событий можно указать значение, которое хэшируется для создания назначения секции. Укажите свойство секции, используя PartitionSender.PartitionID. Кроме того, решение об использовании секций подразумевает выбор между доступностью и целостностью. Дополнительные сведения см. в статье Доступность и согласованность в Центрах событий.

Пакетные операции отправки событий

Пакетная отправка событий позволяет повысить пропускную способность. Вы можете использовать API CreateBatch для создания пакета, в который можно позже добавить объекты данных для вызова SendAsync.

Размер одного пакета не должен превышать 1 МБ для события. Кроме того, каждое сообщение в пакете использует один и тот же идентификатор издателя. Отправитель должен убедиться, что размер пакета не превышает максимальный размер события. Если размер превышен, возникает ошибка отправки. Можно использовать вспомогательный метод EventHubClient.CreateBatch, чтобы убедиться, что пакет не превысит 1 МБ. Вы получаете пустой пакет EventDataBatch из API CreateBatch, а затем с помощью TryAdd добавляете события для создания пакета.

Асинхронная отправка и отправка в нужном масштабе

События можно отправлять в концентратор событий асинхронно. Асинхронная отправка увеличивает частоту, с которой клиент может отправлять события. SendAsync возвращает объект Task. На стороне клиента можно использовать класс RetryPolicy для управления параметрами повторного выполнения попыток клиентом.

Получатели событий

Класс EventProcessorHost обрабатывает данные из Центров событий. Эту реализацию следует использовать при создании модулей чтения событий на платформе .NET. EventProcessorHost предоставляет потокобезопасную многопроцессную среду безопасного выполнения для реализаций обработчиков событий. Эта среда также предоставляет средства управления контрольными точками и аренды секций.

Чтобы воспользоваться классом EventProcessorHost, можно реализовать интерфейс IEventProcessor. Этот интерфейс содержит четыре метода:

Чтобы начать обработку событий, создайте экземпляр EventProcessorHost, указав соответствующие параметры для концентратора событий. Например:

var eventProcessorHost = new EventProcessorHost(
        EventHubName,
        PartitionReceiver.DefaultConsumerGroupName,
        EventHubConnectionString,
        StorageConnectionString,
        StorageContainerName);

Затем вызовите регистеревентпроцессорасинк , чтобы зарегистрировать реализацию IEventProcessor в среде выполнения:

await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();

На этом этапе узел попытается получить аренду для каждой секции в концентраторе событий, используя "жадный" алгоритм. Эти аренды будут продолжаться в течение заданного промежутка времени, после чего их необходимо продлить. По мере перехода новых узлов (в нашем случае это рабочие экземпляры) в оперативный режим они размещают резервирования аренды и со временем нагрузка смещается между узлами при каждой попытке получения дополнительной аренды.

Узел обработчика событий

Со временем устанавливается равновесие. Такая динамическая возможность позволяет применить к потребителям автоматическое масштабирование как вверх, так и вниз на основе использования ресурсов ЦП. Так как в Центрах событий не реализован прямой механизм подсчета сообщений, среднее использование ресурсов ЦП зачастую является оптимальным для измерения масштабирования серверной части или потребителя. Если издатели начинают публиковать больше событий, чем могут обработать потребители, увеличение использования ресурсов ЦП потребителями можно использовать для автоматического масштабирования на основе числа экземпляров исполнителей.

Класс EventProcessorHost также реализует механизм создания контрольных точек на основе службы хранилища Azure. Этот механизм хранит смещение для каждой секции, чтобы каждый потребитель мог определить последнюю контрольную точку от предыдущего потребителя. По мере перехода секций между узлами перенос нагрузки осуществляется за счет механизма синхронизации.

Отзыв издателя

В дополнение к расширенным функциям, выполняемым узлом обработчика событий, служба концентраторов событий включает отзыв издателя , чтобы запретить конкретным издателям отправлять события в концентратор событий. Эти функции полезны, если маркер издателя скомпрометирован или после обновления программного обеспечения эти функции перестали работать должным образом. В этих случаях для идентификатора издателя, который является частью маркера SAS, можно заблокировать возможность публикации событий.

Примечание

В настоящее время эта функция поддерживается только REST API (отзыв издателя).

Дальнейшие действия

Дополнительные сведения о сценариях Центров событий см. в разделах, ссылки на которые указаны ниже.