Attività ForEach in Azure Data FactoryForEach activity in Azure Data Factory

L'attività ForEach definisce un flusso di controllo ripetuto nella pipeline.The ForEach Activity defines a repeating control flow in your pipeline. Questa attività viene usata per eseguire l'iterazione di una raccolta e attività specifiche in un ciclo.This activity is used to iterate over a collection and executes specified activities in a loop. L'implementazione di cicli di questa attività è simile alla struttura di esecuzione cicli Foreach nei linguaggi di programmazione.The loop implementation of this activity is similar to Foreach looping structure in programming languages.

SintassiSyntax

Le proprietà sono descritte più avanti in questo articolo.The properties are described later in this article. La proprietà items rappresenta la raccolta e ogni elemento della raccolta viene definito tramite @item(), come illustrato nella sintassi seguente:The items property is the collection and each item in the collection is referred to by using the @item() as shown in the following syntax:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Proprietà del tipoType properties

ProprietàProperty DescrizioneDescription Valori consentitiAllowed values ObbligatorioRequired
namename Nome dell'attività ForEach.Name of the for-each activity. StringString Yes
typetype Deve essere impostato su ForEachMust be set to ForEach StringString Yes
isSequentialisSequential Specifica se il ciclo deve essere eseguito in sequenza o in parallelo.Specifies whether the loop should be executed sequentially or in parallel. È possibile eseguire un numero massimo di 20 iterazioni del ciclo simultanee in parallelo.Maximum of 20 loop iterations can be executed at once in parallel). Se si dispone ad esempio di un'iterazione di attività ForEach su un'attività di copia con 10 set di dati di origine e sink diversi con isSequential impostato su False, tutte le copie vengono eseguite simultaneamente.For example, if you have a ForEach activity iterating over a copy activity with 10 different source and sink datasets with isSequential set to False, all copies are executed at once. Il valore predefinito è False.Default is False.

Se "isSequential" è impostato su False, assicurarsi che sia presente una configurazione corretta per usare più eseguibili.If "isSequential" is set to False, ensure that there is a correct configuration to run multiple executables. In caso contrario, questa proprietà deve essere usata con attenzione per evitare di incorrere in conflitti di scrittura.Otherwise, this property should be used with caution to avoid incurring write conflicts. Per altre informazioni, vedere la sezione Esecuzione parallela.For more information, see Parallel execution section.
BooleanoBoolean No.No. Il valore predefinito è False.Default is False.
batchCountbatchCount Numero di batch da usare per controllare il numero di esecuzione parallela (quando isSequential è impostato su Falso).Batch count to be used for controlling the number of parallel execution (when isSequential is set to false). Valore intero (massimo 50)Integer (maximum 50) No.No. Il valore predefinito è 20.Default is 20.
ItemsItems Un'espressione che restituisce una matrice JSON su cui eseguire un'iterazione.An expression that returns a JSON Array to be iterated over. Espressione (che restituisce una matrice JSON)Expression (which returns a JSON Array) Yes
attivitàActivities Le attività da eseguire.The activities to be executed. Elenco di attivitàList of Activities Yes

Esecuzione parallelaParallel execution

Se isSequential è impostato su false, l'attività esegue le iterazioni in parallelo con un massimo di 20 iterazioni simultanee.If isSequential is set to false, the activity iterates in parallel with a maximum of 20 concurrent iterations. Questa impostazione deve essere usata con cautela.This setting should be used with caution. Se le iterazioni simultanee scrivono nella stessa cartella, ma in file diversi, non ci sono problemi.If the concurrent iterations are writing to the same folder but to different files, this approach is fine. Se le iterazioni simultanee scrivono contemporaneamente in esattamente lo stesso file, questo approccio causa un errore.If the concurrent iterations are writing concurrently to the exact same file, this approach most likely causes an error.

Linguaggio delle espressioni di iterazioneIteration expression language

