Azure portal を使用して Azure Data Factory パイプラインのアクティビティを分岐または連結する

適用対象: Azure Data Factory Azure Synapse Analytics

このチュートリアルでは、いくつかの制御フロー機能を紹介する Data Factory パイプラインを作成します。 このパイプラインでは、Azure Blob Storage 内のコンテナーから同じストレージ アカウント内の別のコンテナーへの単純なコピーを行います。 コピー アクティビティが成功した場合、成功したコピー操作の詳細 (書き込まれたデータの量など) がパイプラインによって成功電子メールで送信されます。 コピー アクティビティが失敗した場合、コピー失敗の詳細 (エラー メッセージなど) がパイプラインによって失敗電子メールで送信されます。 チュートリアル全体を通じて、パラメーターを渡す方法が示されます。

シナリオの概要:Diagram shows Azure Blob Storage, which is the target of a copy, which, on success, sends an email with details or, on failure, sends an email with error details.

このチュートリアルでは、以下の手順を実行します。

  • データ ファクトリを作成します。
  • Azure Storage のリンクされたサービスを作成します。
  • Azure BLOB データセットを作成します。
  • コピー アクティビティと Web アクティビティを含むパイプラインを作成します。
  • アクティビティの出力を後続のアクティビティに送信します。
  • パラメーターの引き渡しとシステム変数を利用します。
  • パイプラインの実行を開始します。
  • パイプラインとアクティビティの実行を監視します。

このチュートリアルでは、Azure Portal を使用します。 その他のメカニズムを使用して Azure Data Factory を操作する場合は、目次から「クイック スタート」を参照してください。

前提条件

  • Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。
  • Azure Storage アカウント。 BLOB ストレージをソース データ ストアとして使用します。 Azure ストレージ アカウントがない場合、ストレージ アカウントの作成手順については、「ストレージ アカウントの作成」を参照してください。
  • Azure SQL データベース。 データベースをシンク データ ストアとして使用します。 Azure SQL Database のデータベースがない場合の作成手順については、Azure SQL Database のデータベースの作成に関する記事を参照してください。

BLOB テーブルを作成する

  1. メモ帳を起動します。 次のテキストをコピーし、input.txt ファイルとしてディスクに保存します。

    John,Doe
    Jane,Doe
    
  2. Azure Storage Explorer などのツールを使用して、次の手順を実行します。

    1. adfv2branch コンテナーを作成します。
    2. adfv2branch コンテナーに input フォルダーを作成します。
    3. input.txt ファイルをコンテナーにアップロードします。

電子メール ワークフロー エンドポイントを作成する

パイプラインから電子メール送信をトリガーするには、Logic Apps を使用してワークフローを定義します。 ロジック アプリ ワークフローの作成の詳細については、ロジック アプリを作成する方法に関するページを参照してください。

成功電子メールのワークフロー

CopySuccessEmail という名前のロジック アプリ ワークフローを作成します。 ワークフロー トリガーを When an HTTP request is received として定義し、Office 365 Outlook – Send an email のアクションを追加します。

Success email workflow

要求トリガー用に、Request Body JSON Schema に次の JSON を入力します。

{
    "properties": {
        "dataFactoryName": {
            "type": "string"
        },
        "message": {
            "type": "string"
        },
        "pipelineName": {
            "type": "string"
        },
        "receiver": {
            "type": "string"
        }
    },
    "type": "object"
}

ロジック アプリ デザイナー内の要求は、次の図のようになります。

Logic App designer - request

[電子メールの送信] アクション用に、要求本文の JSON スキーマで渡されるプロパティを利用して、電子メールを書式設定する方法をカスタマイズします。 たとえば次のようになります。

Logic App designer - send email action

ワークフローを保存します。 成功電子メール ワークフローの HTTP Post 要求 URL をメモしておきます。

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

失敗電子メールのワークフロー

同じ手順に従って、CopyFailEmail という別の Logic Apps ワークフローを作成します。 要求トリガー内の Request Body JSON schema は同じです。 Subject のように電子メールの書式を変更し、失敗電子メールになるように調整します。 たとえば次のようになります。

Logic App designer - fail email workflow

ワークフローを保存します。 失敗電子メール ワークフローの HTTP Post 要求 URL をメモしておきます。

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

2 つのワークフローの URL を取得できました。

//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000

Data Factory の作成

  1. Web ブラウザー (Microsoft Edge または Google Chrome) を起動します。 現在、Data Factory の UI がサポートされる Web ブラウザーは Microsoft Edge と Google Chrome だけです。

  2. 左側のメニューで、 [リソースの作成]>[データ + 分析]>[Data Factory] の順に選択します。

    Data Factory selection in the "New" pane

  3. [新しいデータ ファクトリ] ページで、 [名前] に「ADFTutorialDataFactory」と入力します。

    New data factory page

    Azure データ ファクトリの名前は グローバルに一意にする必要があります。 次のエラーが発生した場合は、データ ファクトリの名前を変更して (yournameADFTutorialDataFactory など) 作成し直してください。 Data Factory アーティファクトの名前付け規則については、Data Factory の名前付け規則に関する記事を参照してください。

    データ ファクトリ名 "ADFTutorialDataFactory" は利用できません。

  4. データ ファクトリを作成する Azure サブスクリプションを選択します。

  5. [リソース グループ] について、次の手順のいずれかを行います。

    • [Use existing (既存のものを使用)] を選択し、ドロップダウン リストから既存のリソース グループを選択します。

    • [新規作成] を選択し、リソース グループの名前を入力します。

      リソース グループの詳細については、 リソース グループを使用した Azure のリソースの管理に関するページを参照してください。

  6. バージョンとして [V2] を選択します。

  7. データ ファクトリの 場所 を選択します。 サポートされている場所のみがドロップダウン リストに表示されます。 データ ファクトリで使用するデータ ストア (Azure Storage、Azure SQL Database など) やコンピューティング (HDInsight など) は他のリージョンに配置できます。

  8. [ダッシュボードにピン留めする] をオンにします。

  9. Create をクリックしてください。

  10. ダッシュボードに、 [Deploying data factory](データ ファクトリをデプロイしています) というステータスを示したタイルが表示されます。

    deploying data factory tile

  11. 作成が完了すると、図に示されているような [Data Factory] ページが表示されます。

    Data factory home page

  12. [Author & Monitor](作成と監視) タイルをクリックして、別のタブで Azure Data Factory ユーザー インターフェイス (UI) を起動します。

パイプラインを作成する

この手順では、1 つのコピー アクティビティと 2 つの Web アクティビティが含まれたパイプラインを作成します。 このパイプラインの作成には次の機能を使用します。

  • データセットによってアクセスされるパイプラインのパラメーター。
  • 成功/失敗電子メールを送信するロジック アプリ ワークフローを呼び出す Web アクティビティ。
  • (成功および失敗に基づいた) あるアクティビティと別のアクティビティの接続
  • アクティビティからの出力を後続のアクティビティへの入力として使用する
  1. Data Factory UI のホーム ページで [調整] タイルをクリックします。

    Screenshot that shows the ADF home page.

  2. パイプラインのプロパティ ウィンドウで [パラメーター] タブに切り替え、 [新規] ボタンを使用して、種類が文字列の 3 つのパラメーター (sourceBlobContainer、sinkBlobContainer、receiver) を追加します。

    • sourceBlobContainer - ソース BLOB データセットによって使用されるパイプライン内のパラメーターです。
    • sinkBlobContainer – シンク BLOB データセットによって使用されるパイプライン内のパラメーターです。
    • receiver – このパラメーターは、このパラメーターによって電子メール アドレスが指定されている受信者に成功または失敗の電子メールを送信するパイプライン内の 2 つの Web アクティビティによって使用されます。

    New pipeline menu

  3. [アクティビティ] ツール ボックスで [データ フロー] を展開し、 [コピー] アクティビティをパイプライン デザイナー画面にドラッグ アンド ドロップします。

    Drag-drop copy activity

  4. 下部にある [コピー] アクティビティのプロパティ ウィンドウで、 [ソース] タブに切り替えて [+ 新規] をクリックします。 この手順でコピー アクティビティのソース データセットを作成します。

    Screenshot that shows how to create a source dataset for teh copy activity.

  5. [新しいデータセット] ウィンドウで [Azure BLOB ストレージ] を選択し、 [完了] をクリックします。

    Select Azure Blob Storage

  6. [AzureBlob1] という新しいタブが表示されます。 データセットの名前を「SourceBlobDataset」に変更します。

    Dataset general settings

  7. プロパティ ウィンドウにある [接続] タブに切り替えて、 [リンクされたサービス] で [新規] をクリックします。 この手順では、Azure ストレージ アカウントをデータ ファクトリにリンクするためのリンクされたサービスを作成します。

    Dataset connection - new linked service

  8. [New Linked Service](新しいリンクされたサービス) ウィンドウで、次の手順を行います。

    1. [名前] に「AzureStorageLinkedService」と入力します。
    2. [ストレージ アカウント名] で Azure ストレージ アカウントを選択します。
    3. [保存] をクリックします。

    New Azure Storage linked service

  9. フォルダーには「@pipeline().parameters.sourceBlobContainer」、ファイル名には「emp.txt」と入力します。 sourceBlobContainer パイプライン パラメーターを使用して、データセットのフォルダー パスを設定します。

    Source dataset settings

  10. パイプラインのタブに切り替えるか、またはツリービューにあるパイプラインをクリックします。 [Source Dataset](ソース データセット)SourceBlobDataset が選択されていることを確認します。

