Microsoft Azure - Системы, ориентированные на потоки событий

Кристофер Беннидж

Продукты и технологии:
Microsoft Azure, Azure Event Hubs
В статье рассматриваются:

  • архитектура Azure Event Hubs;
  • конфигурирование Azure Event Hubs;
  • реализация процессора событий.

В наши дни главное — это данные. Они помогают нам принимать обоснованные решения, большие данные (Big Data) — принимать обоснованные и далеко идущие решения, а огромные потоки данных — принимать обоснованные и своевременные решения. Эти постоянно текущие потоки данных часто называют потоками событий. И все чаще создаются программные системы, основная цель которых — обработка потоков событий.

Даже в разных отраслях и предметных областях проявляется четко выраженный общий архитектурный шаблон этих ориентированных на потоки событий систем. Этот шаблон современных систем, ориентированных на потоки событий, играет ту же фундаментальную роль, что и классическая многоуровневая архитектура, сохраняющаяся в традиционных корпоративных системах, размещаемых на предприятиях. Я начну с краткого описания этого нарождающегося шаблона.

Выявляем шаблон

Сначала я должен прояснить термин «событие». Здесь это просто некая порция данных, означающая, что в системе что-то произошло. События, как правило, имеют малый размер, исчисляемый байтами или килобайтами. Вы также услышите термины вроде сообщений, телеметрии или даже просто данных вместо события.

События порождаются источниками событий (event producers). Источниками может быть что угодно: подключенные автомобили, холодильники с компьютерным управлением, игровые приставки, персональные гаджеты для фитнеса или даже программная система, генерирующая события самодиагностики. Однако важно понимать, что в большинстве этих систем вы имеете дело с многочисленными источниками событий.

Во многих системах количество источников событий исчисляется от десятков тысяч до десятков миллионов и более. Это означает, что эти системы, как правило, имеют крупный масштаб (high volume) и высокую скорость (high velocity). Крупный масштаб подразумевает наличие огромного объема данных в целом, а высокая скорость — что данные генерируются часто.

Кроме источников событий, существуют потребители событий (event consumers). Потребители являются настоящей сердцевиной систем этих типов. Они отвечают за анализ, интерпретацию и реакцию на события. Количество потребителей в типичной системе может варьироваться от пары до нескольких десятков. События не направляются конкретным потребителям. Каждый потребитель видит один и тот же набор событий. В контексте Microsoft Azure потребителями чаще всего являются облачные сервисы.

Рассмотрим пример. Есть поток событий, представляющий финансовые транзакции. Источники событий в этом сценарии — системы кассовых терминалов в розничных магазинах. Один потребитель отвечает за анализ потока на предмет мошеннических действий и генерацию оповещений. Другой анализирует тот же поток, чтобы своевременно оптимизировать цепочки поставок. Наконец, третий отвечает за трансляцию событий в долговременные хранилища для последующего анализа.

В комбинации с крупным масштабом и высокой скоростью событий этот шаблон источников и потребителей требует решения нескольких интересных проблем.

  • Как предотвратить перегрузку потребителей всплесками генерируемых событий? То есть, как должна реагировать система, когда частота событий начинает превышать частоту их потребления?
  • Поскольку скорость событий высокая, как можно масштабировать индивидуального потребителя событий?

Ключ к решению этих проблем — в использовании брокера событий (рис. 1). Именно эту роль играет недавно выпущенный Azure Event Hubs.

Архитектура Azure Event Hub
Рис. 1. Архитектура Azure Event Hub

Applications Приложения
Devices Устройства
Event broker Брокер событий
Consumer Потребитель

Как же применение такого брокера вроде Event Hubs решает описанные мной проблемы?

Знакомимся с Event Hubs

Event Hubs обеспечивает эластичность, необходимую для абсорбции и сохранения событий на то время, пока их не подхватят потребители. Event Hubs может эффективно стабилизировать изменения в скорости потока событий, чтобы потребителям не приходилось заботиться об этом. Без такой стабилизации (выравнивания) потребитель может захлебнуться потоком событий и начать сбоить.

Применение брокера изолирует источники и потребители событий друг от друга. Эта изоляция особенно важна в более сложных версиях архитектурного шаблона, где между источниками и потребителями необходимы дополнительные посредники. Event Hubs — это точка композиции, шов или граница в архитектуре. Все компоненты, которые взаимодействуют через Event Hub, не требуют знания специфики друг друга.

К этому моменту легко перепутать Event Hubs с традиционными корпоративными сервисами обмена сообщениями, которые предлагают тот же тип изоляции. Однако Event Hubs отличается от них в нескольких ключевых отношениях, что и делает его идеальным для этого архитектурного шаблона.

Независимые потребители

Event Hubs использует модель публикации и подписки, но каждый потребитель имеет независимое представление одного и того же потока событий. В некоторых традиционных системах обмена сообщениями со множеством потребителей сообщения копируются для каждого заинтересованного потребителя. Это может быть неэффективным в отношении скорости и занимаемого пространства, но преимущество в том, что у каждого потребителя свой входной ящик (inbox). По мере обработки сообщений потребитель удаляет их из своего входного ящика. Здесь отсутствует влияние на других потребителей, потому что у них есть собственные копии в их входных ящиках.

В случае Event Hubs существует один набор неизменяемых событий и, поскольку они неизменяемые, требуется лишь одна копия каждого события. Аналогично потребители никогда не удаляют события из системы. Все потребители видят один и тот же набор событий. Ввиду этого потребители отвечают за отслеживания своей позиции в потоке событий. Для этого они отслеживают свои смещения в потоке событий. С этой целью в SDK встроен соответствующий API.

Хранение по времени

В традиционных системах обмена сообщениями потребитель отвечает за информирование системы, когда он заканчивает обработку сообщения. После этого система может избавиться от данного сообщения. Поскольку потребитель в Event Hubs отвечает за отслеживание своей позиции в потоке событий, как Event Hub узнает, когда потребитель обработал события? Если двумя словами, то никак. В случае Event Hubs вы настраиваете период хранения (retention period), и события хранятся в течение этого времени. То есть события удаляются автоматически, независимо от действий какого-либо потребителя.

Вследствие такого хранения по времени потребителю нужно проанализировать и обработать события до того, как истечет срок их существования. К счастью, нижележащая архитектура Event Hubs позволяет при необходимости масштабировать индивидуальные потребители.

Event Hubs поддерживает это за счет физического разбиения потока событий на разделы. Вы задаете количество разделов при подготовке Event Hub. Подробности см. в официальной документации (bit.ly/11QAxOY).

Event Hubs обеспечивает эластичность, необходимую для абсорбции и сохранения событий на то время, пока их не подхватят потребители.

По мере публикации событий в Event Hub они помещаются в разделы. Конкретное сообщение находится только в одном разделе. По умолчанию события равномерно распределяются между разделами по принципу карусели. Предусмотрены механизмы, обеспечивающие прикрепление к определенным разделам. Самый распространенный механизм позволяет вам задавать у события свойство ключа раздела, и все события с одинаковым ключом будут доставляться в один и тот же раздел.

Как разбитый на разделы поток событий помогает потребителям укладываться в лимиты времени? В контексте Event Hubs правильным термином на самом деле является группа потребителей (consumer group). Причина в том, что каждый потребитель в действительности состоит из нескольких экземпляров. Каждая группа включает по одному экземпляру на раздел. С этого момента вы должны понимать, что группа потребителей — это потребитель в целом, а экземпляр потребителя — член группы, заинтересованный в конкретном разделе.

Это означает, что группа потребителей может обрабатывать поток событий параллельно. Каждый экземпляр потребителя в группе способен обрабатывать раздел независимо от других экземпляров. Все эти экземпляры потребителей находятся на одной машине, где каждый экземпляр потребителя выполняется изолированно от другого. Их можно было бы распределить по нескольким машинам — вплоть до того, что каждый экземпляр потребителя выполнялся бы на выделенном компьютере. Тем самым Event Hubs обходит некоторые типичные проблемы, связанные с классическим шаблоном конкурирующих потребителей.

Здесь изоляция является ключевой концепцией. Во-первых, вы изолируете источники и потребители событий друг от друга, обеспечивая гибкую архитектурную композицию, а также выравнивание нагрузки. Во-вторых, группы потребителей изолируются друг от друга, уменьшая возможность каскадных сбоев по группам потребителей. В-третьих, экземпляры потребителя в данной группе изолируются друг от друга, чтобы обеспечить горизонтальное масштабирование для индивидуальных групп потребителей.

Использование Event Hubs

Есть несколько хороших учебных пособий, позволяющих начать работу с Event Hubs. Прочитайте официальную документацию (bit.ly/11QAxOY) и следуйте руководству, в котором используется нужная вам платформа.

Event Hubs использует модель публикации и подписки, но каждый потребитель имеет независимое представление одного и того же потока событий.

Сначала вы должны подготовить Event Hub. Этот процесс достаточно прямолинеен. Вы можете легко проверить это, используя пробную учетную запись Azure. В Azure Management Portal перейдите в раздел Service Bus. Создайте пространство имен Service Bus, если у вас еще нет такого пространства. После этого вы увидите вкладку Event Hubs с инструкциями по созданию Event Hub (рис. 2).

Создание Event Hub
Рис. 2. Создание Event Hub

Вы также должны настроить общую политику доступа к Event Hub, прежде чем начать с ним работу. Правила этой политики управляют защитой для Event Hub. На портале перейдите в только что созданный Event Hub и выберите вкладку Configure.

Выберите Manage для разрешений и присвойте политике какое-то название вроде «super» или «do-not-use-in-production» (не использовать в производстве). Затем переключитесь обратно на вкладку Dashboard и щелкните кнопку Connection Information внизу. Здесь вы должны куда-то скопировать строку подключения, а также имя, присвоенное собственному Event Hub.

Генерация событий

Код, который я покажу здесь, использует .NET SDK, но вы можете задействовать любую платформу, поддерживающую HTTP или AMQP. Вам понадобится сослаться на NuGet-пакет Microsoft Azure Service Bus. Нужные вам классы находятся в пространстве имен Microsoft.ServiceBus.Messaging. От вас требуется лишь создать клиент, создать событие и отправить его:

var client = EventHubClient.CreateFromConnectionString (
  connectionString, eventHubName);

var body = Encoding.UTF8.GetBytes("My first event");
var eventData = new EventData (body);

await client.SendAsync (eventData);

Несмотря на простоту, стоит обратить внимание на несколько интересных элементов. Тело события — это просто байтовый массив. Любым группам потребителей, обрабатывающим это событие, нужно будет знать, как интерпретировать эти байты. Скорее всего группам потребителей понадобится какая-то подсказка, чтобы определить, как десериализовать тело. Перед отправкой к событию можно присоединить метаданные:

eventData.Properties.Add ("event-type", "utf8string");

Это означает, что нужно использовать ключи и значения, известные как источникам, так и группам потребителей. Если вы хотите, чтобы набор событий гарантированно доставлялся в один и тот же раздел, то можете задать ключ раздела:

eventData.PartitionKey =
  "что-то осмысленное для вашей предметной области";

Вы получите более высокую производительность, если события не будут закрепляться за конкретными разделами. Однако в некоторых случаях вам потребуется, чтобы некий набор связанных событий направлялся одному экземпляру потребителя для обработки. События в разделе гарантированно располагаются в порядке их приема. Простого способа гарантировать порядок событий между разными разделами в Event Hub нет. Зачастую это и является мотивацией закрепления событий за конкретным разделом.

Например, если вы поддерживаете «интеллектуальные» автомобили, то захотите, чтобы все события от определенного автомобиля попадали в один и тот же раздел. Вы могли бы выбрать Vehicle Identification Number (VIN) (номерной знак) в качестве ключа раздела. Или же ваша система могла бы поддерживать умные дома с сотнями устройств в каждом доме, генерирующих события. В таком случае вы можете использовать идентификацию самого дома в качестве ключа раздела, чтобы все события ото всех устройств в одном и том же доме попадали в один и тот же раздел.

В целом, закрепление за разделами — опасная практика, и вы должны использовать эту возможность крайне осторожно. Плохой выбор ключа раздела может привести к неравномерному распределению событий по разделам. А это может в конечном счете создать проблемы с масштабированием групп потребителей. Хорошая новость в том, что вы можете многократно изменять архитектуру системы, чтобы избавиться от необходимости в использовании закрепления событий за разделами.

Использование событий

Возможно, вас беспокоит, как всем этим управлять. Ваши группы потребителей должны отслеживать свои смещения в потоке событий. Каждой группе нужен экземпляр на каждый раздел. К счастью, для всех этих операций есть API.

Добавьте ссылку на NuGet-пакет Microsoft Azure Service Bus Event Hub-EventProcessorHost. Нужные вам классы находятся в пространстве имен Microsoft.ServiceBus.Messaging. Чтобы приступить к работе, достаточно реализовать один интерфейс: IEventProcessor.

Реализовав свой процессор событий, вы создадите экземпляр EventProcessorHost, чтобы зарегистрировать этот процессор событий. Хост возьмет на себя всю тяжелую работу. При запуске он проанализирует ваш Event Hub, чтобы выяснить, сколько в нем разделов. Затем он создаст по одному экземпляру вашего процессора событий на каждый доступный раздел.

От вас требуется реализация трех методов. Первые два: OpenAsync и CloseAsync. Хост вызывает OpenAsync, когда экземпляр процессора событий впервые получает раздел в аренду. Это означает, что экземпляр процессора событий имеет монопольный доступ к разделу для соответствующей группы потребителей. Аналогично хост вызывает CloseAsync, когда аренда истекает или когда он закрывается. Приступая к работе, вы можете использовать очень простую реализацию:

public Task OpenAsync(PartitionContext context)
{
  return Task.FromResult(true);
}

public Task CloseAsync(PartitionContext context,
  CloseReason reason)
{
  return Task.FromResult(true);
}