Nell'attività ForEach, fornire una matrice di cui eseguire un'iterazione per la proprietà items."In the ForEach activity, provide an array to be iterated over for the property items." Usare @item() per eseguire l'iterazione su un'unica enumerazione nell'attività ForEach.Use @item() to iterate over a single enumeration in ForEach activity. Ad esempio, se items è una matrice: [1, 2, 3], @item() restituisce 1 nella prima iterazione, 2 nella seconda iterazione e 3 nella terza iterazione.For example, if items is an array: [1, 2, 3], @item() returns 1 in the first iteration, 2 in the second iteration, and 3 in the third iteration.

Iterazione su una singola attivitàIterating over a single activity

Scenario: copia dello stesso file di origine del BLOB di Azure in più file di destinazione nel BLOB di Azure.Scenario: Copy from the same source file in Azure Blob to multiple destination files in Azure Blob.

Definizione della pipelinePipeline definition

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definizione del set di dati BLOBBlob dataset definition

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Valori del parametro di esecuzioneRun parameter values

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Eseguire l'iterazione su più attivitàIterate over multiple activities

È possibile eseguire l'iterazione su più attività (ad esempio attività di copia e web) in un'attività ForEach.It's possible to iterate over multiple activities (for example: copy and web activities) in a ForEach activity. In questo scenario, si consiglia di astrarre più attività in una pipeline distinta.In this scenario, we recommend that you abstract out multiple activities into a separate pipeline. È quindi possibile usare l'attività ExecutePipeline nella pipeline con l'attività ForEach per richiamare la pipeline distinta con più attività.Then, you can use the ExecutePipeline activity in the pipeline with ForEach activity to invoke the separate pipeline with multiple activities.

SintassiSyntax

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

EsempioExample

Scenario: iterazione su un oggetto InnerPipeline in un'attività ForEach con l'attività di ExecutePipeline.Scenario: Iterate over an InnerPipeline within a ForEach activity with Execute Pipeline activity. La pipeline interna viene copiata con le definizioni dello schema parametrizzate.The inner pipeline copies with schema definitions parameterized.

Definizione della pipeline masterMaster Pipeline definition

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definizione della pipeline internaInner pipeline definition

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definizione del set di dati di origineSource dataset definition

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definizione del set di dati sinkSink dataset definition

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parametri della pipeline masterMaster pipeline parameters

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Aggregazione di outputAggregating outputs

Per aggregare gli output dell'attività foreach , utilizzare le variabili e aggiungere l'attività di variabile .To aggregate outputs of foreach activity, please utilize Variables and Append Variable activity.

Innanzitutto, dichiarare una array variabile nella pipeline.First, declare an array variable in the pipeline. Quindi, richiamare l'attività Aggiungi variabile all'interno di ogni ciclo foreach.Then, invoke Append Variable activity inside each foreach loop. Successivamente, è possibile recuperare l'aggregazione dall'array.Subsequently, you can retrieve the aggregation from your array.

Limitazioni e soluzioni alternativeLimitations and workarounds

Di seguito vengono descritte alcune limitazioni dell'attività ForEach con le soluzioni alternative suggerite.Here are some limitations of the ForEach activity and suggested workarounds.

LimitazioneLimitation Soluzione alternativaWorkaround
Non è possibile annidare un ciclo ForEach all'interno di un altro ciclo ForEach (o un ciclo Until).You can't nest a ForEach loop inside another ForEach loop (or an Until loop). Progettare una pipeline a due livelli, in cui la pipeline esterna con il ciclo ForEach esterno esegue l'iterazione su una pipeline interna con il ciclo annidato.Design a two-level pipeline where the outer pipeline with the outer ForEach loop iterates over an inner pipeline with the nested loop.
Per l'attività ForEach sono previsti un batchCount massimo di 50 per l'elaborazione parallela e un massimo di 100.000 elementi.The ForEach activity has a maximum batchCount of 50 for parallel processing, and a maximum of 100,000 items. Progettare una pipeline a due livelli in cui la pipeline esterna con l'attività ForEach esegue l'iterazione su una pipeline interna.Design a two-level pipeline where the outer pipeline with the ForEach activity iterates over an inner pipeline.

Passaggi successiviNext steps

Vedere altre attività del flusso di controllo supportate da Data Factory:See other control flow activities supported by Data Factory: