Azure Stream Analytics を使用したリアルタイム IoT データ ストリームの処理

この記事では、モノのインターネット (IoT) デバイスからデータを収集するストリーム処理ロジックの作成方法について学習します。 実際のモノのインターネット (IoT) ユース ケースを使用して、迅速で経済的なソリューションを構築する方法を紹介します。

前提条件

シナリオ

Contoso は工業オートメーションの領域で活動する会社で、自社の製造工程を自動化しています。 この工場の機械には、リアルタイムでデータのストリームを生成することのできるセンサーがあります。 このシナリオにおいて、生産現場マネージャーは、センサー データからリアルタイムの詳細情報を取得し、パターンを見つけて、それらに対処したいと考えています。 センサー データに対して Stream Analytics クエリ言語 (SAQL) を使用して、データの受信ストリームから興味深いパターンを検出できます。

この例では、データは Texas Instruments 社のセンサー タグ デバイスから生成されます。 データのペイロードは、次のサンプル スニペットに示すように JSON 形式です。

{
    "time": "2016-01-26T20:47:53.0000000",  
    "dspl": "sensorE",  
    "temp": 123,  
    "hmdt": 34  
}  

実際のシナリオでは、何百ものこのようなセンサーがストリームとしてイベントを生成することになります。 ゲートウェイ デバイスがコードを実行し、これらのイベントを Azure Event Hubs または Azure IoT Hub にプッシュできれば理想的です。 Stream Analytics ジョブでそれらのイベントを Event Hubs または IoT Hubs から取り込み、そのストリームに対してリアルタイム分析クエリを実行します。 結果はその後、いずれかのサポートされている出力に送信することになります。

このガイドでは使いやすさを考えて、現実のセンサー タグ デバイスからキャプチャしたサンプル データ ファイルを用意しています。 このサンプル データに対してクエリを実行し、結果を確認できます。 以降のチュートリアルでは、ジョブを入力と出力に関連付け、それらを Azure サービスにデプロイする方法を学習します。

Stream Analytics のジョブの作成

  1. Azure Portal に移動します。

  2. 左側のナビゲーション メニューで、[すべてのサービス] を選択し、[分析] を選択し、[Stream Analytics jobs] (Stream Analytics ジョブ) の上にマウス ポインターを置き、[作成] を選択します。

    Stream Analytics ジョブの [作成] ボタンの選択を示すスクリーンショット。

  3. [新しい Stream Analytics ジョブ] ページで、次の手順のようにします。

    1. [サブスクリプション] で、自分の Azure サブスクリプションを選択します。

    2. [リソース グループ] で、既存のリソース グループを選ぶか、リソース グループを作成します。

    3. [名前] に、Stream Analytics ジョブの一意の名前を入力します。

    4. Stream Analytics ジョブを作成するリージョンを選びます。 処理速度を向上させ、コストを削減するために、リソース グループとすべてのリソースに同じ場所を使用してください。

    5. [Review + create](レビュー + 作成) を選択します。

      Stream Analytics ジョブ ページを示すスクリーンショット。

  4. [確認および作成] ページで、設定を確認し、 [作成] を選択します。

  5. デプロイが成功したら、[リソースに移動] を選んで Stream Analytics ジョブの [Stream Analytics ジョブ] ページに移動します。

Azure Stream Analytics クエリの作成

ジョブが作成されたら、クエリを記述します。 入力または出力をジョブに接続しなくても、サンプル データに対してクエリをテストできます。

  1. GitHub から HelloWorldASA-InputStream.json をダウンロードします。

  2. Azure portal の [Azure Stream Analytics ジョブ] ページで、左側のメニューから [ジョブ トポロジ] の下の [クエリ] を選択します。

  3. [Upload sample input] (サンプル入力のアップロード) を選択し、ダウンロードした HelloWorldASA-InputStream.json ファイルを選択して、[OK] を選択します。

    [Upload sample input] (サンプル入力のアップロード) が選択された [クエリ] ページのスクリーンショット。

  4. データのプレビューが 入力プレビュー テーブルに自動的に設定されることに注意してください。

    [Input preview] (入力のプレビュー) タブのサンプル入力データを示すスクリーンショット。

クエリ: 生データのアーカイブ

クエリの最も単純な形式は、すべての入力データを指定された出力にアーカイブするパススルー クエリです。 このクエリは、新しい Azure Stream Analytics ジョブで設定された既定のクエリです。

  1. [クエリ] ウィンドウで、次のクエリを入力し、ツールバーの [Test query] (クエリのテスト) を選択します。

    SELECT
        *
    INTO
        youroutputalias
    FROM
        yourinputalias
    
  2. 下部ウィンドウの [Test results] (テスト結果) タブで結果を確認します。

    サンプル クエリとその結果を示すスクリーンショット。

クエリ: 条件に基づいたデータのフィルター処理

条件に基づいて結果をフィルター処理するようにクエリを更新してみましょう。 たとえば、次のクエリは、sensorA からのイベントを表示します。

  1. 次のサンプルを使用してこのクエリを更新します。

    SELECT 
        time,
        dspl AS SensorName,
        temp AS Temperature,
        hmdt AS Humidity
    INTO
       youroutputalias
    FROM
        yourinputalias
    WHERE dspl='sensorA'
    
  2. [Test query] (クエリのテスト) を選択してクエリの結果を確認します。

    フィルターを使用したクエリ結果を示すスクリーンショット。

クエリ: ビジネス ワークフローをトリガーするアラート

クエリについて、もう少し詳しく説明します。 あらゆる種類のセンサーを対象に、30 秒間隔で平均温度を監視し、平均温度が 100 度を超える場合にのみ結果を表示するのであれば、

  1. クエリを次のように更新します。

    SELECT 
        System.Timestamp AS OutputTime,
        dspl AS SensorName,
        Avg(temp) AS AvgTemperature
    INTO
       youroutputalias
    FROM
        yourinputalias TIMESTAMP BY time
    GROUP BY TumblingWindow(second,30),dspl
    HAVING Avg(temp)>100
    
  2. [Test query] (クエリのテスト) を選択してクエリの結果を確認します。

    繰り返しウィンドウを使用するクエリを示すスクリーンショット。

    ご覧のように、結果に含まれるのは 245 行のみで、平均温度が 100 度を超えるセンサーの名前が一覧表示されます。 このクエリでは、センサー名である dspl 別に、30 秒のタンブリング ウィンドウでイベントのストリームをグループ化しています。 一時的なクエリでは、時間の進み方を指定する必要があります。 ここでは、一時的な計算すべてに時間を関連付けるため、TIMESTAMP BY 句を使用して OUTPUTTIME 列を指定しました。 詳細については、時間管理ウィンドウ関数に関するページを参照してください。

クエリ: イベントがないことを検出する

入力イベントがないことを検出するためのクエリは、どのように記述すればよいのでしょうか。 センサーが最後にデータを送信してから 5 秒間イベントを送信しなかったタイミングを見つけましょう。

  1. クエリを次のように更新します。

    SELECT 
        t1.time,
        t1.dspl AS SensorName
    INTO
       youroutputalias
    FROM
        yourinputalias t1 TIMESTAMP BY time
    LEFT OUTER JOIN yourinputalias t2 TIMESTAMP BY time
    ON
        t1.dspl=t2.dspl AND
        DATEDIFF(second,t1,t2) BETWEEN 1 and 5
    WHERE t2.dspl IS NULL
    
  2. [Test query] (クエリのテスト) を選択してクエリの結果を確認します。

    イベントの欠落を検出するクエリを示すスクリーンショット。

    ここでは、同じデータ ストリームに対して LEFT OUTER JOIN を使用しています (自己結合)。 INNER JOIN では、一致が見つかった場合にのみ結果が返されます。 これに対して、LEFT OUTER JOIN では、結合の左側のイベントに一致するデータがない場合、その右側の列がすべて NULL となった行が返されます。 この手法は、イベントの欠落を見つけるのに便利です。 詳細については、JOIN に関するページを参照してください。

まとめ

この記事の目的は、Stream Analytics クエリ言語を使ったさまざまなクエリを記述し、その結果をブラウザーで確認する方法を紹介することです。 ただし、この記事はほんの導入部に過ぎません。 Stream Analytics はさまざまな入出力に対応していることに加え、Azure Machine Learning の関数も利用できることから、データ ストリームを分析するうえで強力な手段となります。 クエリの作成方法の詳細については、一般的なクエリ パターンに関する記事を参照してください。