June 2015

Volume 30 Number 6


Windows Azure изнутри - Event Hubs для аналитики и визуализации. Часть 2

Бруно Теркали

Продукты и технологии:

Microsoft Azure, Azure Event Hubs, Visual Studio 2013

В статье рассматриваются:

  • использование данных с помощью Event Hubs;
  • применение разных хранилищ данных;
  • создание, сохранение и запрос сообщений.

Исходный код можно скачать по ссылке

Применяя Microsoft Azure Event Hubs для обработки огромных массивов данных, важно уделить особое внимание доставке и использованию этих данных. Это вторая из трех статей, посвященных визуализации данных в сценариях для Интернета вещей (Internet of Things, IoT). В позапрошлом номере (msdn.microsoft.com/magazine/dn948106) я обсудил получение и обработку данных в огромных масштабах, используя Azure Event Hubs, технологию, которая является частью предложения Service Bus и поддерживает как публикаторы информации (Raspberry Pi), так и ее потребители (программа на C#) с возможностью произвольного масштабирования. В этой статья сосредоточусь на стороне потребителя — на C#-коде, который читает данные из Event Hubs и сохраняет их в постоянном хранилище.

Эта статья разбита на четыре раздела. Во-первых, я начну с архитектуры и проиллюстрирую использование и хранение сообщений. Во-вторых, я рассмотрю задачи, которые нужно выполнять на Azure Portal. В-третьих, я опишу два разных хранилища данных и объясню, почему я выбрал именно их. В-четвертых, я завершу статью пояснениями по клиентскому коду на C#, который читает сообщения из Event Hubs, а также по коду, который сохраняет сообщения в Azure SQL Database и DocumentDB.

Архитектура Event Hubs

После чтения сообщений из Azure Event Hubs вам понадобится сохранить их как в Azure SQL Database, так и в DocumentDB. Azure SQL Database — это реляционная база данных в виде сервиса (Database as a Service, DaaS), которая масштабируется до тысяч баз данных. DocumentDB — полностью управляемый, масштабируемый сервис NoSQL-базы данных документов (подробнее о DocumentDB см. по ссылке bit.ly/1oWZP7i). Эта архитектура показана на рис. 1.

Общая схема архитектуры Azure Event Hubs
Рис. 1. Общая схема архитектуры Azure Event Hubs

Azure Datacenter Информационный центр Azure
Event Hubs Event Hubs
Event Consumer Потребитель событий
C# Program with Event Hub SDK Программа на C# с Event Hub SDK
SQL Server/Azure SQL Database SQL Server/Azure SQL Database
DocumentDB DocumentDB

Задачи, выполняемые на портале

Чтобы реализовать решение, нужно выполнить несколько задач на портале. На Azure Portal выполняются две категории задач. Первая — это подготовка Event Hubs, Azure SQL Database и DocumentDB. Подготовку Event Hubs мы выполнили в прошлой статье.

Чтобы читать сообщения из Event Hubs, а затем сохранять их в Azure SQL Database и DocumentDB, надо получить информацию о соединении от Azure Portal. Все настройки будут помещены в App.config в решение Visual Studio для приложения на C#. Я создам обычное консольное приложение, используя Event Hubs SDK.

Наконец, чтобы упростить операции с базами данных, я напишу пару хранимых процедур для Azure SQL Database. Вы найдете код для создания таблицы и двух хранимых процедур в файле DatabaseCode.txt, входящем в решение Visual Studio. Для создания таблицы и хранимых процедур можно задействовать SQL Server Management Studio. Для подключения Azure SQL Database в информационном центре Microsoft к вашей локальной копии Visual Studio используйте информацию о соединении от Azure Portal. Подробнее о том, как это делается, см. Azure Documentation по ссылке bit.ly/1K1BIeM.

В прошлой статье я продемонстрировал процесс подготовки (provisioning process) для Service Bus Event Hubs. Если вам нужны какие-то простые учебные пособия, прочитайте «Get Started with Event Hubs» по ссылке bit.ly/1F2gp9H.

После подготовки Event Hubs вы получите конечную точку. Вам потребуется скопировать информацию о соединении, чтобы все необходимое было помещено в App.config, когда вы будете писать код на C#. Также понадобится подготовить Azure SQL Database на Azure Portal. Подробно об этом процессе см. в документации по ссылке bit.ly/1Enln5c.

После создания базы данных нужно выполнить еще три простые задачи. Во-первых, создать таблицу температур. А во-вторых и в-третьих, создать две хранимые процедуры, с помощью которых вы будете удалять старые сообщения и вставлять новые. В проекте Visual Studio есть текстовый файл DatabaseCode.txt с определением таблицы для температур, а также две хранимые процедуры: CleanTable и InsertTempData.

Теперь переключимся на DocumentDB. Узнать больше о подготовке этого NoSQL-хранилища данных можно по ссылке bit.ly/1IBnGQ5. Вы также можете посмотреть видеоролик от Райена Крокура (CrawCour), одного из старших менеджеров программ в группе, которая шествует над этим продуктом с момента его появления.

Подготовив DocumentDB на портале, вы должны определить имя набора и имя базы данных. Рассматривайте DocumentDB как серию наборов, а набор — как серию документов. Аналогия в мире баз данных: набор — это таблица, а документ — запись. Это довольно вольная аналогия, так как NoSQL-хранилища данных обычно не имеют схем. В нашем приложение набору присвоено имя CityTempCollection, а базе данных — TemperatureDB.

Варианты хранилищ данных

Для начала разберемся в необходимости дополнительных хранилищ данных помимо того, которое изначально предоставляется Event Hubs. Например, одна из очевидных причин — сообщения в Event Hubs не хранятся постоянно. Вторая причина — нет возможности запрашивать сообщения с помощью любого языка запросов. С разделов Event Hub сообщения можно лишь последовательно считывать, где каждый клиентский «читатель» поддерживает собственный курсор в хранилище сообщений. Третья — Event Hubs выделяет вам 84 Гб хранилища событий со сроком хранения по умолчанию в течение 24 часов.

Хотя вариантов много, я выбрал Azure SQL Database и DocumentDB в качестве основных хранилищ для сообщений/данных, относящихся к температурам. У Azure SQL Database два крупных преимущества. Первое из них связано с тем, что эта хранилище находится в облаке, что дает несколько других плюсов: оно почти мгновенно подготавливается, экономично и реплицируется в трех экземплярах, что обеспечивает весьма высокую надежность.

Помимо эти очевидных выгод, второе крупное преимущество заключается в наличии для Azure SQL Database множества разнообразных средств бизнес-анализа, таких как Power BI. Это позволяет формировать интерактивные отчеты с весьма сложной визуализацией. Вы можете создавать мощные информационные панели (dashboards), которые можно потом использовать из браузера и с мобильного устройства.

В качестве второго хранилища данных используйте DocumentDB. Оно дает некоторые очевидные преимущества. Это полностью управляемый сервис, поэтому вам не придется беспокоиться об инфраструктуре и индивидуальных виртуальных машинах (VM). Кроме того, предоставляется соглашение по корпоративному уровню обслуживанию, как и в случае большинства сервисов на основе Azure. Наконец, DocumentDB тоже свободно от схем.

Хранилища документов без схем часто считаются желательными в контексте объектной ориентации и наследования. Дело в том, что наследование подразумевает, что у вас есть объекты, у которых какие-то атрибуты общие, а какие-то — специфичны для подтипа объектов.

В случае большинства реляционных баз данных вам понадобилась бы таблица для всех возможных атрибутов, где многие из них оставались бы равными null для неприменимых полей. Однако в базе данных без схемы можно хранить разные наборы дополнительных свойств. Это хорошо работает при рендеринге HTML, так как JavaScript может выполнять проверку на неизвестное дополнительное свойство и вызывать соответствующую функцию для вывода в отображаемую на экране таблицу.

Одно из других преимуществ базы данных без схемы (что может кем-то рассматриваться как недостаток) заключается в том, что она обеспечивает дополнительную гибкость во время разработки. Вы можете добавлять новый функционал без реорганизации базы данных. Это облегчает поддержание обратной совместимости с данными, созданными предыдущей версией. Минус этого подхода в том, что написание запросов дополнительных полей может стать сложным и запутанным.

Одна из областей, где хранилище данных на основе JSON вроде DocumentDB по-настоящему блистает, — использование приложений Node.js для предоставления данных мобильным или веб-приложениям. Родной формат JSON, компактный и выразительный, упрощает работу с ним и делает ее более естественной. Использовать хранилища данных на основе JSON с HTTP и REST так же удобно независимо от того, что вы применяете — Microsoft .NET Framework, JavaScript, Ruby, Java, PHP или Python. Я сосредоточусь на приложениях Node.js, которые читают данные DocumentDB и предоставляют их мобильным приложениям, в третьей статье из этой серии.

Использование и сохранение сообщений

Существует три распространенных подхода к использованию сообщений из Event Hubs. Как всегда, неизбежен компромисс между простотой и гибкостью. Самый простой способ — применение Stream Analytics, а самый сложный, но и самый гибкий — использование прямых приемников (direct receivers). Прямые приемники отвечают за свою координацию доступа к разделам в пределах группы потребителей (consumer group), при этом ваш код напрямую адресуется к идентификатору раздела при создании приемника для объекта EventHubConsumerGroup. Третий способ основан на абстракциях более высокого уровня, таких как EventProcessorHost. Я буду использовать этот способ, потому что он достаточно прост и в то же время достаточно гибок.

Stream Analytics — это полностью управляемый сервис, который облегчает получение и обработку сообщений от Event Hubs. Эти сообщения легко использовать, преобразовывать и помещать в Azure SQL Database. Подробнее об этом подходе см. по ссылке bit.ly/1IRvPDc.

Вы создаете базу данных, таблицу и серию запросов. Например, для чтения сообщений из Event Hubs и их отправки в SQL Server используется простое выражение «SELECT DeviceId, Temperature FROM input». Можно даже рекурсивно объединять запросы в цепочки и создавать серии запросов, которые преобразуют данные, используя конвейерный подход. Такая возможность фильтрации сообщений серией SQL-запросов является весьма мощной абстракцией.

Одно из более интересных расширений, доступных через Stream Analytics, имеет дело с временными окнами (time windows). Часто предъявляется требование выполнять вычисления (агрегацию) над неким набором или другие операции над подмножествами событий, которые укладываются в определенный период времени. Поскольку в сложных системах обработки событий время является критичным фактором, важно иметь простой способ работы с временным компонентом логики запроса в системе. В Azure Stream Analytics эти подмножества событий определяются окнами, которые представляют группы, объединенные по времени. Подробное объяснение различных типов поддерживаемых временных окон см. по ссылке bit.ly/1DIizfM.

Прямые приемники позволяют адресоваться к конкретным идентификаторам разделов Event Hub. Разделы приносят пользу в двух отношениях. Во-первых, они позволяют масштабировать публикацию и потребление сообщений. Во-вторых, они дают возможность разделять данные.

Я воспользуюсь подходом с EventProcessorHost, который является «интеллектуальным» агентом для .NET-потребителей; он управляет доступом и смещением на каждом разделе. Более подробное описание см. в блоге ServiceBus (bit.ly/1aO5I19). EventProcessorHost упрощает чтение сообщений, их преобразование и запись в постоянное хранилище.

Я напишу программу на C#, которая использует Event Hubs SDK и помещает данные как в Azure SQL Database, так и в DocumentDB (советую прочитать руководство «Event Hubs Programming Guide» по ссылке bit.ly/1IBrpNz). Весь исходный код можно получить с bit.ly/1aSFF99. Я убрал из исходного кода пароль и секретные ключи, но информацию о соединении вы получите сами от портала. Чтобы получить весь код, установите Git и выполните команду git clone. Некоторые файлы консольного приложения показаны на рис. 2. Код основного драйвера вы найдете в Program.cs.

Рис. 2. Код для использования и сохранения сообщений

+==============+
|   Раздел 1   |
+==============+
internal class Program
{
  private static string eventHubConnectionString =
    ConfigurationManager.AppSettings[
    "eventHubConnectionString"];
  static private string eventHubName =
    ConfigurationManager.AppSettings["eventHubName"];
  static private string storageAccountName =
    ConfigurationManager.AppSettings["storageAccountName"];
  static private string storageAccountKey =
    ConfigurationManager.AppSettings["storageAccountKey"];
  static private string storageConnectionString =
    string.Format("DefaultEndpointsProtocol=https;" +
    "AccountName={0};AccountKey={1}",
    storageAccountName, storageAccountKey);
  static private string eventProcessorHostName =
    Guid.NewGuid().ToString();

  private static void Main(string[] args)
  {

    // Запускаем основной механизм использования сообщений
    +==============+
    |   Раздел 2   |
    +==============+
    var eventProcessorHost =
      new EventProcessorHost(eventProcessorHostName,
      eventHubName, EventHubConsumerGroup.DefaultGroupName,
      eventHubConnectionString, storageConnectionString);

    // Асинхронно используем сообщения от Event Hub
    eventProcessorHost.RegisterEventProcessorAsync
      <SimpleEventProcessor>().Wait();

    Console.WriteLine("Receiving. Press enter key
      to stop worker.");
    Console.ReadLine();
  }
}
+==============+
|   Раздел 3   |
+==============+

// SimpleEventProcessor.cs

class SimpleEventProcessor : IEventProcessor
{
  private DataManager dataManager =
    new DataManager(new SQLDatabaseManager());
  // ...Опущены для краткости
  public SimpleEventProcessor()  ...
  async Task IEventProcessor.CloseAsync(
    PartitionContext context, CloseReason reason) ...
  Task IEventProcessor.OpenAsync(PartitionContext context) ...

  async Task IEventProcessor.ProcessEventsAsync(
    PartitionContext context,
    IEnumerable<EventData> messages)
  {

    // Перебираем сообщения в цикле для вставки
    foreach (EventData eventData in messages)
    {
      string data = Encoding.UTF8.GetString(
        eventData.GetBytes());
      // Поля разделяются запятыми
      string[] msg = data.Split(',');
      if (msg.Length > 2)
      {
        +==============+
        |   Раздел 4   |
        +==============+
        // Вставка в SQL
        dataManager.InsertSqlMessage(msg[0], Convert.ToInt32(
          msg[1]), Convert.ToDouble(msg[2]));

        // Вставляем в глобальный объект DocumentDB
        // (глобальный из-за проблем с синхронизацией потоков)
        Globals.DocDb.InsertEntry(msg[0], Convert.ToInt32(
          msg[1]), Convert.ToDouble(msg[2]));
      }

      Console.WriteLine(string.Format(
        "Message received.  Partition: '{0}', " +
        "Data: '{1}'", context.Lease.PartitionId, data));
    }

    // Вызываем CheckpointAsync каждые 5 мин, чтобы рабочая
    // роль могла возобновить обработку с 5 мин назад,
    // если она перезапускается
    if (this.checkpointStopWatch.Elapsed >
      TimeSpan.FromMinutes(5))
    {
      await context.CheckpointAsync();
      lock (this)
      {
        this.checkpointStopWatch.Reset();
      }
    }
  }
}
+==============+
|   Раздел 5   |
+==============+

// DocumentDBManager.cs
public class DocumentDbManager
{
  // Опущено для краткости
  public string collectionName = "CityTempCollection";
  public string databaseName = "TemperatureDB";

  public async Task<bool> InsertEntry(
    string city, int month, double temperature)
  {
    dynamic document = null;
    try
    {
      +==============+
      |   Раздел 6   |
      +==============+
      // Проверяем, есть ли такой город (city)
      document = client.CreateDocumentQuery(
        documentCollection.DocumentsLink)
        .Where(d => d.Id == city).AsEnumerable().
        FirstOrDefault();
    }
    catch (Exception ex)
    {
      throw;
    }

    bool docExists = (document != null);

    // Документа пока нет
    if (!docExists)
    {
      +==============+
      |   Раздел 7   |
      +==============+
      var cityMonthTemperature = new CityMonthTemperature
      {
        Id = city,
        City = city
      };
      cityMonthTemperature.Temperatures[month - 1] =
        temperature;

      try
      {
        +==============+
        |   Раздел 8   |
        +==============+
        // Создаем и настраиваем document к возврату -
        // именно здесь вы сбрасываете объект document
        document = await client.CreateDocumentAsync(
          documentCollection.DocumentsLink,
          cityMonthTemperature);
      }
      catch (DocumentClientException ex)
      // Опущено для краткости
    }
  }
}

В разделе 1 нет никаких сюрпризов. Здесь код получает некоторую информацию о соединении из App.config. Вы можете получить всю информацию о соединении от портала. В разделе 2 создается экземпляр EventProcessorHost — базовый объект в Event Hubs SDK, позволяющий извлекать сообщения. Нижележащие события связываются с этим объектом в классе SimpleEventProcessor в разделе 3.

Асинхронный обратный вызов, ProcessEventsAsync, вызывается SDK. При этом передается параметр IEnumerable<EventData> messages, который вы разбираете, чтобы получить сообщения, хранившиеся в Event Hubs. Полученные сообщения вставляются в SQL Database и DocumentDB в разделе 4. Все низкоуровневые детали для InsertSqlMessage и InsertEntry вы найдете в решении Visual Studio.

В разделе 5 представлен класс, который выполняет саму операцию вставки в DocumentDB. Имя этой базы данных — TemperatureDB, а имя набора — CityTempCollection. В разделе 6 представлен запрос, который ищет город, используя LINQ-запрос. Название города уже могло быть вставлено в базу данных, поэтому выполняется проверка. Если город существует, данные по температурам обновляются.

В разделе 7 представлен сценарий, где город не добавлен. Вы создаете простой .NET-объект, который преобразуется в JSON-данные, как только происходит вставка. Это преобразование выполняется нижележащим SDK. Вы вставляете показания температуры по соответствующему для конкретного месяца смещению в массиве Temperatures. Наконец, в разделе 8 происходит обновление самого объекта документа.

Код, который выполняет только обновление температуры в сценарии, где город уже вставлен, опущен для краткости, но вы найдете его в репозитарии GitHub наряду со всем решением Visual Studio (bit.ly/1aSFF99).

Поскольку DocumentDB является частью предварительного предложения, вы должны использовать новый портал Azure Preview Portal на portal.azure.com. Одна из удобных особенностей DocumentDB — наличие Document Explorer, который позволяет создавать документы и данные, а также запрашивать и просматривать существующие данные (рис. 3).

Использование DocumentDB для создания, запроса и просмотра данных
Рис. 3. Использование DocumentDB для создания, запроса и просмотра данных

В прошлой статье я создал программу на C, работающую в Linux для использования транспортного протокола AMQP с целью вставки сообщений в Event Hubs. Тот код выполнялся в Linux VM, размещенной в Azure, поэтому я мог бы легко портировать его в реализацию Raspberry Pi на основе Debian. Если очень кратко, то первая статья была о публикации генерируемых сообщений, текущая — об использовании сообщений и их сохранении в постоянном хранилище, а заключительная статья будет посвящена возможности предоставлять постоянные данные мобильным клиентам и обеспечивать визуализацию нижележащих данных.


Бруно Теркали (Bruno Terkaly) — ведущий инженер программного обеспечения в Microsoft. Его основная цель — обеспечить разработку лидирующих в отрасли приложений и сервисов, способных работать на любых устройствах. Отвечает за развитие главных возможностей облачных и мобильных технологий на территории США и за ее пределами. Помогает партнерам в выводе их приложений на рынок, обеспечивая руководство при проектировании архитектур и глубокие технические знания на этапах оценки, разработки и развертывания приложений, создаваемых независимыми поставщиками ПО (ISV). Кроме того, тесно взаимодействует с группами облачных и мобильных технологий, организуя обратную связь и влияя на их дорожные карты.

Выражаю благодарность за рецензирование статьи экспертам Microsoft Райену Крокуру (Ryan CrawCour), Дэну Розанову (Dan Rosanova).