Azure Data Factory 和 Azure Synapse Analytics 中的 ForEach 活動

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用

ForEach 活動會在 Azure Data Factory 或 Synapse 管線中定義重複的控制流程。 此活動用於逐一查看整個集合,然後以迴圈執行指定的活動。 此活動的迴圈實作與程式設計語言中的 Foreach 迴圈結構相似。

使用 UI 建立 ForEach 活動

若要在管線中使用 ForEach 活動,請完成下列步驟:

  1. 您可以使用任何陣列類型變數或來自其他活動的輸出作為 ForEach 活動的輸入。 若要建立陣列變數,請選取管線畫布的背景,然後選取 [變數] 索引標籤,以新增陣列類型變數,如下所示。

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. 在管線 [活動] 窗格中搜尋 ForEach,然後將 ForEach 活動拖曳至管線畫布。

  3. 在畫布上選取新的 ForEach 活動 (如未選取) 和其 [設定] 索引標籤以編輯詳細資料。

    Shows the UI for a Filter activity.

  4. 選取 [項目] 欄位,然後選取 [新增動態內容] 連結以開啟動態內容編輯器窗格。

    Shows the  Add dynamic content  link for the Items property.

  5. 選取您要在動態內容編輯器中篩選的輸入陣列。 在此範例中,我們會選取在第一個步驟中建立的變數。

    Shows the dynamic content editor with the variable created in the first step selected

  6. 選取 ForEach 活動上的活動編輯器,為輸入項目陣列中的每個項目新增一或多個要執行的活動。

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. 在您於 ForEach 活動內建立的任何活動中,您可以參考 ForEach 活動從項目清單中逐一查看的目前項目。 您可以在使用動態運算式來指定屬性值的任何位置參考目前的項目。 在動態內容編輯器中,選取 ForEach 迭代器來傳回目前的項目。

    Shows the dynamic content editor with the ForEach iterator selected.

語法

本文稍後將說明這些屬性。 項目屬性為集合,而集合中的每個項目則使用 @item() 表示,如下列語法所示:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

類型屬性

屬性 說明 允許的值 必要
NAME for-each 活動的名稱。 String Yes
type 必須設定為 ForEach String Yes
isSequential 指定應該循序或以平行方式執行迴圈。 以平行方式可一次執行最多 50 個迴圈反覆項目。 例如,如果您的 ForEach 活動會反覆查詢 10 個不同來源和接收資料集的複製活動,且 isSequential 設為 False,則所有複本會都執行一次。 預設值是 False。

如果 isSequential 設定為 False,請確認有正確的設定可執行多個可執行檔。 否則,應謹慎使用這個屬性,以避免引發寫入衝突。 如需詳細資訊,請參閱平行執行一節。
布林值 否。 預設值是 False。
batchCount 批次計數,用於控制平行執行的數目 (當 isSequential 設定為 false 時)。 這是並行上限,但 for-each 活動不一定會以此數目執行 整數 (最大值 50) 否。 預設為 20。
項目 傳回要反覆查詢之 JSON 陣列的運算式。 運算式 (傳回 JSON 陣列) Yes
活動 要執行的活動。 活動清單 Yes

平行執行

如果 isSequential 設為 false,活動會以平行方式逐一查看,並行的反覆項目數最多為 50。 此設定應謹慎使用。 如果並行的反覆項目會寫入相同資料夾的不同檔案,這種方法是正常的。 如果並行的反覆項目會同時寫入相同的檔案,這種方法很可能會導致錯誤。

反覆項目運算式語言

在 ForEach 活動中,為屬性項目提供可反覆查詢的陣列。使用 @item() 反覆查詢 ForEach 活動中的單一列舉。 例如,如果項目是陣列:[1, 2, 3],@item() 在第一個反覆項目中會傳回 1,在第二個反覆項目中會傳回 2,在第三個反覆項目中會傳回 3。 您也可以使用 @range(0,10) 之類的運算式,逐一查看十個項目,以 0 開始以 9 結束。

反覆查詢單一活動

案例:從 Azure Blob 中相同的來源檔案複製到 Azure Blob 中的多個目的地檔案。

管線定義

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Blob 資料集定義

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

執行參數值

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

反覆查詢多個活動

可反覆查詢 ForEach 活動中的多個活動 (例如:複製和 Web 活動)。 在此案例中,我們建議您將多個活動提取至不同的管線。 然後,您可以在管線中使用 ExecutePipeline 活動,以 ForEach 活動叫用不同的管線來搭配多個活動。

語法

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

範例

案例:使用執行管線活動反覆查詢 ForEach 活動內的 InnerPipeline。 內部管線使用參數化的結構描述定義複製。

主要管線定義

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

內部管線定義

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

來源資料集定義

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

接收資料集定義

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

主要管線參數

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

彙總輸出

若要彙總 foreach 活動的輸出,請利用 VariableAppend Variable 活動。

首先,在管線中宣告 arrayvariable。 然後在每個 foreach 迴圈內叫用 Append Variable 活動。 接著,也可以從您的陣列中擷取彙總。

限制和因應措施

以下是 ForEach 活動和建議因應措施的一些限制。

限制 因應措施
您無法將 ForEach 迴圈內嵌在其他 ForEach 迴圈 (或 Until 迴圈) 內。 設計兩個層級的管線,其中外部管線具有外部 ForEach 迴圈,會逐一查看具有巢狀迴圈的內部管線。
ForEach 活動的平行處理 batchCount 上限為 50,最多可處理 100,000 個項目。 設計兩個層級的管線,其中外部管線具有 ForEach 活動,會逐一查看內部管線。
SetVariable 無法在平行執行的 ForEach 活動內使用,因為變數是整個管線的全域變數,其範圍不會限定為 ForEach 或任何其他活動。 請考慮使用循序 ForEach,或在 ForEach 內使用執行管線 (子管線中處理的變數/參數)。

查看其他支援的控制流程活動: