Azure の詳細

Event Hubs で分析と表示を実現する (第 2 部)

Bruno Terkaly

Microsoft Azure Event Hubs を使用して大量のデータを処理する際は、データの配信と利用の両方に注目する必要があります。今回は、モノのインターネット (IoT) のシナリオにおけるデータの表示について説明する 3 部構成の連載の第 2 部です。4 月号 (msdn.microsoft.com/magazine/dn948106) では、Azure Event Hubs を使用した大量データの取り込みについて説明しました。Azure Event Hubs は、Service Bus サービスを構成し、大規模に情報パブリッシャー (Raspberry Pi) と情報コンシューマー (C# プログラム) をサポートするテクノロジです。今回は、データの利用側に注目し、データを Event Hubs から読み取って永続的なストアに保存する C# コードを取り上げます。

今回は、4 つのトピックに分かれています。最初に、アーキテクチャを取り上げ、メッセージの利用と保存について示します。次に、Azure ポータルで行う必要がある作業について説明します。続いて、使用する 2 種類のデータ ストアについて解説し、それらを選んだ理由を示します。最後に、Event Hubs からメッセージを読み取る クライアント C# コードと、メッセージを Azure SQL Database と DocumentDB に保存するコードを説明してまとめます。

Event Hubs のアーキテクチャ

Azure Event Hubs からメッセージを読み取ったら、そのメッセージを Azure SQL Database と DocumentDB の両方に永続化します。Azure SQL Database は、何千ものデータベースにスケーリングできるリレーショナル DaaS (サービスとしてのデータベース) です。DocumentDB は、拡張性が高い、完全管理型の NoSQL ドキュメント データベース サービスです (DocumentDB の詳細については、azure.microsoft.com/ja-jp/documentation/services/documentdb/ を参照してください)。このアーキテクチャを図 1 に示します。

Azure Event Hubs アーキテクチャの概要
図 1 Azure Event Hubs アーキテクチャの概要

必要な作業

ソリューションを実現するためには、ポータルでいくつか作業を行う必要があります。Azure ポータルで行う作業には 2 つのカテゴリがあります。1 つは、Event Hubs、Azure SQL Database、および DocumentDB のプロビジョニングです。Event Hubs のプロビジョニングは、4 月号で取り上げました。

Event Hubs からメッセージを読み取って、そのメッセージを Azure SQL Database と DocumentDB に永続化するには、Azure ポータルから接続情報を入手する必要があります。すべての設定を、C# アプリケーション用 Visual Studio ソリューションの App.config に配置し、Event Hubs SDK を使用して通常のコンソール アプリケーションを作成します。

最後に、データベース操作をサポートするために、Azure SQL Database 用にいくつかストアド プロシージャを作成します。テーブルを作成するコードと 2 つのストアド プロシージャは Visual Studio ソリューションに含まれる DatabaseCode.txt で確認できます。データベース テーブルとストアド プロシージャの作成には、SQL Server Management Studio を使用します。Azure ポータルの接続情報を使用して、マイクロソフトのデータセンターにある Azure SQL Database を Visual Studio のローカル コピーに接続します。この方法の詳細については、azure.microsoft.com/ja-jp/documentation/articles/sql-database-manage-azure-ssms/ の Azure ドキュメントを参照してください。

4 月号では、Service Bus Event Hubs のプロビジョニング プロセスを説明しました。簡単なチュートリアルを確認するには、「Event Hubs の使用」(azure.microsoft.com/ja-jp/documentation/articles/service-bus-event-hubs-csharp-ephcs-getstarted/) を参照してください。

Event Hubs をプロビジョニングしたら、エンドポイントを受け取ります。C# コードを作成する際には、Event Hubs の名前とエンドポイントの接続情報をコピーして、App.config に配置する必要があります。また、Azure ポータルで Azure SQL Database をプロビジョニングすることも必要です。このプロセスの詳細については、azure.microsoft.com/ja-jp/documentation/articles/sql-database-get-started/ (英語) のドキュメントを参照してください。

データベースを作成したら、さらに 3 つの簡単な作業を行います。1 つ目は、気温テーブルの作成です。残りの 2 つは、古いメッセージのクリアと新しいメッセージの挿入に使用する 2 つのストアド プロシージャの作業です。Visual Studio プロジェクトには、気温テーブルの定義と 2 つのストアド プロシージャ (CleanTable と InsertTempData) を記述した DatabaseCode.txt というテキスト ファイルがあります。

Event Hubs と Azure SQL Database コンポーネントに対応したので、今度は DocumentDB に取り組みます。この NoSQL データ ストアのプロビジョニングの詳細については、azure.microsoft.com/ja-jp/documentation/articles/documentdb-introduction/ を参照してください。製品の誕生以来製品を見守ってきたチームのシニア プログラム マネージャーの 1 人、Ryan CrawCour のビデオも視聴できます。

ポータルで DocumentDB をプロビジョニングしたら、コレクション名とデータベース名を定義する必要があります。考え方としては、DocumentDB を一連のコレクションとして、コレクションを一連のドキュメントとしてとらえることです。データベースの世界にたとえると、コレクションがテーブル、ドキュメントがレコードに当たります。通常、NoSQL データ ストアにはスキーマがないため、このたとえは大雑把なものです。アプリケーションで使用するコレクション名は CityTempCollection、データベース名は TemperatureDB です。

データ ストレージのオプションについて

データ ストレージの説明の出発点として、Event Hubs にネイティブに用意されているデータストアよりも優れたセカンダリ データ ストアの必要性について説明します。たとえば、永続化を考えるわかりやすい理由の 1 つは、Event Hubs に保存されるメッセージは永続的なものではないことです。2 つ目は、どのようなクエリ言語を使用しても、メッセージをクエリできないことです。メッセージは Event Hub パーティションから順番に読み取ることしかできません。各クライアント リーダーはメッセージ ストアの独自のカーソルを保持します。3 つ目は、Event Hubs に用意されているイベント ストレージは 84 GB、既定の保持期間は 24 時間であることです。

さまざまなオプションがありますが、今回は気温関係のメッセージとデータのプライマリ データ ストアとして Azure SQL Database と DocumentDB を選択しました。Azure SQL Database の利点は 2 つあります。1 つは、クラウドにホストされており、いくつか役立つ機能を備えていることに関係します。Azure SQL Database は、ほぼ瞬時にプロビジョニングできるうえ、経済的で、三重のレプリケーションが行われているため非常に堅牢です。

このような明白なメリット以外にも、Azure SQL Database には、Power BI などのビジネス インテリジェンス ツールが多数組み込まれています。Power BI を使用すると、高度な視覚効果を利用する対話形式のレポートを作成できます。また、強力なダッシュボードを構築して、ブラウザーやモバイル デバイスから使用することもできます。

2 つ目のデータ ストアとして、DocumentDB を使用します。DocumentDB にもいくつかわかりやすい利点があります。完全管理型のサービスなので、インフラストラクチャや個々の仮想マシン (VM) に対応する手間がかかりません。さらに、Azure ベースのほとんどのサービスと同様、エンタープライズ サービスレベル契約が付属しています。また、DocumentDB にはスキーマもありません。

多くの場合、オブジェクト指向と継承においては、スキーマのないドキュメント ストアが適切だと考えられています。それは、継承とは、いくつか共通する属性と、いくつかのオブジェクトのサブタイプ固有の属性を持つオブジェクトがあることを意味するためです。

ほとんどのリレーショナル データベースでは、考えられるすべての属性を含むテーブルが必要です。その属性に該当しない多くのフィールドが null になります。これに対して、スキーマのないデータベースの場合、さまざまなオプションのプロパティのセットを保存できます。JavaScript を使用すると、不明なオプションのプロパティを確認して、適切な関数を呼び出し、テーブルに出力して表示できるため、HTML をレンダリングする場合はスキーマのないデータベースが有効です。

スキーマのないデータベースの他の利点 (欠点と考える人もいます) は、開発中の機敏性が向上することです。データベースを再構築することなく新しい機能を追加できるため、以前のバージョンで作成したデータとの下位互換性の確保が容易になります。このアプローチの短所は、オプションのフィールドに対するクエリの作成が複雑になる可能性があることです。

DocumentDB のような JSON ベースのデータ ストアが真価を発揮する分野の 1 つは、Node.js アプリケーションを使用してデータをモバイルや Web アプリケーションに公開する場合です。簡潔で表現力の高いネイティブ JSON 形式を使用すると、処理が簡単で自然になります。また、Microsoft .NET Framework、JavaScript、Ruby、Java、PHP、Python のいずれを使用している場合でも、HTTP と REST で JSON ベースのデータ ストアを簡単に使用できます。DocumentDB データの読み取りとモバイル アプリケーションへのデータの公開を行う Node.js アプリケーションについては、次回に取り上げる予定です。

メッセージの利用と永続化

Event Hubs のメッセージを利用する場合、3 つの一般的なアプローチがあります。当然のことながら、簡便さと柔軟性の間にはトレードオフがあります。簡単なのは、Stream Analytics を使用する方法です。もう 1 つは、直接レシーバーを使用する方法です。直接レシーバーは、コンシューマー グループ内にあるパーティションへのアクセスの連携を処理します。これにより、EventHubConsumerGroup オブジェクトのレシーバーを作成する際に、コードからパーティション ID に直接アクセスできます。また、EventProcessorHost などの上位レベルの抽象化を使用する方法もあります。これはシンプルですが十分な柔軟性を備えているため、この方法を使用します。

Stream Analytics は Event Hubs から簡単にメッセージを取り込むことができる完全管理型のサービスです。これを使用すると、メッセージの利用、変換、および Azure SQL Database への配置を簡単に行えます。このアプローチの詳細については、azure.microsoft.com/ja-jp/documentation/articles/stream-analytics-get-started/ を参照してください。

データベース、テーブル、および一連のクエリの作成は簡単です。たとえば、単純な select ステートメントを使用して Event Hubs からメッセージを読み取り ("SELECT DeviceId, Temperature FROM input")、そのメッセージを SQL Server に配置できます。クエリを再帰的に連鎖させ、パイプライン アプローチを使用してデータを変換する一連のクエリを作成することもできます。一連の SQL クエリを通じてメッセージをフィルター処理するこの機能は、強力な抽象化です。

Stream Analytics で使用できるすばらしい拡張機能の 1 つが、「ウィンドウ化」です。多くの場合、セット ベースの計算 (集計) や、一定の期間内に含まれるイベントのサブセットに対する操作の実行には要件があります。時間は複合イベント処理システムにとって非常に重要なものなので、システムでクエリ ロジックの時間コンポーネントを操作する単純な方法を用意しておくことが重要です。Azure Stream Analytics では、時間ごとのグループ化を示すウィンドウを通じて、これらのイベントのサブセットが定義されています。サポート対象のさまざまな時間ウィンドウの種類の詳細については、msdn.microsoft.com/ja-jp/library/azure/dn835019.aspx (英語) を参照してください。

直接レシーバーを使用すると、特定の Event Hub パーティション ID をターゲットに設定できます。パーティションは 2 つの方法で役立ちます。1 つは、パーティションを使用するとメッセージの発行と利用を拡張できること、もう 1 つは、データを個別のサイロに分離できることです。

今回は、EventProcessorHost アプローチを採用します。これは、パーティションのアクセスを管理する .NET コンシューマーのインテリジェント エージェントです。詳細については、ServiceBus のブログ記事 (bit.ly/1aO5I19、英語) を参照してください。EventProcessorHost を使用して、メッセージの読み取り、変換、および永続ストレージへの書き込みを簡単に行えます。

Event Hubs SDK を使用し、Azure SQL Database と DocumentDB の両方に書き込みを行うカスタム C# プログラムを作成します (「Event Hub プログラミング ガイド」(msdn.microsoft.com/ja-jp/library/azure/dn789972.aspx) 参照)。このソース コードはすべて bit.ly/1aSFF99 (英語) で入手できます。パスワードと秘密鍵は削除してありますが、ポータルから接続情報を取得できます。すべてのコードを取得するには、Git をインストールして、コマンド git clone を実行します。Visual Studio コンソール アプリケーションに表示されるファイルの一部を図 2 に示します。メイン ドライバー コードは Program.cs に含めています。

図 2 メッセージを使用および永続化するコード

+==============+
|  Section 1   |
+==============+
internal class Program
{
  private static string eventHubConnectionString =             
    ConfigurationManager.AppSettings["eventHubConnectionString"];
  static private string eventHubName = 
    ConfigurationManager.AppSettings["eventHubName"];
  static private string storageAccountName =
    ConfigurationManager.AppSettings["storageAccountName"];
  static private string storageAccountKey =
    ConfigurationManager.AppSettings["storageAccountKey"];
  static private string storageConnectionString =
    string.Format("DefaultEndpointsProtocol=https;" +
    "AccountName={0};AccountKey={1}",
    storageAccountName, storageAccountKey);
  static private string eventProcessorHostName = Guid.NewGuid().ToString();
  private static void Main(string[] args)
  {
    // Start the main message consumption engine
    +==============+
    |  Section 2   |
    +==============+
    var eventProcessorHost =
      new EventProcessorHost(eventProcessorHostName,
      eventHubName, EventHubConsumerGroup.DefaultGroupName,
      eventHubConnectionString, storageConnectionString);
    // Asynchronously consume message from Event Hub
    eventProcessorHost.
      RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();
    Console.WriteLine("Receiving. Press enter key to stop worker.");
    Console.ReadLine();
  }
}
+==============+
|  Section 3   |
+==============+
// SimpleEventProcessor.cs
class SimpleEventProcessor : IEventProcessor
{
  private DataManager dataManager = new DataManager(new SQLDatabaseManager());
  // ... Means omitted for brevity
  public SimpleEventProcessor()  ...
  async Task IEventProcessor.CloseAsync(PartitionContext context,
    CloseReason reason) ...
  Task IEventProcessor.OpenAsync(PartitionContext context) ...
  async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, 
    IEnumerable<EventData> messages)
  {
    // Loop through messages to insert
    foreach (EventData eventData in messages)
    {
      string data = Encoding.UTF8.GetString(eventData.GetBytes());
      // Comma separated so divide up fields
      string[] msg = data.Split(',');
      if (msg.Length > 2)
      {
        +==============+
        |  Section 4   |
        +==============+
        // Insert into SQL
        dataManager.InsertSqlMessage(msg[0], Convert.ToInt32(msg[1]),
          Convert.ToDouble(msg[2]));
        // Insert into global DocumentDB object
        // (global because of thread timing issues)
        Globals.DocDb.InsertEntry(msg[0], Convert.ToInt32(msg[1]),
          Convert.ToDouble(msg[2]));
      }
      Console.WriteLine(string.Format("Message received.  Partition: '{0}', " +
        "Data: '{1}'", context.Lease.PartitionId, data));
    }
    // Call checkpoint every 5 minutes, so that worker can resume
    // processing from the 5 minutes back if it restarts
    if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
    {
      await context.CheckpointAsync();
      lock (this)
      {
        this.checkpointStopWatch.Reset();
      }
    }
  }
}
+==============+
|  Section 5   |
+==============+
// DocumentDBManager.cs
public class DocumentDbManager
{
  // Omitted variables for brevity
  public string collectionName = "CityTempCollection";
  public string databaseName = "TemperatureDB";
  public async Task<bool> InsertEntry(
    string city, int month, double temperature)
  {
    dynamic document = null;
    try
    {
      +==============+
      |  Section 6   |
      +==============+
      // Check if City exists
      document = client.CreateDocumentQuery(documentCollection.DocumentsLink)
        .Where(d => d.Id == city).AsEnumerable().FirstOrDefault();
    }
    catch (Exception ex)
    {
      throw;
    }
    bool docExists = (document != null);
    // Document DOESN'T exist, yet
    if (!docExists)
    {
      +==============+
      |  Section 7   |
      +==============+
      var cityMonthTemperature = new CityMonthTemperature
      {
        Id = city,
        City = city
      };
      cityMonthTemperature.Temperatures[month - 1] = temperature;
      try
      {
        +==============+
        |  Section 8   |
        +==============+
        // Create, and set document to the return, yes --
        // here is where you reset the document object
        document = await client.CreateDocumentAsync(
          documentCollection.DocumentsLink, cityMonthTemperature);
      }
      catch (DocumentClientException ex)
      // Omitted for brevity   
    }
  }
}

セクション 1 には特に不思議なところはありません。App.config から接続情報の一部を受け取ります。ポータルから入手できるすべての接続情報を取得できます。セクション 2 では、Event Hubs SDK の中核オブジェクトである EventProcessorHost のインスタンスを実際に作成して、メッセージを取得できるようにします。セクション 3 では、このオブジェクトに関連する基盤のイベントが SimpleEventProcessor クラス内にあることがわかります。

非同期コールバックの ProcessEventsAsync は SDK によって呼び出されます。これは、パラメーター IEnumerable<EventData> messages を渡します。これを解析して、Event Hubs に保存されているメッセージを取得します。その後、messages パラメーターを解析して、解析したメッセージをセクション 4 で SQL Database と DocumentDB に挿入します。Visual Studio ソリューションで、InsertSqlMessage と InsertEntry の低レベルのコードの詳細をすべて確認できます。

セクション 5 は、DocumentDB への挿入操作を実際に行うクラスを示しています。データベース名は TemperatureDB、コレクション名は CityTempCollection です。セクション 6 は、LINQ クエリを使用して都市を検索するクエリを示しています。このロジックでは、都市が以前に挿入されたかどうかをチェックします。必要な処理は、都市が存在する場合に気温データを更新することです。

セクション 7 は、都市が挿入されていないシナリオを示しています。挿入が行われたら JSON データに変換される、単純な .NET オブジェクトを作成します。この変換は、基盤となる SDK によって処理されます。気温配列の適切な月のオフセットに気温を挿入します。最後に、セクション 8 で、実際にドキュメント オブジェクトを更新します。

