Stream Analytics ジョブを IoT ハブに接続する

これで IoT ハブが BLOB コンテナーに接続されたので、Stream Analytics ジョブを接続して入力を処理してみましょう。

  1. Azure portal の左側のナビゲーション メニューで [+ リソースの作成] を選択し、[モノのインターネット (IoT)][Stream Analytics ジョブ] の順に選択します。

    Stream Analytics ジョブの作成

  2. ジョブに "polar-bear-analytics" という名前を付けて、前に作成したリソース グループに配置します。

  3. 場所として米国中南部を指定します。 これは、IoT ハブに使ったのと同じ場所です。

    重要

    運用システムを構築するときは、場所の選択が重要です。 IoT ハブは Analytics ジョブと同じリージョンに配置します。 同じデータ センターの内部を移動するデータは課金されませんが、異なるデータ センターの "" を移動するデータは通常は "課金されます"。 さらに、相互に接続されているサービスを同じリージョン内に配置すると、待機時間が短縮されます。

  4. [ホスティング環境][クラウド] に設定されていることを確認します。

  5. [ストリーミング ユニット] を "1" に設定します。

  6. [作成] ボタンを選択してリソースを作成します。

    Stream Analytics ジョブのパラメーターの指定

  7. ポータルで polar-bear-analytics Stream Analytics ジョブを開きます。 リソース グループに Stream Analytics ジョブが表示されない場合は、ビューの上部にある [最新の情報に更新] ボタンを、表示されるまでクリックします。

  8. 左側のメニューで [入力] をクリックします。

  9. [ストリーム入力の追加] > [IoT Hub] を選択して、Stream Analytics ジョブに入力を追加します。

    入力の追加

  10. [入力のエイリアス] ボックスに「CameraInput」と入力します。

  11. 前に作成した IoT ハブを選択します。

  12. [エンドポイント][メッセージング] に設定されていることを確認し、それ以外のすべては既定値のままとします。

  13. ビューの下部にある [保存] をクリックします。

    入力の作成

しばらくすると、新しい入力 "CameraInput" が Stream Analytics ジョブの入力の一覧に表示されます。 ここで作成する入力はこれだけですが、Stream Analytics ジョブへの入力はいくつでも追加できます。

Stream Analytics クエリ言語では、リレーショナル データベースのテーブルと同様に、各入力は異なるデータ ソースとして扱われます。 クエリ言語は豊富な表現を持ち、データベース テーブルの結合と同じように、入力ストリームを結合することさえ可能です。

データを取得するための Stream Analytics クエリを設定する

Stream Analytics ジョブの核心部分は、データ ストリームから情報を抽出するクエリです。 ライブ データ ストリームに対してデプロイする前に、サンプル データを使ってクエリをテストすることをお勧めします。サンプル データであれば、既知の入力のセットで予想される出力が生成されることを確認できます。

これからテストするサンプル データは、資産の取得元と同じ GitHub リポジトリにあります。

  1. sample-data.json をお使いのローカル コンピューターにダウンロードします。 リンクを右クリックし、[名前を付けて保存] またはお使いのブラウザーで同等の機能を選択します。

    この JSON ファイルには、クエリのテストに使用できるいくつかのサンプル IoT イベントが含まれています。 ファイルからのイベントの例を次に示します。

    {
        "deviceId": "polar_cam_0003",
        "latitude": 74.996653,
        "longitude": -96.60178,
        "url": "https://streaminglabstorage.blob.core.windows.net/photos/image_09.jpg",
        "timestamp": "2017-12-22T19:00:18.000Z",
        "EventProcessedUtcTime": "2017-12-22T19:04:35.2124006Z",
        "PartitionId": 1,
        "EventEnqueuedUtcTime": "2017-12-22T19:00:17.6720000Z",
        "IoTHub": {
            "MessageId": null,
            "CorrelationId": null,
            "ConnectionDeviceId": "polar_cam_0003",
            "ConnectionDeviceGenerationId": "636494537606587154",
            "EnqueuedTime": "2017-12-22T19:00:17.0710000Z",
            "StreamId": null
        }
    }
    
  2. ポータルで Stream Analytics ジョブに戻り、ビューの左側のメニューで [クエリ] をクリックします。

  3. 下のパネルで [ファイルからサンプル データをアップロードする] を選択します。

    サンプル データのアップロード

  4. 右側のフォルダー アイコンを選択し、ダウンロードした sample-data.json ファイルを選択します。

  5. [OK] をクリックしてファイルをアップロードします。

    sample-data.json のアップロード

  6. アップロードが完了したら、クエリ ウィンドウに次のクエリを入力します。 次に、[Test query](クエリのテスト) ボタンをクリックしてそれを実行します。

    SELECT * FROM CameraInput
    

    クエリのテスト

  7. 次の図のような出力が表示されることを確認します。 テスト データには 50 行が含まれ、それぞれがカメラ配列内のカメラのいずれかによって IoT ハブに送信されるイベントを表しています。 DEVICEID はカメラのデバイス ID であり、LATITUDELONGITUDE はカメラの場所を指定し、URL は撮影された画像が格納されている BLOB の URL であり、TIMESTAMP は写真が撮影された日時です。 他のフィールドは Azure によって追加されました。

    クエリの結果

クエリを調整する

Stream Analytics クエリ言語の主な機能の 1 つは、ユーザーが長さを指定する "時間のウィンドウ" を使って結果をグループする機能です。

ウィンドウ化は、GROUP BY 句のクエリ キーワードを使って指定します。

ウィンドウ化キーワード 説明
TumblingWindow タンブリング ウィンドウ関数は、データ ストリームを個別の時間セグメントに分割して、関数を実行するときに使用します。 例: "タイム ゾーンごとに 10 秒間隔でツイート数を表示する"。 タンブリング ウィンドウの主な差別化要素は、重複せずに繰り返すことであり、1 つのイベントが複数のタンブリング ウィンドウに属することはできません。
HoppingWindow ホッピング ウィンドウ関数は、一定の期間だけ前に進みます。 重複できるタンブリング ウィンドウと考えると簡単で、イベントは複数のホッピング ウィンドウ結果セットに属することができます。 例: "5 秒ごとに、過去 10 秒間のツイート数を表示する"。
SlidingWindow タンブリング ウィンドウまたはホッピング ウィンドウとは異なり、スライディング ウィンドウ関数は、イベント発生時に出力のみを生成します。 すべてのウィンドウに少なくとも 1 つのイベントがあり、€ (イプシロン) ごとに継続的に前に移動します。 ホッピング ウィンドウと同様に、イベントは複数のスライディング ウィンドウに属することができます。 例: "過去 10 秒間に 10 回より多くツイートされたすべてのトピックのツイート数を表示する"。
SessionWindow セッション ウィンドウ関数は、類似した時刻に到着するイベントをグループ化することにより、データが存在しない期間をフィルターで除外します。 これには、タイムアウト、最大期間、パーティション分割キー (省略可能) の 3 つの主なパラメーターがあります。 例: "5 分以内に相互に発生したツイートの数を表示する"。
  1. 次のクエリを実行して、1 分ごとにカメラがトリガーされた回数をカウントします。

    SELECT System.Timestamp as [Time Ending],
        COUNT(*) AS [Times Triggered]
    FROM CameraInput TIMESTAMP BY timestamp
    GROUP BY TumblingWindow(n, 1)
    

    TIMESTAMP BY は、Stream Analytics クエリ言語の重要な要素です。 上のクエリからそれを省略した場合、カメラの場所で発生したイベントの数ではなく、1 分ごとに "イベント ハブに" 到着したイベントの数を照会することになります。 TIMESTAMP BY では、イベントの時刻として入力ストリームのフィールドを指定することができます。

  2. 次のような出力が表示されることを確認します。

    TumblingWindow を使用したクエリ結果

ライブ クエリを作成する

次に、同じカメラで 10 秒以内に撮影された 2 つの写真を確認します。 "これは、ライブ データ ストリームに対して使用するクエリです"。 前提として、シロクマは動きが比較的遅いので、10 秒より長い間隔で撮影された写真は無視しますが、同じカメラで 10 秒以内に 2 つの写真が撮影されている場合は、そのうちの 1 つにシロクマが移っているかどうかを調べる価値があります。

  1. 次のクエリを入力し、[テスト] をクリックして実行します。

    SELECT C1.deviceId, C1.latitude, C1.longitude, C1.url, C1.timestamp
      FROM CameraInput C1 TIMESTAMP BY timestamp
      JOIN CameraInput C2 TIMESTAMP BY timestamp
        ON C1.deviceId = C2.deviceId
            AND DATEDIFF(ss, C1, C2) BETWEEN 0 AND 10
            AND C1.timestamp != C2.timestamp
    
  2. 今度は出力に 6 行が含まれ、それぞれは同じカメラによって 10 秒以下の間隔で撮影された 2 枚の写真を表し、一方の写真の URL が含まれます。

    10 秒以内に 2 つの写真を撮影したカメラ

  3. ビューの上部にある [保存] ボタンをクリックしてクエリを保存し、終了します。

  4. 確認を求められたら、[はい] を選択します。 これは、Stream Analytics ジョブを実行したときに実行されるクエリです。

テストしたクエリでは単純なロジックが採用されています。同じカメラで 10 秒以内に 2 つの写真が撮影された場合、シロクマが "いる可能性があります"。 しかし、最終的な目標は、シロクマが実際に "いる" かどうかを高い信頼度で判定することです。 それは、機械学習で Stream Analytics を補うことを意味します。

自分の知識をチェックする

1.

Azure Stream Analytics のクエリでの "ウィンドウ化" の概念は何ですか?