タイムスタンプ (Azure Stream Analytics)

すべてのデータストリームイベントにはタイムスタンプが関連付けられています。 既定では、イベントハブと IoT Hub からのイベントは、イベントハブまたは IoT Hub によってイベントが受信された日時に基づいてタイムスタンプされます。Blob storage からのイベントは、blob の最終更新時刻によってタイムスタンプがあります。 ジョブを再起動または再実行した場合、イベントのタイムスタンプは変わりません。

多くのストリーミングアプリケーションでは、到着時間ではなく、イベントが発生した正確なタイムスタンプを使用する必要があります。 たとえば、販売時点のアプリケーションでは、支払いイベントがイベントインジェストサービスに到達したときではなく、支払いがログに記録された時刻に対応するイベントのタイムスタンプが必要になる場合があります。 さらに、地理的に分散されたシステムとネットワーク待ち時間は、予期しない到着時間に影響を与える可能性があります。これにより、ストリーミングアプリケーションでアプリケーション時間をより確実に使用できるようになります。 このような場合は、TIMESTAMP BY 句を使用して、カスタムタイムスタンプ値を指定できます。 値には、イベントペイロードまたは DATETIME 型の式の任意のフィールドを指定できます。 ISO 8601 形式のいずれかに準拠した文字列値   もサポートされています。

カスタムタイムスタンプ (TIMESTAMP BY 句) を使用すると、Azure Stream Analytics によって、次の2つの理由により、タイムスタンプに対して順不同のイベントが発生する可能性があることに注意してください。

  • 個々のイベントプロデューサーは、システムクロックが異なる場合があります。
  • 個々のイベントプロデューサーからのイベントは、たとえばプロデューサーのサイトでネットワークが利用できなくなるなど、転送中に遅延する場合があります。

イベントプロデューサー間の誤順序は大きくなる可能性がありますが、1つのプロデューサーからのイベント内の誤順序は一般に小さいものでも、存在しない場合もあります。 クエリが各イベントプロデューサーのデータのみを個別に処理する場合、独自のタイムラインで各プロデューサーからのイベントを処理する方が、プロデューサー間の時間のずれを管理するよりも効率的です。 Azure Stream Analytics は、 <over spec> サブ句を使用して独立したタイムラインでイベントの処理を有効にすることで、サブストリームをサポートします。 OVER 句の使用がジョブの処理に及ぼす影響については、「OVER 句がイベントの順序を操作する」を参照してください。

構文

TIMESTAMP BY scalar_expression [OVER <over spec> ]  
      
<over spec> ::= 
      { column_name | expression } [,...n ]  

解説

イベントのタイムスタンプの取得

イベントのタイムスタンプは、system.string () プロパティを使用して、クエリの任意の部分の SELECT ステートメントで取得できます。

OVER 句はイベントの順序とやり取りします。

OVER 句を使用すると、Azure Stream Analytics によるイベント処理のいくつかの側面が変更されます。

  1. 順序を指定しない最大許容範囲は、の単一の値のタプル内で適用され <over spec> ます。 つまり、イベントは、同じイベントプロデューサーからの他のイベントに対して順序が過剰に到着した場合にのみ、順序が不適切であると見なされます。

    たとえば、同じイベントプロデューサーからのイベントが常に並べ替えられ、即時処理が発生する場合、値 ' 0 ' を使用できます。 一方、ここで大きな値を使用すると、順序が逆のイベントがアセンブルされるのを待機している間に、処理の遅延が発生します。

  2. 到着遅延許容の最大値はグローバルに適用されます (OVER が使用されていない場合と同様)。 つまり、(TIMESTAMP BY 句で) 選択されたタイムスタンプが到着時刻から遠すぎている場合、イベントは遅延受信と見なされます。

    ここでは大きな値を使用しても処理の遅延が発生しないことに注意してください。イベントはすぐに処理されます (または、順序の許容範囲の上限に従って)。 数日間の値は、妥当ではありません。 ただし、非常に長い値を使用すると、ジョブを処理するために必要なメモリの量に影響を与える可能性があります。

  3. 各イベントプロデューサーの出力イベントは計算されるときに生成されます。これは、出力イベントの順序が逆順のタイムスタンプを持つ可能性があることを意味します。ただし、の1つの値のタプル内で順序付けられ <over spec> ます。

制限事項と制約事項

TIMESTAMP BY OVER 句の使用には次の制限があります。

  1. TIMESTAMP BY OVER 句は、クエリのすべての入力に使用するか、いずれの値にも使用しないようにしてください。

  2. TIMESTAMP BY OVER 句は、完全な並列ジョブまたは単一パーティションジョブでのみサポートされています。

  3. 入力ストリームに複数のパーティションがある場合は、OVER 句を PARTITION BY 句と一緒に使用する必要があります。 PartitionId 列は、TIMESTAMP BY 列の一部として指定する必要があります。

  4. TIMESTAMP BY OVER 句を使用する場合、句の列名は、GROUP BY ステートメントではグループ化キーとして、ストリーム間で結合するときはすべての結合述語で使用する必要があります。

  5. SELECT ステートメントまたは他のクエリ句で作成された列を TIMESTAMP BY 句で使用することはできません。入力ペイロードのフィールドを使用する必要があります。 たとえば、 クロス適用 の結果をタイムスタンプのターゲット値として使用することはできません。 ただし、クロス適用を実行する1つの Azure Stream Analytics ジョブを使用し、2番目のジョブを使用してによるタイムスタンプを実行できます。

  6. Timestamp () は、timestamp () の値を設定するものであるため、timestamp BY では使用できません。

例1–ペイロードからタイムスタンプフィールドにアクセスする

EntryTimeペイロードのフィールドをイベントのタイムスタンプとして使用する

SELECT  
      EntryTime,  
      LicensePlate,  
      State   
FROM input TIMESTAMP BY EntryTime  

例 2-イベントのタイムスタンプとしてペイロードから UNIX 時間を使用する

UNIX システムでは、通常、世界協定時刻 (UTC)、木曜日、1970年1月1日00:00:00 からの経過時間 (ミリ秒) として定義されている POSIX (またはエポック) の時間を使用します。

この例では、エポック時間を含む numeric ' epochtime ' フィールドをイベントタイムスタンプとして使用する方法を示します。

SELECT  
      System.Timestamp(),  
      LicensePlate,  
      State  
FROM input TIMESTAMP BY DATEADD(millisecond, epochtime, '1970-01-01T00:00:00Z')  

例3–異種タイムスタンプ

2種類のイベント ' A ' と ' B ' を含む異種データストリームを処理するとします。 イベント ' A ' には、フィールド ' タイムスタンプ A ' のタイムスタンプデータと、イベント ' B ' のタイムスタンプがフィールド ' タイムスタンプ B ' に含まれています。

この例では、によってタイムスタンプを記述し、両方の種類のイベント/タイムスタンプを処理できるようにする方法を示します。

SELECT  
      System.Timestamp(),  
      eventType,  
      eventValue,  
FROM input TIMESTAMP BY  
      (CASE eventType   
            WHEN 'A' THEN timestampA  
            WHEN 'B' THEN timestampB  
      ELSE NULL END) 

例4–パーティション分割されたクエリで複数のタイムラインを処理する

さまざまな料金所の Id に時間ポリシーを適用することなく、さまざまな送信者 (有料局) からのデータを処理します。 入力データは、TollId に基づいてパーティション分割されます。

SELECT
      TollId,
      COUNT(*) AS Count
FROM input
      TIMESTAMP BY EntryTime OVER TollId, PartitionId
      PARTITION BY PartitionId
GROUP BY TUMBLINGWINDOW(minute,3), TollId, PartitionId

参照

System.string ()
時間のずれポリシー
Azure Stream Analytics での時間の処理について
Unix 時間