Azure Data Factory の ForEach アクティビティForEach activity in Azure Data Factory

適用対象: はいAzure Data Factory はいAzure Synapse Analytics (プレビュー) APPLIES TO: yesAzure Data Factory yesAzure Synapse Analytics (Preview)

ForEach アクティビティは、パイプライン内の繰り返し制御フローを定義します。The ForEach Activity defines a repeating control flow in your pipeline. このアクティビティは、コレクションを反復処理するために使用され、指定されたアクティビティをループで実行します。This activity is used to iterate over a collection and executes specified activities in a loop. このアクティビティのループの実装は、プログラミング言語の Foreach ループ構造に似ています。The loop implementation of this activity is similar to Foreach looping structure in programming languages.

構文Syntax

プロパティは、この記事の後の方で説明します。The properties are described later in this article. items プロパティはコレクションであり、次の構文に示すように、コレクション内の各項目は @item() を使用して参照されます。The items property is the collection and each item in the collection is referred to by using the @item() as shown in the following syntax:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

型のプロパティType properties

プロパティProperty 説明Description 使用できる値Allowed values 必須Required
namename ForEach アクティビティの名前。Name of the for-each activity. StringString はいYes
typetype ForEach に設定する必要がありますMust be set to ForEach StringString はいYes
isSequentialisSequential ループを順番に実行するか、または並行して実行するかを指定します。Specifies whether the loop should be executed sequentially or in parallel. 一度に最大 20 のループ反復処理を並行して実行できます。Maximum of 20 loop iterations can be executed at once in parallel). たとえば、isSequential が False に設定された状態で、10 個の異なるソースとシンク データセットがあるコピー アクティビティに対して ForEach アクティビティ反復処理を実行すると、一度にすべてのコピーが実行されます。For example, if you have a ForEach activity iterating over a copy activity with 10 different source and sink datasets with isSequential set to False, all copies are executed at once. 既定値は False です。Default is False.

"isSequential" が False に設定されている場合は、複数の実行可能ファイルを実行するための正しい構成が存在することを確認してください。If "isSequential" is set to False, ensure that there is a correct configuration to run multiple executables. そうでない場合は、書き込みの競合が発生しないようにするために、このプロパティを慎重に使用する必要があります。Otherwise, this property should be used with caution to avoid incurring write conflicts. 詳細については、「Parallel execution (並列実行)」セクションを参照してください。For more information, see Parallel execution section.
BooleanBoolean いいえ。No. 既定値は False です。Default is False.
batchCountbatchCount 並列実行の数を制御するために使用するバッチの数 (IsSequential が false に設定されている場合)。Batch count to be used for controlling the number of parallel execution (when isSequential is set to false). これはコンカレンシーの上限ですが、For-Each アクティビティは常にこの数値で実行されるわけではありませんThis is the upper concurrency limit, but the for-each activity will not always execute at this number Integer (最大 50)Integer (maximum 50) いいえ。No. 既定値は 20 です。Default is 20.
アイテムItems 反復処理される JSON 配列を返す式。An expression that returns a JSON Array to be iterated over. 式 (これは JSON 配列を返します)Expression (which returns a JSON Array) はいYes
ActivitiesActivities 実行されるアクティビティ。The activities to be executed. アクティビティの一覧List of Activities はいYes

並列実行Parallel execution

isSequential が False に設定されている場合、このアクティビティは最大 20 の同時実行反復処理と並行して反復処理します。If isSequential is set to false, the activity iterates in parallel with a maximum of 20 concurrent iterations. この設定は、慎重に使用する必要があります。This setting should be used with caution. 同時実行反復処理が同じフォルダーではあっても、異なるファイルへの書き込みである場合、このアプローチは適切です。If the concurrent iterations are writing to the same folder but to different files, this approach is fine. 同時実行反復処理がまったく同じファイルへの同時書き込みである場合、このアプローチはエラーの原因になる可能性があります。If the concurrent iterations are writing concurrently to the exact same file, this approach most likely causes an error.

反復処理の式言語Iteration expression language