Source dataset

  1. プロパティ ウィンドウで [シンク] タブに切り替えて、 [Sink Dataset](シンク データセット)[+ 新規] をクリックします。 ソース データセットの作成と同様のこの手順では、コピー アクティビティのシンク データセットを作成します。

    New sink dataset button

  2. [新しいデータセット] ウィンドウで [Azure BLOB ストレージ] を選択し、 [完了] をクリックします。

  3. データセットの [General](一般) 設定ページで、 [名前] に「SinkBlobDataset」と入力します。

  4. [接続] タブに切り替えて、次の手順を実行します。

    1. [リンクされたサービス]AzureStorageLinkedService を選択します。

    2. フォルダーに「@pipeline().parameters.sinkBlobContainer」と入力します。

    3. ファイル名に「@CONCAT(pipeline().RunId, '.txt')」と入力します。 この式では、ファイル名に現在のパイプライン実行の ID を使用します。 サポートされているシステム変数および式の一覧については、システム変数に関するページと式言語に関するページを参照してください。

      Sink dataset settings

  5. 上部にあるパイプラインのタブに切り替えます。 [アクティビティ] ツール ボックスの [General](一般) を展開し、 [Web] アクティビティをパイプライン デザイナー画面にドラッグ アンド ドロップします。 アクティビティの名前を「SendSuccessEmailActivity」に設定します。 Web アクティビティでは、任意の REST エンドポイントを呼び出すことができます。 このアクティビティの詳細については、Web アクティビティに関する記事を参照してください。 このパイプラインでは、Web アクティビティを使用して Logic Apps 電子メール ワークフローを呼び出します。

    Drag-drop first Web activity

  6. [General](一般) タブから [設定] タブに切り替えて、次の手順を実行します。

    1. [URL] で、成功電子メールを送信するロジック アプリ ワークフローの URL を指定します。

    2. [メソッド][POST] を選択します。

    3. [ヘッダー] セクションにある + [ヘッダーの追加] リンクをクリックします。

    4. [コンテンツの種類] というヘッダーを追加し、それを application/json に設定します。

    5. [本文] に次の JSON を指定します。

      {
          "message": "@{activity('Copy1').output.dataWritten}",
          "dataFactoryName": "@{pipeline().DataFactory}",
          "pipelineName": "@{pipeline().Pipeline}",
          "receiver": "@pipeline().parameters.receiver"
      }
      

      メッセージの本文には次のプロパティが含まれます。

      • Message – @{activity('Copy1').output.dataWritten から渡される値。 前のコピー アクティビティのプロパティにアクセスし、dataWritten の値を渡します。 失敗の場合、@{activity('CopyBlobtoBlob').error.message の代わりにエラー出力を渡します。

      • Data Factory Name – @{pipeline().DataFactory} から渡される値。これはシステム変数であり、対応するデータ ファクトリ名へのアクセスを可能にします。 サポートされているシステム変数の一覧については、「システム変数」の記事を参照してください。

      • Pipeline Name – @{pipeline().Pipeline} から渡される値。 これもシステム変数であり、対応するパイプライン名へのアクセスを可能にします。

      • Receiver – "@pipeline().parameters.receiver") から渡される値。 パイプライン パラメーターにアクセスします。

        Settings for the first Web activity

  7. コピー アクティビティの横にある緑のボタンを Web アクティビティにドラッグ アンド ドロップして、 [コピー] アクティビティを [Web] アクティビティに接続します。

    Connect Copy activity with the first Web activity

  8. [アクティビティ] ツール ボックスからパイプライン デザイナー画面に別の [Web] アクティビティをドラッグ アンド ドロップし、 [名前] を「SendFailureEmailActivity」に設定します。

    Name of the second Web activity

  9. [設定] タブに切り替えて、次の手順を実行します。

    1. [URL] で、失敗電子メールを送信するロジック アプリ ワークフローの URL を指定します。

    2. [メソッド][POST] を選択します。

    3. [ヘッダー] セクションにある + [ヘッダーの追加] リンクをクリックします。

    4. [コンテンツの種類] というヘッダーを追加し、それを application/json に設定します。

    5. [本文] に次の JSON を指定します。

      {
          "message": "@{activity('Copy1').error.message}",
          "dataFactoryName": "@{pipeline().DataFactory}",
          "pipelineName": "@{pipeline().Pipeline}",
          "receiver": "@pipeline().parameters.receiver"
      }
      

      Settings for the second Web activity

  10. パイプライン デザイナーの [コピー] アクティビティを選択して +-> ボタンをクリックし、[エラー] を選択します。

    Screenshot that shows how to select Error on the Copy activity in the pipeline designer.

  11. コピー アクティビティの横にある赤いボタンを 2 番目の Web アクティビティ SendFailureEmailActivity にドラッグします。 パイプラインが次の図のように表示されるよう、アクティビティを移動できます。

    Full pipeline with all activities

  12. パイプラインを検証するために、ツール バーの [検証] ボタンをクリックします。 >> ボタンをクリックして [Pipeline Validation Output](パイプラインの検証の出力) ウィンドウを閉じます。

    Validate pipeline

  13. エンティティ (データセットやパイプラインなど) を Data Factory サービスに発行するには、 [すべて公開] を選択します。 [正常に発行されました] というメッセージが表示されるまで待機します。

    Publish

成功するパイプラインの実行をトリガーする

  1. パイプラインの実行をトリガーするために、ツール バーの [トリガー] をクリックし、 [Trigger Now](今すぐトリガー) をクリックします。

    Trigger a pipeline run

  2. [Pipeline Run](パイプラインの実行) ウィンドウで、次の手順を実行します。

    1. sourceBlobContainer パラメーターに「adftutorial/adfv2branch/input」を入力します。

    2. sinkBlobContainer パラメーターに「adftutorial/adfv2branch/output」を入力します。

    3. receiverメール アドレスを入力します。

    4. [完了] をクリックします。

      Pipeline run parameters

成功したパイプラインの実行を監視する

  1. パイプラインの実行を監視するには、左側の [監視] タブに切り替えます。 手動でトリガーしたパイプラインの実行が表示されます。 [最新の情報に更新] ボタンを使用して、一覧を更新します。

    Successful pipeline run

  2. このパイプラインの実行に関連付けられているアクティビティの実行を表示するには、 [アクション] 列にある最初のリンクをクリックします。 上部の [パイプライン] をクリックすると、前のビューに戻ることができます。 [最新の情報に更新] ボタンを使用して、一覧を更新します。

    Screenshot that shows how to view the list of activity runs.

失敗するパイプラインの実行をトリガーする

  1. 左側で [編集] タブに切り替えます。

  2. パイプラインの実行をトリガーするために、ツール バーの [トリガー] をクリックし、 [Trigger Now](今すぐトリガー) をクリックします。

  3. [Pipeline Run](パイプラインの実行) ウィンドウで、次の手順を実行します。

    1. sourceBlobContainer パラメーターに「adftutorial/dummy/input」を入力します。 ダミー フォルダーが adftutorial コンテナーに存在しないことを確認します。
    2. sinkBlobContainer パラメーターに「adftutorial/dummy/output」を入力します。
    3. receiverメール アドレスを入力します。
    4. [完了] をクリックします。

失敗したパイプラインの実行を監視する

  1. パイプラインの実行を監視するには、左側の [監視] タブに切り替えます。 手動でトリガーしたパイプラインの実行が表示されます。 [最新の情報に更新] ボタンを使用して、一覧を更新します。

    Failure pipeline run

  2. エラーの詳細を表示するには、パイプラインの実行の [エラー] リンクをクリックします。

    Pipeline error

  3. このパイプラインの実行に関連付けられているアクティビティの実行を表示するには、 [アクション] 列にある最初のリンクをクリックします。 [最新の情報に更新] ボタンを使用して、一覧を更新します。 パイプラインのコピー アクティビティが失敗したことがわかります。 Web アクティビティによる指定された受信者への失敗電子メールの送信は成功しました。

    Activity runs

  4. [アクション] 列の [エラー] リンクをクリックして、エラーの詳細を表示します。

    Activity run error

次のステップ

このチュートリアルでは、以下の手順を実行しました。

  • データ ファクトリを作成します。
  • Azure Storage のリンクされたサービスを作成します。
  • Azure BLOB データセットを作成します。
  • コピー アクティビティと Web アクティビティを含むパイプラインを作成します。
  • アクティビティの出力を後続のアクティビティに送信します。
  • パラメーターの引き渡しとシステム変数を利用します。
  • パイプラインの実行を開始します。
  • パイプラインとアクティビティの実行を監視します。

これで、「概念」セクションに進み、Azure Data Factory の詳細について学習できるようになりました。