June 2015
Volume 30 Number 6
Windows Azure изнутри - Event Hubs для аналитики и визуализации. Часть 2
Продукты и технологии:
Microsoft Azure, Azure Event Hubs, Visual Studio 2013
В статье рассматриваются:
- использование данных с помощью Event Hubs;
- применение разных хранилищ данных;
- создание, сохранение и запрос сообщений.
Исходный код можно скачать по ссылке
Применяя Microsoft Azure Event Hubs для обработки огромных массивов данных, важно уделить особое внимание доставке и использованию этих данных. Это вторая из трех статей, посвященных визуализации данных в сценариях для Интернета вещей (Internet of Things, IoT). В позапрошлом номере (msdn.microsoft.com/magazine/dn948106) я обсудил получение и обработку данных в огромных масштабах, используя Azure Event Hubs, технологию, которая является частью предложения Service Bus и поддерживает как публикаторы информации (Raspberry Pi), так и ее потребители (программа на C#) с возможностью произвольного масштабирования. В этой статья сосредоточусь на стороне потребителя — на C#-коде, который читает данные из Event Hubs и сохраняет их в постоянном хранилище.
Эта статья разбита на четыре раздела. Во-первых, я начну с архитектуры и проиллюстрирую использование и хранение сообщений. Во-вторых, я рассмотрю задачи, которые нужно выполнять на Azure Portal. В-третьих, я опишу два разных хранилища данных и объясню, почему я выбрал именно их. В-четвертых, я завершу статью пояснениями по клиентскому коду на C#, который читает сообщения из Event Hubs, а также по коду, который сохраняет сообщения в Azure SQL Database и DocumentDB.
Архитектура Event Hubs
После чтения сообщений из Azure Event Hubs вам понадобится сохранить их как в Azure SQL Database, так и в DocumentDB. Azure SQL Database — это реляционная база данных в виде сервиса (Database as a Service, DaaS), которая масштабируется до тысяч баз данных. DocumentDB — полностью управляемый, масштабируемый сервис NoSQL-базы данных документов (подробнее о DocumentDB см. по ссылке bit.ly/1oWZP7i). Эта архитектура показана на рис. 1.
Рис. 1. Общая схема архитектуры Azure Event Hubs
Azure Datacenter | Информационный центр Azure |
Event Hubs | Event Hubs |
Event Consumer | Потребитель событий |
C# Program with Event Hub SDK | Программа на C# с Event Hub SDK |
SQL Server/Azure SQL Database | SQL Server/Azure SQL Database |
DocumentDB | DocumentDB |
Задачи, выполняемые на портале
Чтобы реализовать решение, нужно выполнить несколько задач на портале. На Azure Portal выполняются две категории задач. Первая — это подготовка Event Hubs, Azure SQL Database и DocumentDB. Подготовку Event Hubs мы выполнили в прошлой статье.
Чтобы читать сообщения из Event Hubs, а затем сохранять их в Azure SQL Database и DocumentDB, надо получить информацию о соединении от Azure Portal. Все настройки будут помещены в App.config в решение Visual Studio для приложения на C#. Я создам обычное консольное приложение, используя Event Hubs SDK.
Наконец, чтобы упростить операции с базами данных, я напишу пару хранимых процедур для Azure SQL Database. Вы найдете код для создания таблицы и двух хранимых процедур в файле DatabaseCode.txt, входящем в решение Visual Studio. Для создания таблицы и хранимых процедур можно задействовать SQL Server Management Studio. Для подключения Azure SQL Database в информационном центре Microsoft к вашей локальной копии Visual Studio используйте информацию о соединении от Azure Portal. Подробнее о том, как это делается, см. Azure Documentation по ссылке bit.ly/1K1BIeM.
В прошлой статье я продемонстрировал процесс подготовки (provisioning process) для Service Bus Event Hubs. Если вам нужны какие-то простые учебные пособия, прочитайте «Get Started with Event Hubs» по ссылке bit.ly/1F2gp9H.
После подготовки Event Hubs вы получите конечную точку. Вам потребуется скопировать информацию о соединении, чтобы все необходимое было помещено в App.config, когда вы будете писать код на C#. Также понадобится подготовить Azure SQL Database на Azure Portal. Подробно об этом процессе см. в документации по ссылке bit.ly/1Enln5c.
После создания базы данных нужно выполнить еще три простые задачи. Во-первых, создать таблицу температур. А во-вторых и в-третьих, создать две хранимые процедуры, с помощью которых вы будете удалять старые сообщения и вставлять новые. В проекте Visual Studio есть текстовый файл DatabaseCode.txt с определением таблицы для температур, а также две хранимые процедуры: CleanTable и InsertTempData.
Теперь переключимся на DocumentDB. Узнать больше о подготовке этого NoSQL-хранилища данных можно по ссылке bit.ly/1IBnGQ5. Вы также можете посмотреть видеоролик от Райена Крокура (CrawCour), одного из старших менеджеров программ в группе, которая шествует над этим продуктом с момента его появления.
Подготовив DocumentDB на портале, вы должны определить имя набора и имя базы данных. Рассматривайте DocumentDB как серию наборов, а набор — как серию документов. Аналогия в мире баз данных: набор — это таблица, а документ — запись. Это довольно вольная аналогия, так как NoSQL-хранилища данных обычно не имеют схем. В нашем приложение набору присвоено имя CityTempCollection, а базе данных — TemperatureDB.
Варианты хранилищ данных
Для начала разберемся в необходимости дополнительных хранилищ данных помимо того, которое изначально предоставляется Event Hubs. Например, одна из очевидных причин — сообщения в Event Hubs не хранятся постоянно. Вторая причина — нет возможности запрашивать сообщения с помощью любого языка запросов. С разделов Event Hub сообщения можно лишь последовательно считывать, где каждый клиентский «читатель» поддерживает собственный курсор в хранилище сообщений. Третья — Event Hubs выделяет вам 84 Гб хранилища событий со сроком хранения по умолчанию в течение 24 часов.
Хотя вариантов много, я выбрал Azure SQL Database и DocumentDB в качестве основных хранилищ для сообщений/данных, относящихся к температурам. У Azure SQL Database два крупных преимущества. Первое из них связано с тем, что эта хранилище находится в облаке, что дает несколько других плюсов: оно почти мгновенно подготавливается, экономично и реплицируется в трех экземплярах, что обеспечивает весьма высокую надежность.
Помимо эти очевидных выгод, второе крупное преимущество заключается в наличии для Azure SQL Database множества разнообразных средств бизнес-анализа, таких как Power BI. Это позволяет формировать интерактивные отчеты с весьма сложной визуализацией. Вы можете создавать мощные информационные панели (dashboards), которые можно потом использовать из браузера и с мобильного устройства.
В качестве второго хранилища данных используйте DocumentDB. Оно дает некоторые очевидные преимущества. Это полностью управляемый сервис, поэтому вам не придется беспокоиться об инфраструктуре и индивидуальных виртуальных машинах (VM). Кроме того, предоставляется соглашение по корпоративному уровню обслуживанию, как и в случае большинства сервисов на основе Azure. Наконец, DocumentDB тоже свободно от схем.
Хранилища документов без схем часто считаются желательными в контексте объектной ориентации и наследования. Дело в том, что наследование подразумевает, что у вас есть объекты, у которых какие-то атрибуты общие, а какие-то — специфичны для подтипа объектов.
В случае большинства реляционных баз данных вам понадобилась бы таблица для всех возможных атрибутов, где многие из них оставались бы равными null для неприменимых полей. Однако в базе данных без схемы можно хранить разные наборы дополнительных свойств. Это хорошо работает при рендеринге HTML, так как JavaScript может выполнять проверку на неизвестное дополнительное свойство и вызывать соответствующую функцию для вывода в отображаемую на экране таблицу.
Одно из других преимуществ базы данных без схемы (что может кем-то рассматриваться как недостаток) заключается в том, что она обеспечивает дополнительную гибкость во время разработки. Вы можете добавлять новый функционал без реорганизации базы данных. Это облегчает поддержание обратной совместимости с данными, созданными предыдущей версией. Минус этого подхода в том, что написание запросов дополнительных полей может стать сложным и запутанным.
Одна из областей, где хранилище данных на основе JSON вроде DocumentDB по-настоящему блистает, — использование приложений Node.js для предоставления данных мобильным или веб-приложениям. Родной формат JSON, компактный и выразительный, упрощает работу с ним и делает ее более естественной. Использовать хранилища данных на основе JSON с HTTP и REST так же удобно независимо от того, что вы применяете — Microsoft .NET Framework, JavaScript, Ruby, Java, PHP или Python. Я сосредоточусь на приложениях Node.js, которые читают данные DocumentDB и предоставляют их мобильным приложениям, в третьей статье из этой серии.
Использование и сохранение сообщений
Существует три распространенных подхода к использованию сообщений из Event Hubs. Как всегда, неизбежен компромисс между простотой и гибкостью. Самый простой способ — применение Stream Analytics, а самый сложный, но и самый гибкий — использование прямых приемников (direct receivers). Прямые приемники отвечают за свою координацию доступа к разделам в пределах группы потребителей (consumer group), при этом ваш код напрямую адресуется к идентификатору раздела при создании приемника для объекта EventHubConsumerGroup. Третий способ основан на абстракциях более высокого уровня, таких как EventProcessorHost. Я буду использовать этот способ, потому что он достаточно прост и в то же время достаточно гибок.
Stream Analytics — это полностью управляемый сервис, который облегчает получение и обработку сообщений от Event Hubs. Эти сообщения легко использовать, преобразовывать и помещать в Azure SQL Database. Подробнее об этом подходе см. по ссылке bit.ly/1IRvPDc.
Вы создаете базу данных, таблицу и серию запросов. Например, для чтения сообщений из Event Hubs и их отправки в SQL Server используется простое выражение «SELECT DeviceId, Temperature FROM input». Можно даже рекурсивно объединять запросы в цепочки и создавать серии запросов, которые преобразуют данные, используя конвейерный подход. Такая возможность фильтрации сообщений серией SQL-запросов является весьма мощной абстракцией.
Одно из более интересных расширений, доступных через Stream Analytics, имеет дело с временными окнами (time windows). Часто предъявляется требование выполнять вычисления (агрегацию) над неким набором или другие операции над подмножествами событий, которые укладываются в определенный период времени. Поскольку в сложных системах обработки событий время является критичным фактором, важно иметь простой способ работы с временным компонентом логики запроса в системе. В Azure Stream Analytics эти подмножества событий определяются окнами, которые представляют группы, объединенные по времени. Подробное объяснение различных типов поддерживаемых временных окон см. по ссылке bit.ly/1DIizfM.
Прямые приемники позволяют адресоваться к конкретным идентификаторам разделов Event Hub. Разделы приносят пользу в двух отношениях. Во-первых, они позволяют масштабировать публикацию и потребление сообщений. Во-вторых, они дают возможность разделять данные.
Я воспользуюсь подходом с EventProcessorHost, который является «интеллектуальным» агентом для .NET-потребителей; он управляет доступом и смещением на каждом разделе. Более подробное описание см. в блоге ServiceBus (bit.ly/1aO5I19). EventProcessorHost упрощает чтение сообщений, их преобразование и запись в постоянное хранилище.
Я напишу программу на C#, которая использует Event Hubs SDK и помещает данные как в Azure SQL Database, так и в DocumentDB (советую прочитать руководство «Event Hubs Programming Guide» по ссылке bit.ly/1IBrpNz). Весь исходный код можно получить с bit.ly/1aSFF99. Я убрал из исходного кода пароль и секретные ключи, но информацию о соединении вы получите сами от портала. Чтобы получить весь код, установите Git и выполните команду git clone. Некоторые файлы консольного приложения показаны на рис. 2. Код основного драйвера вы найдете в Program.cs.
Рис. 2. Код для использования и сохранения сообщений
+==============+
| Раздел 1 |
+==============+
internal class Program
{
private static string eventHubConnectionString =
ConfigurationManager.AppSettings[
"eventHubConnectionString"];
static private string eventHubName =
ConfigurationManager.AppSettings["eventHubName"];
static private string storageAccountName =
ConfigurationManager.AppSettings["storageAccountName"];
static private string storageAccountKey =
ConfigurationManager.AppSettings["storageAccountKey"];
static private string storageConnectionString =
string.Format("DefaultEndpointsProtocol=https;" +
"AccountName={0};AccountKey={1}",
storageAccountName, storageAccountKey);
static private string eventProcessorHostName =
Guid.NewGuid().ToString();
private static void Main(string[] args)
{
// Запускаем основной механизм использования сообщений
+==============+
| Раздел 2 |
+==============+
var eventProcessorHost =
new EventProcessorHost(eventProcessorHostName,
eventHubName, EventHubConsumerGroup.DefaultGroupName,
eventHubConnectionString, storageConnectionString);
// Асинхронно используем сообщения от Event Hub
eventProcessorHost.RegisterEventProcessorAsync
<SimpleEventProcessor>().Wait();
Console.WriteLine("Receiving. Press enter key
to stop worker.");
Console.ReadLine();
}
}
+==============+
| Раздел 3 |
+==============+
// SimpleEventProcessor.cs
class SimpleEventProcessor : IEventProcessor
{
private DataManager dataManager =
new DataManager(new SQLDatabaseManager());
// ...Опущены для краткости
public SimpleEventProcessor() ...
async Task IEventProcessor.CloseAsync(
PartitionContext context, CloseReason reason) ...
Task IEventProcessor.OpenAsync(PartitionContext context) ...
async Task IEventProcessor.ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
// Перебираем сообщения в цикле для вставки
foreach (EventData eventData in messages)
{
string data = Encoding.UTF8.GetString(
eventData.GetBytes());
// Поля разделяются запятыми
string[] msg = data.Split(',');
if (msg.Length > 2)
{
+==============+
| Раздел 4 |
+==============+
// Вставка в SQL
dataManager.InsertSqlMessage(msg[0], Convert.ToInt32(
msg[1]), Convert.ToDouble(msg[2]));
// Вставляем в глобальный объект DocumentDB
// (глобальный из-за проблем с синхронизацией потоков)
Globals.DocDb.InsertEntry(msg[0], Convert.ToInt32(
msg[1]), Convert.ToDouble(msg[2]));
}
Console.WriteLine(string.Format(
"Message received. Partition: '{0}', " +
"Data: '{1}'", context.Lease.PartitionId, data));
}
// Вызываем CheckpointAsync каждые 5 мин, чтобы рабочая
// роль могла возобновить обработку с 5 мин назад,
// если она перезапускается
if (this.checkpointStopWatch.Elapsed >
TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
lock (this)
{
this.checkpointStopWatch.Reset();
}
}
}
}
+==============+
| Раздел 5 |
+==============+
// DocumentDBManager.cs
public class DocumentDbManager
{
// Опущено для краткости
public string collectionName = "CityTempCollection";
public string databaseName = "TemperatureDB";
public async Task<bool> InsertEntry(
string city, int month, double temperature)
{
dynamic document = null;
try
{
+==============+
| Раздел 6 |
+==============+
// Проверяем, есть ли такой город (city)
document = client.CreateDocumentQuery(
documentCollection.DocumentsLink)
.Where(d => d.Id == city).AsEnumerable().
FirstOrDefault();
}
catch (Exception ex)
{
throw;
}
bool docExists = (document != null);
// Документа пока нет
if (!docExists)
{
+==============+
| Раздел 7 |
+==============+
var cityMonthTemperature = new CityMonthTemperature
{
Id = city,
City = city
};
cityMonthTemperature.Temperatures[month - 1] =
temperature;
try
{
+==============+
| Раздел 8 |
+==============+
// Создаем и настраиваем document к возврату -
// именно здесь вы сбрасываете объект document
document = await client.CreateDocumentAsync(
documentCollection.DocumentsLink,
cityMonthTemperature);
}
catch (DocumentClientException ex)
// Опущено для краткости
}
}
}
В разделе 1 нет никаких сюрпризов. Здесь код получает некоторую информацию о соединении из App.config. Вы можете получить всю информацию о соединении от портала. В разделе 2 создается экземпляр EventProcessorHost — базовый объект в Event Hubs SDK, позволяющий извлекать сообщения. Нижележащие события связываются с этим объектом в классе SimpleEventProcessor в разделе 3.
Асинхронный обратный вызов, ProcessEventsAsync, вызывается SDK. При этом передается параметр IEnumerable<EventData> messages, который вы разбираете, чтобы получить сообщения, хранившиеся в Event Hubs. Полученные сообщения вставляются в SQL Database и DocumentDB в разделе 4. Все низкоуровневые детали для InsertSqlMessage и InsertEntry вы найдете в решении Visual Studio.
В разделе 5 представлен класс, который выполняет саму операцию вставки в DocumentDB. Имя этой базы данных — TemperatureDB, а имя набора — CityTempCollection. В разделе 6 представлен запрос, который ищет город, используя LINQ-запрос. Название города уже могло быть вставлено в базу данных, поэтому выполняется проверка. Если город существует, данные по температурам обновляются.
В разделе 7 представлен сценарий, где город не добавлен. Вы создаете простой .NET-объект, который преобразуется в JSON-данные, как только происходит вставка. Это преобразование выполняется нижележащим SDK. Вы вставляете показания температуры по соответствующему для конкретного месяца смещению в массиве Temperatures. Наконец, в разделе 8 происходит обновление самого объекта документа.
Код, который выполняет только обновление температуры в сценарии, где город уже вставлен, опущен для краткости, но вы найдете его в репозитарии GitHub наряду со всем решением Visual Studio (bit.ly/1aSFF99).
Поскольку DocumentDB является частью предварительного предложения, вы должны использовать новый портал Azure Preview Portal на portal.azure.com. Одна из удобных особенностей DocumentDB — наличие Document Explorer, который позволяет создавать документы и данные, а также запрашивать и просматривать существующие данные (рис. 3).
Рис. 3. Использование DocumentDB для создания, запроса и просмотра данных
В прошлой статье я создал программу на C, работающую в Linux для использования транспортного протокола AMQP с целью вставки сообщений в Event Hubs. Тот код выполнялся в Linux VM, размещенной в Azure, поэтому я мог бы легко портировать его в реализацию Raspberry Pi на основе Debian. Если очень кратко, то первая статья была о публикации генерируемых сообщений, текущая — об использовании сообщений и их сохранении в постоянном хранилище, а заключительная статья будет посвящена возможности предоставлять постоянные данные мобильным клиентам и обеспечивать визуализацию нижележащих данных.
Бруно Теркали (Bruno Terkaly) — ведущий инженер программного обеспечения в Microsoft. Его основная цель — обеспечить разработку лидирующих в отрасли приложений и сервисов, способных работать на любых устройствах. Отвечает за развитие главных возможностей облачных и мобильных технологий на территории США и за ее пределами. Помогает партнерам в выводе их приложений на рынок, обеспечивая руководство при проектировании архитектур и глубокие технические знания на этапах оценки, разработки и развертывания приложений, создаваемых независимыми поставщиками ПО (ISV). Кроме того, тесно взаимодействует с группами облачных и мобильных технологий, организуя обратную связь и влияя на их дорожные карты.
Выражаю благодарность за рецензирование статьи экспертам Microsoft Райену Крокуру (Ryan CrawCour), Дэну Розанову (Dan Rosanova).