ForEach アクティビティでは、反復処理される配列をプロパティ items として指定します。In the ForEach activity, provide an array to be iterated over for the property items." ForEach アクティビティで 1 つの列挙を反復処理するには、@item() を使用します。Use @item() to iterate over a single enumeration in ForEach activity. たとえば、items が配列: [1, 2, 3] である場合、@item() は最初の反復処理で 1 を、2 番目の反復処理で 2 を、3 番目の反復処理で 3 を返します。For example, if items is an array: [1, 2, 3], @item() returns 1 in the first iteration, 2 in the second iteration, and 3 in the third iteration.

1 つのアクティビティを反復処理するIterating over a single activity

シナリオ: Azure BLOB 内の同じソース ファイルから Azure BLOB 内の複数の宛先ファイルにコピーします。Scenario: Copy from the same source file in Azure Blob to multiple destination files in Azure Blob.

パイプラインの定義Pipeline definition

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

BLOB データセットの定義Blob dataset definition

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

実行パラメーターの値Run parameter values

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

複数のアクティビティを反復処理するIterate over multiple activities

ForEach アクティビティで複数のアクティビティ (例: コピー アクティビティと Web アクティビティ) を反復処理できます。It's possible to iterate over multiple activities (for example: copy and web activities) in a ForEach activity. このシナリオでは、複数のアクティビティを個別のパイプラインに抽象化することをお勧めします。In this scenario, we recommend that you abstract out multiple activities into a separate pipeline. 次に、ForEach アクティビティを含むパイプラインの ExecutePipeline アクティビティを使用して、複数のアクティビティを含む個別のパイプラインを呼び出すことができます。Then, you can use the ExecutePipeline activity in the pipeline with ForEach activity to invoke the separate pipeline with multiple activities.

構文Syntax

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Example

シナリオ: ExecutePipeline アクティビティを使用して ForEach アクティビティ内で InnerPipeline を反復処理します。Scenario: Iterate over an InnerPipeline within a ForEach activity with Execute Pipeline activity. 内部パイプラインは、パラメーター化されたスキーマ定義でコピーします。The inner pipeline copies with schema definitions parameterized.

マスター パイプラインの定義Master Pipeline definition

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

内部パイプラインの定義Inner pipeline definition

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

ソース データセットの定義Source dataset definition

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

シンク データセットの定義Sink dataset definition

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

マスター パイプラインのパラメーターMaster pipeline parameters

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

出力の集計Aggregating outputs

foreach アクティビティの出力を集計するには、VariablesAppend Variable アクティビティを使用してください。To aggregate outputs of foreach activity, please utilize Variables and Append Variable activity.

最初に、パイプライン内で array "変数" を宣言します。First, declare an array variable in the pipeline. 次に、各 foreach ループ内で Append Variable アクティビティを呼び出します。Then, invoke Append Variable activity inside each foreach loop. その後、配列から集計を取得できます。Subsequently, you can retrieve the aggregation from your array.

制限事項と回避策Limitations and workarounds

ForEach アクティビティと提案される回避策のいくつかの制限を次に示します。Here are some limitations of the ForEach activity and suggested workarounds.

制限事項Limitation 回避策Workaround
別の ForEach ループ (または Until ループ) 内に ForEach ループを入れ子にすることはできません。You can't nest a ForEach loop inside another ForEach loop (or an Until loop). 入れ子にされたループが含まれる内側パイプライン上で外側の ForEach ループが含まれる外側パイプラインが反復される 2 段のパイプラインを設計します。Design a two-level pipeline where the outer pipeline with the outer ForEach loop iterates over an inner pipeline with the nested loop.
ForEach アクティビティには並列処理のために最大 50 の batchCount と、最大 100,000 の項目が含まれています。The ForEach activity has a maximum batchCount of 50 for parallel processing, and a maximum of 100,000 items. 内側パイプライン上で ForEach アクティビティが含まれる外側パイプラインが反復される 2 段のパイプラインを設計します。Design a two-level pipeline where the outer pipeline with the ForEach activity iterates over an inner pipeline.

次のステップNext steps

Data Factory でサポートされている他の制御フロー アクティビティを参照してください。See other control flow activities supported by Data Factory: