クイックスタート: マッピング データ フローを使用してデータを変換する

このクイックスタートでは、Azure Synapse Analytics を使い、マッピング データ フローを使用して Azure Data Lake Storage Gen2 (ADLS Gen2) ソースから ADLS Gen2 シンクにデータを変換するパイプラインを作成します。 このクイックスタートの構成パターンは、マッピング データ フローを使用してデータを変換するときに拡張することができます。

このクイックスタートでは、次の手順を実行します。

  • Azure Synapse Analytics でデータ フロー アクティビティが含まれるパイプラインを作成します。
  • 4 つの変換を使用して、マッピング データ フローを構築します。
  • パイプラインをテスト実行します。
  • Data Flow アクティビティを監視します。

前提条件

  • Azure サブスクリプション:Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。

  • Azure Synapse ワークスペース:「クイックスタート:Synapse ワークスペースを作成する」の手順に従い、Azure portal を使用して Synapse ワークスペースを作成します。

  • Azure ストレージ アカウント:ADLS ストレージを、"ソース" と "シンク" のデータ ストアとして使用します。 ストレージ アカウントがない場合の作成手順については、Azure のストレージ アカウントの作成に関するページを参照してください。

    このチュートリアルで変換するファイルは MoviesDB です。このファイルは、こちらにあります。 GitHub からファイルを取得するには、コンテンツを任意のテキスト エディターにコピーして、.csv ファイルとしてローカルに保存します。 ファイルをご自分のストレージ アカウントにアップロードするには、Azure portal を使用した BLOB のアップロードに関するページを参照してください。 例では、'sample-data' という名前のコンテナーを参照しています。

Azure Synapse ワークスペースが作成された後、Synapse Studio を開く方法は 2 つあります。

  • Azure portal で Synapse ワークスペースを開きます。 [はじめに] の下の [Synapse Studio を開く] カードで、 [開く] を選択します。
  • Azure Synapse Analytics を開き、ワークスペースにサインインします。

このクイックスタートでは、例として "adftest2020" という名前のワークスペースを使用します。 自動的に Synapse Studio のホーム ページに移動します。

Synapse Studio home page

Data Flow アクティビティを含むパイプラインの作成

パイプラインには、一連のアクティビティを実行するための論理フローが含まれています。 このセクションでは、Data Flow アクティビティが含まれるパイプラインを作成します。

  1. [統合] タブに移動します。パイプライン ヘッダーの横にあるプラス符号のアイコンを選択し、[パイプライン] を選択します。

    Create a new pipeline

  2. パイプラインの [プロパティ] 設定ページで、名前として「TransformMovies」と入力します。

  3. [アクティビティ] ペインの [Move and Transform](移動と変換) で、 [データ フロー] をパイプライン キャンバス上にドラッグします。

  4. [Adding data flow](データ フローの追加) ページのポップアップで、[Create new data flow](新しいデータ フローの作成) ->[データ フロー] を選択します。 完了したら、 [OK] をクリックします。

    Create a data flow

  5. [プロパティ] ページで、データ フローに TransformMovies という名前を付けます。

データ フロー キャンバスでの変換ロジックの作成

Data Flow を作成すると、データ フロー キャンバスが自動的に表示されます。 この手順では、ADLS ストレージ内の MoviesDB.csv を取得し、1910 年から 2000 年までのコメディの平均評価を集計するデータ フローを作成します。 次に、このファイルを ADLS ストレージに書き戻します。

  1. データ フロー キャンバスの上にある [Data flow debug](データ フローのデバッグ) スライダーをオンにスライドします。 デバッグ モードを使用すると、ライブ Spark クラスターに対する変換ロジックの対話型テストが可能になります。 Data Flow クラスターのウォームアップには 5 から 7 分かかるため、ユーザーが Data Flow の開発を計画している場合は、最初にデバッグを有効にすることをお勧めします。 詳細については、デバッグ モードに関するページを参照してください。

    Slide the debug on

  2. データ フロー キャンバスで [Add Source](ソースの追加) ボックスをクリックして、ソースを追加します。

  3. ソースに MoviesDB という名前を付けます。 [新規] をクリックして、新しいソース データセットを作成します。

    Create a new source dataset

  4. [Azure Data Lake Storage Gen2] を選択します。 [続行] をクリックして続行します。

    Choose Azure Data Lake Storage Gen2

  5. [DelimitedText] を選択します。 [続行] をクリックして続行します。

  6. データセットに MoviesDB という名前を付けます。 リンクされたサービスのドロップダウンで、 [新規] を選択します。

  7. リンクされたサービスの作成画面で、ADLS Gen2 のリンクされたサービスに ADLSGen2 という名前を付けて、使用する認証方法を指定します。 次に、接続の資格情報を入力します。 このクイックスタートでは、アカウント キーを使用してストレージ アカウントに接続しています。 [テスト接続] をクリックすると、資格情報が正しく入力されたことを確認できます。 完了したら [作成] をクリックします。

    Create a source linked service

  8. データセットの作成画面に戻ったら、 [ファイル パス] フィールドに、ファイルが配置されている場所を入力します。 このクイックスタートでは、"MoviesDB.csv" ファイルはコンテナー "sample-data" に配置されています。 ファイルにはヘッダーが含まれているため、 [First row as header](最初の行をヘッダーにする) をオンにします。 ストレージ内のファイルからヘッダー スキーマを直接インポートするには、 [From connection/store](接続/ストアから) を選択します。 完了したら、 [OK] をクリックします。

    Source dataset settings

  9. デバッグ クラスターが起動している場合は、ソース変換の [Data Preview](データのプレビュー) タブに移動し、 [更新] をクリックして、データのスナップショットを取得します。 データ プレビューを使用すると、変換が正しく構成されていることを確認できます。

    Data preview

  10. データ フロー キャンバスでソース ノードの横にあるプラス アイコンをクリックして、新しい変換を追加します。 最初に追加する変換は、フィルターです。

    Add a filter

  11. フィルター変換に FilterYears という名前を付けます。 [フィルター適用] の横にある式ボックスをクリックして、式ビルダーを開きます。 ここでフィルター条件を指定します。

  12. データ フローの式ビルダーでは、さまざまな変換で使用する式を対話形式で作成できます。 式には、組み込み関数、入力スキーマの列、ユーザー定義のパラメーターを含めることができます。 式の作成方法の詳細については、Data Flow の式ビルダーに関するページを参照してください。

    このクイックスタートでは、1910 年から 2000 年の間に公開された、ジャンルがコメディの映画をフィルター処理します。 現在、年は文字列になっているため、toInteger() 関数を使用して整数に変換する必要があります。 以上演算子 (>=) と以下演算子 (<=) を使用して、年のリテラル値 1910 と 2000 に対する比較を行います。 これらの式を && (and) 演算子と結合します。 式は次のようになります。

    toInteger(year) >= 1910 && toInteger(year) <= 2000

    コメディ映画を見つけるには、rlike() 関数を使用して、ジャンル列でパターン 'Comedy' を検索します。 rlike 式を年の比較と結合すると、次の式が得られます。

    toInteger(year) >= 1910 && toInteger(year) <= 2000 && rlike(genres, 'Comedy')

    Specify filtering condition

    デバッグ クラスターがアクティブになっている場合は、 [更新] をクリックして使用された入力と比較した式の出力を表示して、ロジックを確認できます。 データ フローの式言語を使用してこのロジックを実現する方法に対する正解は複数あります。

    式の操作が完了したら、 [Save and Finish](保存して終了する) をクリックします。

  13. フィルターが正しく機能していることを確認するには、データ プレビューをフェッチします。

  14. 次に追加する変換は、 [Schema modifier](スキーマ修飾子) の下にある [集計] 変換です。

    Add an Aggregate

  15. 集計変換に AggregateComedyRatings という名前を付けます。 [グループ化] タブで、ドロップダウンから [year] を選択し、映画の公開年ごとに集計をグループ化します。

    Aggregate settings 1

  16. [集計] タブに移動します。左側のテキスト ボックスで、集計列に AverageComedyRating という名前を付けます。 式ビルダーを使用して集計式を入力するには、右側の式ボックスをクリックします。

    Aggregate settings 2

  17. [Rating] の平均値を取得するには、avg() 集計関数を使用します。 Rating は文字列で、avg() で受け入れられるのは数値入力なので、toInteger() 関数を使用して値を数値に変換する必要があります。 式は次のようになります。

    avg(toInteger(Rating))

    完了したら、 [Save and Finish](保存して終了する) をクリックします。

    Average comedy rating

  18. 変換出力を表示するには、 [Data Preview](データのプレビュー) タブに移動します。 yearAverageComedyRating の 2 つの列だけがあることに注目してください。

    Aggregate Data Preview

  19. 次に、 [Destination](変換先) の下で [シンク] 変換を追加します。

    Add a Sink

  20. シンクに Sink という名前を付けます。 [新規] をクリックして、シンク データセットを作成します。

  21. [Azure Data Lake Storage Gen2] を選択します。 [続行] をクリックして続行します。

  22. [DelimitedText] を選択します。 [続行] をクリックして続行します。

  23. シンク データセットに MoviesSink という名前を付けます。 リンクされたサービスの場合、手順 7 で作成した ADLS Gen2 のリンクされたサービスを選択します。 データの書き込み先となる出力フォルダーを入力します。 このクイックスタートでは、コンテナー "sample-data" 内のフォルダー "output" に書き込んでいます。 フォルダーは、事前に存在している必要はなく、動的に作成することができます。 [First row as header](最初の行をヘッダーにする) をオンに設定し、 [スキーマのインポート][なし] を選択します。 完了したら、 [OK] をクリックします。

    Sink dataset properties

これで、データ フローの構築が完了しました。 これをパイプラインで実行する準備ができました。

Data Flow を実行して監視する

パイプラインを発行する前にデバッグすることができます。 この手順では、データ フロー パイプラインのデバッグ実行をトリガーします。 データのプレビューではデータが書き込まれませんが、デバッグ実行によってシンクの変換先にデータが書き込まれます。

  1. パイプライン キャンバスに移動します。 [デバッグ] をクリックして、デバッグ実行をトリガーします。

    Debug pipeline

  2. Data Flow アクティビティのパイプライン デバッグでは、アクティブなデバッグ クラスターが使用されますが、それでも初期化には少なくとも 1 分かかります。 進行状況は [出力] タブで追跡することができます。実行が正常に完了したら、眼鏡のアイコンをクリックして [監視] ウィンドウを開きます。

    Debugging output

  3. [監視] ウィンドウには、各変換手順で使用した行数と所要時間が表示されます。

    Transformation monitoring

  4. 変換をクリックすると、データの列とパーティション分割に関する詳細情報が表示されます。

    Transformation details

このクイックスタートに正しく従った場合は、シンク フォルダーに 83 行と 2 列が書き込まれているはずです。 BLOB ストレージをチェックすることで、データを確認できます。

次のステップ

次の記事に進んで、Azure Synapse Analytics のサポートを確認します。