Azure portal を使用して Azure Data Factory パイプラインのアクティビティを分岐または連結する
適用対象: Azure Data Factory
Azure Synapse Analytics
このチュートリアルでは、いくつかの制御フロー機能を紹介する Data Factory パイプラインを作成します。 このパイプラインでは、Azure Blob Storage 内のコンテナーから同じストレージ アカウント内の別のコンテナーへの単純なコピーを行います。 コピー アクティビティが成功した場合、成功したコピー操作の詳細 (書き込まれたデータの量など) がパイプラインによって成功電子メールで送信されます。 コピー アクティビティが失敗した場合、コピー失敗の詳細 (エラー メッセージなど) がパイプラインによって失敗電子メールで送信されます。 チュートリアル全体を通じて、パラメーターを渡す方法が示されます。
シナリオの概要:
このチュートリアルでは、以下の手順を実行します。
- データ ファクトリを作成します。
- Azure Storage のリンクされたサービスを作成します。
- Azure BLOB データセットを作成します。
- コピー アクティビティと Web アクティビティを含むパイプラインを作成します。
- アクティビティの出力を後続のアクティビティに送信します。
- パラメーターの引き渡しとシステム変数を利用します。
- パイプラインの実行を開始します。
- パイプラインとアクティビティの実行を監視します。
このチュートリアルでは、Azure Portal を使用します。 その他のメカニズムを使用して Azure Data Factory を操作する場合は、目次から「クイック スタート」を参照してください。
前提条件
- Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。
- Azure Storage アカウント。 BLOB ストレージをソース データ ストアとして使用します。 Azure ストレージ アカウントがない場合、ストレージ アカウントの作成手順については、「ストレージ アカウントの作成」を参照してください。
- Azure SQL データベース。 データベースをシンク データ ストアとして使用します。 Azure SQL Database のデータベースがない場合の作成手順については、Azure SQL Database のデータベースの作成に関する記事を参照してください。
BLOB テーブルを作成する
メモ帳を起動します。 次のテキストをコピーし、input.txt ファイルとしてディスクに保存します。
John,Doe Jane,Doe
Azure Storage Explorer などのツールを使用して、次の手順を実行します。
- adfv2branch コンテナーを作成します。
- adfv2branch コンテナーに input フォルダーを作成します。
- input.txt ファイルをコンテナーにアップロードします。
電子メール ワークフロー エンドポイントを作成する
パイプラインから電子メール送信をトリガーするには、Logic Apps を使用してワークフローを定義します。 ロジック アプリ ワークフローの作成の詳細については、ロジック アプリを作成する方法に関するページを参照してください。
成功電子メールのワークフロー
CopySuccessEmail
という名前のロジック アプリ ワークフローを作成します。 ワークフロー トリガーを When an HTTP request is received
として定義し、Office 365 Outlook – Send an email
のアクションを追加します。
要求トリガー用に、Request Body JSON Schema
に次の JSON を入力します。
{
"properties": {
"dataFactoryName": {
"type": "string"
},
"message": {
"type": "string"
},
"pipelineName": {
"type": "string"
},
"receiver": {
"type": "string"
}
},
"type": "object"
}
ロジック アプリ デザイナー内の要求は、次の図のようになります。
[電子メールの送信] アクション用に、要求本文の JSON スキーマで渡されるプロパティを利用して、電子メールを書式設定する方法をカスタマイズします。 たとえば次のようになります。
ワークフローを保存します。 成功電子メール ワークフローの HTTP Post 要求 URL をメモしておきます。
//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
失敗電子メールのワークフロー
同じ手順に従って、CopyFailEmail という別の Logic Apps ワークフローを作成します。 要求トリガー内の Request Body JSON schema
は同じです。 Subject
のように電子メールの書式を変更し、失敗電子メールになるように調整します。 たとえば次のようになります。
ワークフローを保存します。 失敗電子メール ワークフローの HTTP Post 要求 URL をメモしておきます。
//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
2 つのワークフローの URL を取得できました。
//Success Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
//Fail Request Url
https://prodxxx.eastus.logic.azure.com:443/workflows/000000/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=000000
Data Factory の作成
Web ブラウザー (Microsoft Edge または Google Chrome) を起動します。 現在、Data Factory の UI がサポートされる Web ブラウザーは Microsoft Edge と Google Chrome だけです。
左側のメニューで、 [リソースの作成]>[データ + 分析]>[Data Factory] の順に選択します。
[新しいデータ ファクトリ] ページで、 [名前] に「ADFTutorialDataFactory」と入力します。
Azure データ ファクトリの名前は グローバルに一意にする必要があります。 次のエラーが発生した場合は、データ ファクトリの名前を変更して (yournameADFTutorialDataFactory など) 作成し直してください。 Data Factory アーティファクトの名前付け規則については、Data Factory の名前付け規則に関する記事を参照してください。
データ ファクトリ名 "ADFTutorialDataFactory" は利用できません。
データ ファクトリを作成する Azure サブスクリプションを選択します。
[リソース グループ] について、次の手順のいずれかを行います。
[Use existing (既存のものを使用)] を選択し、ドロップダウン リストから既存のリソース グループを選択します。
[新規作成] を選択し、リソース グループの名前を入力します。
リソース グループの詳細については、 リソース グループを使用した Azure のリソースの管理に関するページを参照してください。
バージョンとして [V2] を選択します。
データ ファクトリの 場所 を選択します。 サポートされている場所のみがドロップダウン リストに表示されます。 データ ファクトリで使用するデータ ストア (Azure Storage、Azure SQL Database など) やコンピューティング (HDInsight など) は他のリージョンに配置できます。
[ダッシュボードにピン留めする] をオンにします。
Create をクリックしてください。
ダッシュボードに、 [Deploying data factory](データ ファクトリをデプロイしています) というステータスを示したタイルが表示されます。
作成が完了すると、図に示されているような [Data Factory] ページが表示されます。
[Author & Monitor](作成と監視) タイルをクリックして、別のタブで Azure Data Factory ユーザー インターフェイス (UI) を起動します。
パイプラインを作成する
この手順では、1 つのコピー アクティビティと 2 つの Web アクティビティが含まれたパイプラインを作成します。 このパイプラインの作成には次の機能を使用します。
- データセットによってアクセスされるパイプラインのパラメーター。
- 成功/失敗電子メールを送信するロジック アプリ ワークフローを呼び出す Web アクティビティ。
- (成功および失敗に基づいた) あるアクティビティと別のアクティビティの接続
- アクティビティからの出力を後続のアクティビティへの入力として使用する
Data Factory UI のホーム ページで [調整] タイルをクリックします。
パイプラインのプロパティ ウィンドウで [パラメーター] タブに切り替え、 [新規] ボタンを使用して、種類が文字列の 3 つのパラメーター (sourceBlobContainer、sinkBlobContainer、receiver) を追加します。
- sourceBlobContainer - ソース BLOB データセットによって使用されるパイプライン内のパラメーターです。
- sinkBlobContainer – シンク BLOB データセットによって使用されるパイプライン内のパラメーターです。
- receiver – このパラメーターは、このパラメーターによって電子メール アドレスが指定されている受信者に成功または失敗の電子メールを送信するパイプライン内の 2 つの Web アクティビティによって使用されます。
[アクティビティ] ツール ボックスで [データ フロー] を展開し、 [コピー] アクティビティをパイプライン デザイナー画面にドラッグ アンド ドロップします。
下部にある [コピー] アクティビティのプロパティ ウィンドウで、 [ソース] タブに切り替えて [+ 新規] をクリックします。 この手順でコピー アクティビティのソース データセットを作成します。
[新しいデータセット] ウィンドウで [Azure BLOB ストレージ] を選択し、 [完了] をクリックします。
[AzureBlob1] という新しいタブが表示されます。 データセットの名前を「SourceBlobDataset」に変更します。
プロパティ ウィンドウにある [接続] タブに切り替えて、 [リンクされたサービス] で [新規] をクリックします。 この手順では、Azure ストレージ アカウントをデータ ファクトリにリンクするためのリンクされたサービスを作成します。
[New Linked Service](新しいリンクされたサービス) ウィンドウで、次の手順を行います。
- [名前] に「AzureStorageLinkedService」と入力します。
- [ストレージ アカウント名] で Azure ストレージ アカウントを選択します。
- [保存] をクリックします。
フォルダーには「
@pipeline().parameters.sourceBlobContainer
」、ファイル名には「emp.txt
」と入力します。 sourceBlobContainer パイプライン パラメーターを使用して、データセットのフォルダー パスを設定します。パイプラインのタブに切り替えるか、またはツリービューにあるパイプラインをクリックします。 [Source Dataset](ソース データセット) で SourceBlobDataset が選択されていることを確認します。
プロパティ ウィンドウで [シンク] タブに切り替えて、 [Sink Dataset](シンク データセット) の [+ 新規] をクリックします。 ソース データセットの作成と同様のこの手順では、コピー アクティビティのシンク データセットを作成します。
[新しいデータセット] ウィンドウで [Azure BLOB ストレージ] を選択し、 [完了] をクリックします。
データセットの [General](一般) 設定ページで、 [名前] に「SinkBlobDataset」と入力します。
[接続] タブに切り替えて、次の手順を実行します。
上部にあるパイプラインのタブに切り替えます。 [アクティビティ] ツール ボックスの [General](一般) を展開し、 [Web] アクティビティをパイプライン デザイナー画面にドラッグ アンド ドロップします。 アクティビティの名前を「SendSuccessEmailActivity」に設定します。 Web アクティビティでは、任意の REST エンドポイントを呼び出すことができます。 このアクティビティの詳細については、Web アクティビティに関する記事を参照してください。 このパイプラインでは、Web アクティビティを使用して Logic Apps 電子メール ワークフローを呼び出します。
[General](一般) タブから [設定] タブに切り替えて、次の手順を実行します。
[URL] で、成功電子メールを送信するロジック アプリ ワークフローの URL を指定します。
[メソッド] に [POST] を選択します。
[ヘッダー] セクションにある + [ヘッダーの追加] リンクをクリックします。
[コンテンツの種類] というヘッダーを追加し、それを application/json に設定します。
[本文] に次の JSON を指定します。
{ "message": "@{activity('Copy1').output.dataWritten}", "dataFactoryName": "@{pipeline().DataFactory}", "pipelineName": "@{pipeline().Pipeline}", "receiver": "@pipeline().parameters.receiver" }
メッセージの本文には次のプロパティが含まれます。
Message –
@{activity('Copy1').output.dataWritten
から渡される値。 前のコピー アクティビティのプロパティにアクセスし、dataWritten の値を渡します。 失敗の場合、@{activity('CopyBlobtoBlob').error.message
の代わりにエラー出力を渡します。Data Factory Name –
@{pipeline().DataFactory}
から渡される値。これはシステム変数であり、対応するデータ ファクトリ名へのアクセスを可能にします。 サポートされているシステム変数の一覧については、「システム変数」の記事を参照してください。Pipeline Name –
@{pipeline().Pipeline}
から渡される値。 これもシステム変数であり、対応するパイプライン名へのアクセスを可能にします。Receiver – "@pipeline().parameters.receiver") から渡される値。 パイプライン パラメーターにアクセスします。
コピー アクティビティの横にある緑のボタンを Web アクティビティにドラッグ アンド ドロップして、 [コピー] アクティビティを [Web] アクティビティに接続します。
[アクティビティ] ツール ボックスからパイプライン デザイナー画面に別の [Web] アクティビティをドラッグ アンド ドロップし、 [名前] を「SendFailureEmailActivity」に設定します。
[設定] タブに切り替えて、次の手順を実行します。
[URL] で、失敗電子メールを送信するロジック アプリ ワークフローの URL を指定します。
[メソッド] に [POST] を選択します。
[ヘッダー] セクションにある + [ヘッダーの追加] リンクをクリックします。
[コンテンツの種類] というヘッダーを追加し、それを application/json に設定します。
[本文] に次の JSON を指定します。
{ "message": "@{activity('Copy1').error.message}", "dataFactoryName": "@{pipeline().DataFactory}", "pipelineName": "@{pipeline().Pipeline}", "receiver": "@pipeline().parameters.receiver" }
パイプライン デザイナーの [コピー] アクティビティを選択して +-> ボタンをクリックし、[エラー] を選択します。
コピー アクティビティの横にある赤いボタンを 2 番目の Web アクティビティ SendFailureEmailActivity にドラッグします。 パイプラインが次の図のように表示されるよう、アクティビティを移動できます。
パイプラインを検証するために、ツール バーの [検証] ボタンをクリックします。 >> ボタンをクリックして [Pipeline Validation Output](パイプラインの検証の出力) ウィンドウを閉じます。
エンティティ (データセットやパイプラインなど) を Data Factory サービスに発行するには、 [すべて公開] を選択します。 [正常に発行されました] というメッセージが表示されるまで待機します。
成功するパイプラインの実行をトリガーする
パイプラインの実行をトリガーするために、ツール バーの [トリガー] をクリックし、 [Trigger Now](今すぐトリガー) をクリックします。
[Pipeline Run](パイプラインの実行) ウィンドウで、次の手順を実行します。
sourceBlobContainer パラメーターに「adftutorial/adfv2branch/input」を入力します。
sinkBlobContainer パラメーターに「adftutorial/adfv2branch/output」を入力します。
receiver のメール アドレスを入力します。
[完了] をクリックします。
成功したパイプラインの実行を監視する
パイプラインの実行を監視するには、左側の [監視] タブに切り替えます。 手動でトリガーしたパイプラインの実行が表示されます。 [最新の情報に更新] ボタンを使用して、一覧を更新します。
このパイプラインの実行に関連付けられているアクティビティの実行を表示するには、 [アクション] 列にある最初のリンクをクリックします。 上部の [パイプライン] をクリックすると、前のビューに戻ることができます。 [最新の情報に更新] ボタンを使用して、一覧を更新します。
失敗するパイプラインの実行をトリガーする
左側で [編集] タブに切り替えます。
パイプラインの実行をトリガーするために、ツール バーの [トリガー] をクリックし、 [Trigger Now](今すぐトリガー) をクリックします。
[Pipeline Run](パイプラインの実行) ウィンドウで、次の手順を実行します。
- sourceBlobContainer パラメーターに「adftutorial/dummy/input」を入力します。 ダミー フォルダーが adftutorial コンテナーに存在しないことを確認します。
- sinkBlobContainer パラメーターに「adftutorial/dummy/output」を入力します。
- receiver のメール アドレスを入力します。
- [完了] をクリックします。
失敗したパイプラインの実行を監視する
パイプラインの実行を監視するには、左側の [監視] タブに切り替えます。 手動でトリガーしたパイプラインの実行が表示されます。 [最新の情報に更新] ボタンを使用して、一覧を更新します。
エラーの詳細を表示するには、パイプラインの実行の [エラー] リンクをクリックします。
このパイプラインの実行に関連付けられているアクティビティの実行を表示するには、 [アクション] 列にある最初のリンクをクリックします。 [最新の情報に更新] ボタンを使用して、一覧を更新します。 パイプラインのコピー アクティビティが失敗したことがわかります。 Web アクティビティによる指定された受信者への失敗電子メールの送信は成功しました。
[アクション] 列の [エラー] リンクをクリックして、エラーの詳細を表示します。
次のステップ
このチュートリアルでは、以下の手順を実行しました。
- データ ファクトリを作成します。
- Azure Storage のリンクされたサービスを作成します。
- Azure BLOB データセットを作成します。
- コピー アクティビティと Web アクティビティを含むパイプラインを作成します。
- アクティビティの出力を後続のアクティビティに送信します。
- パラメーターの引き渡しとシステム変数を利用します。
- パイプラインの実行を開始します。
- パイプラインとアクティビティの実行を監視します。
これで、「概念」セクションに進み、Azure Data Factory の詳細について学習できるようになりました。