Azure Stream Analytics からの BLOB ストレージと Azure Data Lake Gen2 出力

Data Lake Storage Gen2 によって、Azure Storage は、Azure 上にエンタープライズ データ レイクを構築するための基盤となります。 Data Lake Storage Gen2 は、当初から、何百ものギガビット単位のスループットを維持しつつ、複数のペタバイト単位の情報を利用可能にする目的で設計されているため、大量のデータを簡単に管理することができます。 Data Lake Storage Gen2 の基礎部分は、BLOB ストレージに階層型名前空間を追加したものです。

Azure Blob Storage を使用すると、大量の非構造化データをクラウドに保存する場合に、コスト効果の高いスケーラブルなソリューションを実現できます。 BLOB ストレージとその使用法の概要については、Azure Portal を使用した BLOB のアップロード、ダウンロード、および一覧表示に関するページを参照してください。

注意

AVRO 形式と Parquet 形式に固有の動作については、概要ページの関連セクションをご覧ください。

出力の構成

次の表に、BLOB または Azure Data Lake Storage Gen2 出力を作成するためのプロパティの名前とその説明を示します。

プロパティ名 説明
出力エイリアス クエリの出力をこの BLOB ストレージに出力するためにクエリで使用されるわかりやすい名前です。
ストレージ アカウント ご自分の出力を送信するストレージ アカウントの名前です。
ストレージ アカウント キー ストレージ アカウントに関連付けられている秘密キー。
コンテナー Azure Blob service に格納される BLOB の論理グループです。 BLOB を Blob service にアップロードするとき、その BLOB のコンテナーを指定する必要があります。

動的コンテナー名は省略可能です。 コンテナー名でサポートされる動的 {field} は 1 つだけです。 このフィールドは出力データに存在し、コンテナー名ポリシーに従っている必要があります。

フィールドのデータ型は string である必要があります。 CONCAT や LTRIM などの組み込みの文字列関数を使用してクエリ内で定義することにより、複数の動的フィールドを使用したり、静的テキストと動的フィールドを組み合わせたりすることができます。
イベントのシリアル化の形式 出力データのシリアル化形式。 JSON、CSV、Avro、Parquet がサポートされています。 Delta Lake は、ここにオプションとして一覧表示されます。 Delta Lake が選択されている場合、データは Parquet 形式になります。 Delta lake の詳細情報
Delta のパス名 イベントのシリアル化形式が Delta Lake の場合に必要です。 指定されたコンテナー内の Delta Lake テーブルを書き込むために使用されるパス。 これにはテーブル名が含まれます。 詳細と例。
書き込みモード 書き込みモードは、Azure Stream Analytics で出力ファイルに書き込む方法を制御します。 1 回だけの配信は、書き込みモードが [1 回] になっている場合にのみ行われます。 詳細については、次のセクションを参照してください。
パーティション列 省略可能。 出力データからパーティションへの {field} 名。 サポートされているパーティション列は 1 つだけです。
パスのパターン イベントのシリアル化形式が Delta Lake の場合に必要です。 指定したコンテナー内のご自分の BLOB を書き込むために使用されるファイル パス パターンです。

パス パターンでは、日付と時刻の変数 ({date}、{time}) のインスタンスを 1 つ以上使用して、BLOB が書き込まれる頻度を指定できます。

書き込みモードが [1 回] の場合、{date} と {time} の両方を使用する必要があります。

カスタム BLOB パーティション分割を使用して、イベント データの 1 つのカスタム {field} 名を指定することで、BLOB をパーティション分割できます。 このフィールド名は英数字であり、スペース、ハイフン、およびアンダースコアを含めることができます。 カスタム フィールドには、次の制限があります。
  • 書き込みモードが [1 回] になっている場合、動的カスタム {field} 名は使用できません。
  • フィールド名は大文字小文字が区別されません。 たとえば、このサービスでは列 ID と列 id を区別できません。
  • 入れ子になったフィールドは使用できません。 代わりに、ジョブ クエリ内で別名を使用して、フィールドを "フラット化" します。
  • 式はフィールド名として使用できません。

この機能により、パスでカスタム日付/時刻書式指定子の構成を使用できるようになります。 カスタム日時書式は、一度に 1 つを {datetime:<specifier>} キーワードで囲んで指定する必要があります。 <specifier> の使用可能な入力は、yyyy、MM、M、dd、d、HH、H、mm、m、ss、または s です。 {datetime:<specifier>} キーワードは、カスタムの日付/時刻の構成を形成するために、パス内で複数回使用できます。

例:
  • 例 1: cluster1/logs/{date}/{time}
  • 例 2: cluster1/logs/{date}
  • 例 3: cluster1/{client_id}/{date}/{time}
  • 例 4: cluster1/{datetime:ss}/{myField} (この場合、クエリは SELECT data.myField AS myField FROM Input;)
  • 例 5: cluster1/year={datetime:yyyy}/month={datetime:MM}/day={datetime:dd}

作成されたフォルダー構造のタイム スタンプは、ローカル時間ではなく、UTC に従います。 System.Timestamp は、時間ベースのパーティション分割すべてに使用される時間です。

ファイルの名前付けでは、次の規則を使用します。

{Path Prefix Pattern}/schemaHashcode_Guid_Number.extension

ここで、GUID は、BLOB ファイルに書き込むために作成された内部ライターに割り当てられた一意識別子を表します。 この数値は、BLOB ブロックのインデックスを表します。

出力ファイル例:
  • Myoutput/20170901/00/45434_gguid_1.csv
  • Myoutput/20170901/01/45434_gguid_1.csv

この機能の詳細については、「Azure Stream Analytics でのカスタム BLOB 出力のパーティション分割」を参照してください。
日付の形式 イベントのシリアル化形式が Delta Lake の場合に必要です。 日付トークンがプレフィックス パスで使用されている場合、ファイルを編成する日付形式を選択できます。 例:YYYY/MM/DD
時刻の形式 イベントのシリアル化形式が Delta Lake の場合に必要です。 時刻トークンがプレフィックス パスで使用されている場合、ファイルを編成する時刻形式を指定します。
最小行数 バッチあたりの最小行数。 Parquet の場合、すべてのバッチによって新しいファイルが作成されます。 現在の既定値は 2,000 行であり、最大許容値は 10,000 行です。
最大時間 バッチあたりの最大待機時間。 この時間が経過すると、最小行数の要件が満たされていなくても、バッチは出力に書き込まれます。 現在の既定値は 1 分であり、最大許容値は 2 時間です。 BLOB 出力にパス パターンの頻度がある場合は、待機時間をパーティションの時間の範囲より長くすることはできません。
エンコード CSV または JSON 形式を使用する場合は、エンコードを指定する必要があります。 現時点でサポートされているエンコード形式は UTF-8 だけです。
区切り記号 CSV のシリアル化のみに適用されます。 Stream Analytics では、CSV データをシリアル化するために、多数の一般的な区切り記号がサポートされています。 サポートしている値は、コンマ、セミコロン、スペース、タブ、および縦棒です。
Format JSON のシリアル化のみに適用されます。 [改行区切り] を指定すると、各 JSON オブジェクトを改行で区切って、出力が書式設定されます。 [改行区切り] を選択した場合、JSON は一度に 1 オブジェクトずつ読み取られます。 コンテンツ全体は、それ自体では有効な JSON になりません。 [配列] を指定すると、JSON オブジェクトの配列として出力が書式設定されます。 この配列が閉じられるのは、ジョブが停止したとき、または Stream Analytics が次の時間枠に移動したときだけです。 一般に、改行区切りの JSON を使うことが推奨されます。そうすれば、出力ファイルがまだ書き込まれている間に、特別な処理は必要ありません。

厳密に 1 回の配信 (パブリック プレビュー)

ストリーミング入力を読み取るときにエンド ツー エンドで 1 回だけ配信されるということは、処理されたデータが重複することなく Azure Data Lake Storage Gen2 出力に 1 回だけ書き込まれることを意味します。 この機能が有効になっている場合、Stream Analytics ジョブにより、前回の出力時間からユーザーが開始した再起動後も、データ損失や重複が出力として生成されないことが保証されます。 これにより、重複除去ロジックの実装やトラブルシューティングを行う必要がないため、ストリーミング パイプラインが大幅に簡素化されます。

書き込みモード

Stream Analytics による Blob Storage または ADLS Gen2 アカウントへの書き込み方法は 2 通りあります。 1 つは、結果が入ってくるのと同じファイルまたは一連のファイルに結果を追加する方法です。 もう 1 つは、時間パーティションのすべてのデータが使用可能な場合に、時間パーティションのすべての結果の後に書き込む方法です。 1 回だけの配信は、書き込みモードが [1 回] になっている場合に有効になります。

Delta Lake の書き込みモード オプションはありません。 ただし、Delta Lake の出力では、差分ログを使用して 1 回だけの保証も提供されます。 時間パーティションは必要なく、ユーザーが定義したバッチ処理パラメーターに基づいて継続的に結果が書き込まれます。

注意

厳密に 1 回の配信のためにプレビュー機能を使用しない場合は、[結果が到着したときにアペンドする] を選択してください。

構成

Blob Storage または ADLS Gen2 アカウントへの 1 回限りの配信を受信するには、次の設定を構成する必要があります。

  • [書き込みモード][時間パーティションのすべての結果が利用可能になった後に 1 回] を選びます。
  • {date} と {time} の両方を指定して [パス パターン] を指定します。
  • 日付の形式時刻の形式を指定します。

制限事項

  • サブストリームはサポートされていません。
  • パス パターンは必須プロパティになり、{date} と {time} の両方を含める必要があります。 動的カスタム {field} 名は使用できません。 カスタム パス パターンの詳細情報。
  • 最後の出力時間の前または後のカスタム時間にジョブが開始された場合、ファイルが上書きされるリスクがあります。 たとえば、時間形式が HH の場合、ファイルは 1 時間ごとに生成されます。 ジョブを午前 8 時 15 分に停止し、そのジョブを午前 8 時 30 分に再開した場合、午前 8 時から午前 9 時の間に生成されたファイルには、午前 8 時 30 分から午前 9 時までのデータのみが含まれます。 午前 8 時から午前 8 時 15 分までのデータは上書きされるため、失われます。

BLOB 出力ファイル

出力として Blob Storage を使用しているときに、BLOB に新しいファイルが作成されるのは、次の場合です。

  • ファイルが許可されるブロックの最大数 (現在は 50,000) を超えている場合。 BLOB の最大許容サイズに到達することなく、ブロックの最大許容数に到達する場合があります。 たとえば、出力レートが高い場合は、表示されるブロックあたりのバイト数が増え、ファイルのサイズが増加します。 出力レートが低い場合は、各ブロックのデータが少なくなり、ファイルのサイズも小さくなります。
  • 出力のスキーマが変更されている場合、および出力形式で固定スキーマが必要な場合 (CSV、Avro、Parquet)。
  • ジョブを停止し起動するユーザーが外部からジョブを再起動した場合、またはシステム メンテナンスやエラーの復旧のために内部的にジョブを再起動した場合。
  • クエリが完全にパーティション分割されており、新しいファイルが出力パーティションごとに作成される場合。 これは、PARTITION BY を使用するか、互換性レベル 1.2 で導入されたネイティブ並列処理を使用することによります
  • ストレージ アカウントのファイルまたはコンテナーがユーザーによって削除された場合。
  • パス プレフィックス パターンを使用して時間で出力をパーティション分割する場合、クエリが次の時間に移動すると、新しい BLOB が使用されます。
  • カスタム フィールドによって出力をパーティション分割する場合、BLOB が存在しないと、パーティション キーごとに新しい BLOB が作成されます。
  • パーティション キーのカーディナリティが 8,000 を超えるカスタム フィールドによって出力をパーティション分割する場合、パーティション キーごとに新しい BLOB が作成されます。

パーティション分割

パーティション キーについては、パス パターンにご自分のイベント フィールドからの {date} および {time} トークンを使用します。 YYYY/MM/DD、DD/MM/YYYY、MM-DD-YYYY などの日付形式を選択します。 時間形式には HH を使用します。 BLOB 出力を 1 つのカスタム イベント属性 {fieldname} または {datetime:<specifier>} でパーティション分割できます。 出力ライターの数は、完全並列化可能なクエリに対する入力のパーティション分割に従います。

出力バッチ サイズ

メッセージの最大サイズについては、Azure Storage の制限に関する記事を参照してください。 BLOB ブロックの最大サイズは 4 MB で、BLOB の最大ブロック数は 50,000 です。

制限事項

  • パス パターンでスラッシュ記号 / が使用されている場合 (たとえば /folder2/folder3)、空のフォルダーが作成されますが、Storage Explorer には表示されません
  • 新しい BLOB ファイルが不要な場合は、Azure Stream Analytics によって同じファイルに追加されます。 Event Grid などの Azure サービスが、BLOB ファイルの更新でトリガーされるように構成されている場合、これによって追加のトリガーが生成される可能性があります。
  • 既定では、Azure Stream Analytics によって BLOB に追加されます。 出力形式が JSON 配列の場合は、シャットダウン時か、時間でパーティション分割された出力の次の時間パーティションに出力が移動されたときに、ファイルが完了します。 不完全な再起動など、場合によっては、JSON 配列の右 "]" が欠落している可能性があります。

次のステップ