このコード スニペットでは、簡潔にするため、都市が既に挿入されているシナリオで気温を更新するコードを省略していますが、GitHub リポジトリ (bit.ly/1aSFF99、英語) でこのコードと Visual Studio ソリューション全体を確認できます。

DocumentDB はプレビュー段階のサービスの一部であるため、新しい Azure プレビュー ポータル (portal.azure.com) を使用する必要があります。DocumentDB の便利な機能の 1 つは、ドキュメントとデータの作成、データのクエリ、既存のデータの表示などを行えるドキュメント エクスプローラーです (図 3 参照)。

DocumentDB を使用したデータの
図 3 DocumentDB を使用したデータの作成、クエリ、および表示

4 月号では、AMQP トランスポート プロトコルを使用してメッセージを Event Hubs に挿入する、Linux で実行する C プログラムを作成しました。このコードは、Azure にホストされた Linux VM で実行したため、Debian ベースの Raspberry Pi 実装に簡単に移植できました。手短に言えば、第 1 部ではメッセージ発行の提供について、今回はメッセージの利用とメッセージの永続化について説明しました。最後の次回は、永続化したデータをモバイル クライアントに公開し、基盤となるデータを表示する機能について取り上げます。


Bruno Terkaly* は、デバイスに依存しない、業界をリードするアプリケーションやサービスを開発できるようにすることを目標にするマイクロソフトのプリンシパル ソフトウェア エンジニアです。テクノロジが実現可能かどうかという視点を通り越して、米国で最高のクラウド商談やモバイル商談を進めることを担当しています。ISV が評価、開発、配置を行う際に、アーキテクチャに関するガイダンスを提供したり、技術的な細かいサポート作業を行うことによって、パートナーがアプリケーションを市場に投入できるようにサポートしています。また、フィードバックを提供したり、ロードマップに影響を与えて、クラウドやモバイルのエンジニアリング グループと密接に連携することも行っています。*

この記事のレビューに協力してくれたマイクロソフト技術スタッフの Ryan CrawCour と Dan Rosanova に心より感謝いたします。
Ryan CrawCour は、20 年にわたりデータベースに携わるベテランです。はるか昔に、SQL Server 4.2 用に初めてのストアド プロシージャの作成を開始しました。多くのカーソル、結合、ストアド プロシージャなどの後、NoSQL ソリューションの自由で魅力的な世界の探求を始めました。Ryan は現在、レドモンドの DocumentDB 製品チームのプログラム マネージャーとして、このまったく新しい NoSQL DaaS (サービスとしてのデータベース) の将来を形作るために取り組んでいます。

Dan Rosanova は、Azure Service Bus チームで Event Hubs、メッセージング (キューとトピック)、およびリレーを担当するシニア プログラム マネージャーです。Dan は、メッセージングと分散コンピューティング分野で 16 年の経験を持ち、主にハイパースケールの進化的計算関連の仕事をしています。