Attività ForEach in Azure Data Factory

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

L'attività ForEach definisce un flusso di controllo ripetuto nella pipeline. Questa attività viene usata per eseguire l'iterazione di una raccolta e attività specifiche in un ciclo. L'implementazione di cicli di questa attività è simile alla struttura di esecuzione in ciclo Foreach nei linguaggi di programmazione.

Sintassi

Le proprietà sono descritte più avanti in questo articolo. La proprietà items rappresenta la raccolta e ogni elemento della raccolta viene definito tramite @item(), come illustrato nella sintassi seguente:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Proprietà del tipo

Proprietà Descrizione Valori consentiti Obbligatoria
name Nome dell'attività ForEach. string
type Deve essere impostato su ForEach string
isSequential Specifica se il ciclo deve essere eseguito in sequenza o in parallelo. È possibile eseguire un numero massimo di 20 iterazioni del ciclo simultanee in parallelo. Se si dispone ad esempio di un'iterazione di attività ForEach su un'attività di copia con 10 set di dati di origine e sink diversi con isSequential impostato su False, tutte le copie vengono eseguite simultaneamente. Il valore predefinito è False.

Se "isSequential" è impostato su False, assicurarsi che sia presente una configurazione corretta per usare più eseguibili. In caso contrario, questa proprietà deve essere usata con attenzione per evitare di incorrere in conflitti di scrittura. Per altre informazioni, vedere la sezione Esecuzione parallela.
Boolean No. Il valore predefinito è False.
batchCount Numero di batch da usare per controllare il numero di esecuzione parallela (quando isSequential è impostato su Falso). Si tratta del limite di concorrenza superiore, ma l'attività for-each non verrà sempre eseguita a questo numero Valore intero (massimo 50) No. Il valore predefinito è 20.
Elementi Un'espressione che restituisce una matrice JSON su cui eseguire un'iterazione. Espressione (che restituisce una matrice JSON)
Attività Le attività da eseguire. Elenco di attività

Esecuzione parallela

Se isSequential è impostato su false, l'attività esegue le iterazioni in parallelo con un massimo di 20 iterazioni simultanee. Questa impostazione deve essere usata con cautela. Se le iterazioni simultanee scrivono nella stessa cartella, ma in file diversi, non ci sono problemi. Se le iterazioni simultanee scrivono contemporaneamente in esattamente lo stesso file, questo approccio causa un errore.

Linguaggio delle espressioni di iterazione

Nell'attività ForEach, fornire una matrice di cui eseguire un'iterazione per la proprietà items." Usare @item() per eseguire l'iterazione su un'unica enumerazione nell'attività ForEach. Ad esempio, se items è una matrice: [1, 2, 3], @item() restituisce 1 nella prima iterazione, 2 nella seconda iterazione e 3 nella terza iterazione. È anche possibile usare @range(0,10) l'espressione like per scorrere dieci volte a partire da 0 che termina con 9.

Iterazione su una singola attività

Scenario: copia dello stesso file di origine del BLOB di Azure in più file di destinazione nel BLOB di Azure.

Definizione della pipeline

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definizione del set di dati BLOB

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Valori del parametro di esecuzione

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Eseguire l'iterazione su più attività

È possibile eseguire l'iterazione su più attività (ad esempio attività di copia e web) in un'attività ForEach. In questo scenario, si consiglia di astrarre più attività in una pipeline distinta. È quindi possibile usare l'attività ExecutePipeline nella pipeline con l'attività ForEach per richiamare la pipeline distinta con più attività.

Sintassi

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Esempio

Scenario: iterazione su un oggetto InnerPipeline in un'attività ForEach con l'attività di ExecutePipeline. La pipeline interna viene copiata con le definizioni dello schema parametrizzate.

Definizione della pipeline master

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definizione della pipeline interna

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definizione del set di dati di origine

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definizione del set di dati sink

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parametri della pipeline master

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Aggregazione di output

Per aggregare gli output dell'attività foreach, usare le variabili e l'attività Append Variable.

Innanzitutto, dichiarare una array variabile nella pipeline. Quindi, richiamare l'attività Aggiungi variabile all'interno di ogni ciclo foreach. Successivamente, è possibile recuperare l'aggregazione dall'array.

Limitazioni e soluzioni alternative

Di seguito vengono descritte alcune limitazioni dell'attività ForEach con le soluzioni alternative suggerite.

Limitazione Soluzione alternativa
Non è possibile annidare un ciclo ForEach all'interno di un altro ciclo ForEach (o un ciclo Until). Progettare una pipeline a due livelli, in cui la pipeline esterna con il ciclo ForEach esterno esegue l'iterazione su una pipeline interna con il ciclo annidato.
Per l'attività ForEach sono previsti un batchCount massimo di 50 per l'elaborazione parallela e un massimo di 100.000 elementi. Progettare una pipeline a due livelli in cui la pipeline esterna con l'attività ForEach esegue l'iterazione su una pipeline interna.
SetVariable non può essere usato all'interno di un'attività ForEach eseguita in parallelo perché le variabili sono globali per l'intera pipeline, non hanno come ambito un'attività ForEach o qualsiasi altra attività. Prendere in considerazione l'uso di ForEach sequenziale o l'uso di Esegui pipeline all'interno di ForEach (variabile/parametro gestito nella pipeline figlio).

Passaggi successivi

Vedere altre attività del flusso di controllo supportate da Data Factory: