Azure Stream Analytics - Delta Lake テーブルへの書き込み

Delta Lake は、データ レイクに信頼性、品質とパフォーマンスをもたらすオープンな形式です。 Azure Stream Analytics を使用すると、コードを 1 行も記述することなく、ストリーミング データを Delta Lake テーブルに直接書き込むことができます。

ストリーム分析ジョブは、ネイティブの Delta Lake 出力コネクタを介して、Azure Data Lake Storage Gen2 アカウント内の Delta テーブル (新規または事前に作成されたもの) に書き込むよう構成できます。 このコネクタは、追加モードでの Delta テーブルへの高速インジェスト用に最適化されており、厳密に 1 回だけのセマンティクスを提供するため、データが失われたり重複したりすることがなくなります。 Azure Event Hubs から Delta テーブルにリアルタイム データ ストリームをインジェストする場合、アドホックの対話型分析またはバッチ分析を実行できます。

Delta Lake の構成

Delta Lake にデータを書き込むには、Azure Data Lake Storage Gen2 アカウントに接続する必要があります。 次の表に、Delta Lake の構成に関連するプロパティを示します。

プロパティ名 説明
イベントのシリアル化の形式 出力データのシリアル化形式。 JSON、CSV、AVRO、Parquet がサポートされています。 Delta Lake は、ここにオプションとして一覧表示されます。 Delta Lake が選択されている場合、データは Parquet 形式になります。
Delta Path の名前 指定されたコンテナー内の Delta Lake テーブルを書き込むために使用されるパス。 これにはテーブル名が含まれます。 詳細については、次のセクションで説明します。
パーティション列 省略可能。 出力データからパーティションへの {field} 名。 サポートされているパーティション列は 1 つだけです。 列の値は文字列型である必要があります。

ADLS Gen2 構成の完全な一覧については、「ALDS Gen2 の概要」を参照してください。

Delta Path の名前

Delta Path Name は、Azure Data Lake Storage Gen2 に格納されている Delta Lake テーブルの場所と名前を指定するために使用されます。

1 つ以上のパス セグメントを使用して、Delta テーブルへのパスと Delta テーブル名を定義するよう選択できます。 パスのセグメントは、仮想ディレクトリの名前に対応した連続する区切り記号文字 (スラッシュ "/" など) の間の文字列です。

このセグメント名は英数字であり、スペース、ハイフン、およびアンダースコアを含めることができます。 最後のパス セグメントがテーブル名として使用されます。

Delta Path の名前の制限には、次のものがあります。

  • フィールド名は大文字小文字が区別されません。 たとえば、このサービスでは列 IDid を区別できません。
  • 動的 {field} 名は使用できません。 たとえば、{ID} はテキスト {ID} として扱われます。
  • 名前を構成するパスのセグメントの数が 254 個を超えないようにしてください。

Delta Path の名前の例は次のようになります。

  • 例 1: WestUS/CA/factory1/device-table
  • 例 2: Test/demo
  • 例 3: mytable

出力ファイル例:

  1. 選択したコンテナーの下のディレクトリ パスは WestEurope/CA/factory1、Delta テーブル フォルダー名は device-table になります。
  2. 選択したコンテナーの下のディレクトリ パスは Test、Delta テーブル フォルダー名は demo になります。
  3. 選択したコンテナーの下の Delta テーブル フォルダー名は mytable になります。

新しいテーブルの作成

Delta Path 名で指定された場所に同じ名前の Delta Lake テーブルがまだ存在していない場合、既定では、Azure Stream Analytics は新しい Delta テーブルを作成します。 この新しいテーブルは以下の構成で作成されます。

テーブルへの書き込み

Delta Path 名で指定された場所に同じ名前の Delta Lake テーブルが既に存在する場合、既定では、Azure Stream Analytics は既存のテーブルに新しいレコードを書き込みます。

厳密に 1 回の配信

トランザクション ログを使用すると、Delta Lake では厳密に 1 回の処理が保証されます。 また、Azure Stream Analytics では、1 回のジョブ実行中にデータを Azure Data Lake Storage Gen2 に出力するときに、厳密に 1 回の配信が行われます。

スキーマの適用

スキーマの適用とは、データ品質を確保するために、テーブルへの新しい書き込みはすべて、書き込み時にターゲット テーブルのスキーマと互換性があるように強制されることを意味します。

出力データのすべてのレコードは、既存のテーブルのスキーマに投影されます。 出力が新しい Delta テーブルに書き込まれている場合、テーブル スキーマは最初のレコードで作成されます。 受信データに既存のテーブル スキーマと比較して余分な列が 1 つある場合は、余分の列を含めずにテーブルに書き込まれます。 受信データに既存のテーブル スキーマと比較して列が 1 つない場合は、その列を null にしてテーブルに書き込まれます。

Delta テーブルのスキーマとストリーミング ジョブのレコードのスキーマの間に交差がない場合、それはスキーマ変換エラーの一例とみなされます。 スキーマ変換エラーとみなされるケースはこれだけではありません。

スキーマ変換が失敗する場合、ジョブの動作は、ジョブ レベルで構成された出力データ エラーの処理ポリシーに従います。

Delta ログのチェックポイント

Stream Analytics ジョブは、Delta Log チェックポイントを V1 形式で定期的に作成します。 Delta Log チェックポイントは Delta テーブルのスナップショットであり、通常は Stream Analytics ジョブによって生成されたデータ ファイルの名前が含まれます。 データ ファイルの数が多い場合は、チェックポイントが大きくなり、Stream Analytics ジョブでメモリの問題が発生する可能性があります。

制限事項

  • 動的パーティション キー (Delta Path でのレコード スキーマの列名の指定) はサポートされていません。
  • 複数のパーティション列はサポートされていません。 複数のパーティション列が必要な場合は、クエリで複合キーを使用し、それをパーティション列として指定することをお勧めします。
    • 複合キーは、たとえば、"[入力] から [blobOutput] に compositeColumn として [concat (col1, col2)] を選択" クエリで作成できます。
  • Delta Lake への書き込みは追加のみです。
  • クエリ テストのスキーマ チェックは使用できません。
  • Stream Analytics では、小さなファイルの圧縮は行われません。
  • すべてのデータ ファイルは圧縮されずに作成されます。
  • Date 型と Decimal 型はサポートされていません。
  • ライター バージョン 7 以上のライター機能を持つ既存のテーブルへの書き込みは失敗します。
  • Stream Analytics ジョブでデータのバッチをデータ レイクに書き込むときに、複数のファイル追加アクションを生成できます。 1 つのバッチに対して生成されたファイルの追加アクションが多すぎると、Stream Analytics ジョブがスタックする可能性があります。
    • 生成されるファイル追加アクションの数は、多くの要因によって決まります。
    • バッチに対して生成されるファイル追加アクションの数を減らすために、次の手順を実行できます。
  • Stream Analytics ジョブでは、シングル パート V1 チェックポイントの読み取りと書き込みのみが可能です。 マルチパート チェックポイントとチェックポイント V2 形式はサポートされていません。

次のステップ