Оба эти метода принимают аргумент PartitionContext. Третий метод тоже принимает этот аргумент. Вы можете изучить этот аргумент, если хотите узнать все детали о конкретном разделе, выданном в аренду процессору событий. Последний метод — то место, где вы реально принимаете события (рис. 3).

Рис. 3. Последний метод, доставляющий события

public async Task ProcessEventsAsync (PartitionContext context,
  IEnumerable<EventData> messages)
{
  foreach (var message in messages)
  {
    var eventType = message.Properties["event-type"];
    var bytes = message.GetBytes();

    if (eventType.ToString() == "utf8string") {
      var body = System.Text.Encoding.UTF8.GetString (bytes);
      // Делаем что-то с телом
    } else {
      // Фиксируем, что мы понятия не имеем,
      // что делать с этим событием
    }
  }

  await context.CheckpointAsync();
  // Этот код не является производственным
}

Как видите, все достаточно прямолинейно. Вы получаете перечисляемый набор событий, который можно перебрать и выполнить любую необходимую работу. В конце метода вызывается context.CheckpointAsync. Это сообщает хосту, что вы успешно обработали данный набор событий и хотели бы зафиксировать контрольную точку (checkpoint). Контрольная точка — это смещение последнего события в пакете.

Вот так ваша группа потребителей может отслеживать, какие события были обработаны для каждого раздела. После запуска хост пытается получить аренду любого доступного раздела. Начиная обработку для раздела, он проверяет информацию о контрольной точке в этом разделе. И лишь события, более свежие, чем соответствующие последнему смещению, посылаются их процессорам.

Хост также обеспечивает автоматическое выравнивание нагрузки между машинами. Например, у вас есть Event Hub с 16 разделами. То есть у вас будет 16 экземпляров своего процессора событий — по одному на каждый раздел. Если вы выполняете хост на одной машине, он создаст все 16 экземпляров на этой же машине. Если вы запустите другой хост на второй машине и она является частью той же группы потребителей, два хоста начнут выравнивать распределение экземпляров процессоров событий между двумя машинами. В итоге вы получите по восемь экземпляров процессоров событий на каждой машине. Если вы выключите вторую машину, первый хост вновь возьмет под свое крыло осиротевшие разделы.

Допустим реализацией IEventProcessor является MyEventProcessor. Тогда создание экземпляра хоста сводится к следующему:

var host = new EventProcessorHost(
  hostName,
  eventHubName,
  consumerGroupName,
  eventHubConnectionString,
  checkpointConnectionString);

await host.RegisterEventProcessorAsync<MyEventProcessor>();

Арументы eventHubConnectionString и eventHubName имеют те же значения, что и при отправке событий в предыдущем примере. Лучше всего использовать строки подключения с политиками общего доступа, которые ограничивают права только тем, что необходимо.

Аргумент hostName идентифицирует экземпляр EventProcessorHost. При запуске хоста в кластере (объединяющем несколько машин) рекомендуется предоставлять имя, отражающее идентификацию машины, на которой он выполняется.

Аргумент consumerGroupName идентифицирует логическую группу потребителей, которую представляет этот хост. Существует группа потребителей по умолчанию, на которую можно ссылаться по константе EventHubConsumerGroup.DefaultGroupName. Любое другое имя требует предварительной подготовки группы потребителей. Для этого создайте экземпляр Microsoft.ServiceBus.NamespaceManager и используйте такие методы, как CreateConsumerGroupAsync.

Наконец, вы должны предоставить строку подключения для учетной записи Azure Storage, используя checkpointConnectionString. В этой учетной записи хранилища хост отслеживает все состояние, касающееся разделов и смещений событий. Это состояние хранится в объектах (blobs) в виде чистого текста, который легко читается.

Существуют другие сервисы Azure, которые интегрируются с Event Hubs. Azure Stream Analytics (в настоящее время находится в виде предварительной версии) обеспечивает декларативный SQL-подобный синтаксис для преобразования и анализа потоков событий, поступающих в Event Hubs. Аналогично Event Hubs предоставляет адаптер для очень популярного Apache Storm, доступного сейчас в виде Preview в Azure через HDInsight.

Заключение

Архитектурный шаблон, обрисованный здесь, — это лишь начало. При реализации настоящей системы вы должны принимать во внимание множество других факторов. Эти факторы связаны, в том числе, с дополнительной безопасностью, подготовкой и управлением источниками событий, трансляцией протоколов, исходящими коммуникациями и др. Тем не менее, теперь вы вооружены знанием фундаментальных концепций, необходимых для создания системы, использующей брокер событий наподобие Event Hubs.


Кристофер Беннидж (Christopher Bennage) — член группы Microsoft Patterns & Practices. Любит творить разные вещи на компьютерах.

Выражаю благодарность за рецензирование статьи экспертам Microsoft Мустафе Элемали (Mostafa Elhemali) и Дэну Росанову (Dan Rosanova).