ForEach-Aktivität in Azure Data Factory und Azure Synapse Analytics

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

Die ForEach-Aktivität definiert eine sich wiederholende Ablaufsteuerung in einer Azure Data Factory- oder Synapse-Pipeline. Diese Aktivität wird verwendet, um eine Sammlung zu durchlaufen. Sie führt die angegebenen Aktivitäten in einer Schleife aus. Die Schleifenimplementierung dieser Aktivität ähnelt der Foreach-Schleifenstruktur in Programmiersprachen.

Erstellen einer ForEach-Aktivität mit Benutzeroberfläche

Führen Sie die folgenden Schritte aus, um eine ForEach-Aktivität in einer Pipeline zu verwenden:

  1. Als Eingabe für Ihre ForEach-Aktivität können Sie eine beliebige Arraytypvariable oder Ausgaben anderer Aktivitäten verwenden. Wählen Sie zum Erstellen einer Arrayvariablen den Hintergrund der Pipelinecanvas und anschließend die Registerkarte Variablen aus, um wie unten gezeigt eine Arraytypvariable hinzuzufügen.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. Suchen Sie im Bereich mit den Pipelineaktivitäten nach ForEach, und ziehen Sie eine ForEach-Aktivität in den Pipelinebereich.

  3. Wählen Sie auf der Canvas die neue ForEach-Aktivität aus, sofern sie noch nicht ausgewählt ist, und wählen Sie anschließend die Registerkarte Einstellungen aus, um die Details zu bearbeiten.

    Shows the UI for a Filter activity.

  4. Wählen Sie das Feld Elemente und anschließend den Link Dynamischen Inhalt hinzufügen aus, um den Bereich mit dem Editor für dynamische Inhalte zu öffnen.

    Shows the  Add dynamic content  link for the Items property.

  5. Wählen Sie Ihr Eingabearray aus, das im Editor für dynamische Inhalte gefiltert werden soll. In diesem Beispiel wird die im ersten Schritt erstellte Variable ausgewählt.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Wählen Sie den Aktivitäten-Editor für die ForEach-Aktivität aus, um mindestens eine Aktivität hinzuzufügen, die für jedes Element im Eingabearray Elemente ausgeführt werden soll.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. In allen Aktivitäten, die Sie innerhalb der ForEach-Aktivität erstellen, können Sie auf das aktuelle Element, das von der ForEach-Aktivität durchlaufen wird, über die Liste Elemente verweisen. Auf das aktuelle Element kann überall dort verwiesen werden, wo ein dynamischer Ausdruck zur Angabe eines Eigenschaftswert verwendet werden kann. Wählen Sie im Editor für dynamische Inhalte den ForEach-Iterator aus, um das aktuelle Element zurückzukehren.

    Shows the dynamic content editor with the ForEach iterator selected.

Syntax

Die Eigenschaften werden weiter unten in diesem Artikel beschrieben. Die Eigenschaft „items“ ist die Sammlung, und auf die einzelnen Elemente in der Sammlung wird mit @item() verwiesen, wie im folgenden Syntaxbeispiel gezeigt:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Typeigenschaften

Eigenschaft BESCHREIBUNG Zulässige Werte Erforderlich
name Name der ForEach-Aktivität. String Ja
type Muss auf ForEach festgelegt sein. String Ja
isSequential Gibt an, ob die Schleife sequenziell oder parallel ausgeführt werden soll. Es können maximal 50 Schleifeniterationen parallel ausgeführt werden. Beispiel: Bei einer ForEach-Aktivität, die eine Kopieraktivität mit 10 unterschiedlichen Quell- und Senkendatasets durchläuft, während isSequential auf „false“ festgelegt ist, werden alle Kopien gleichzeitig ausgeführt. Der Standardwert lautet False.

Wenn „isSequential“ auf „false“ festgelegt ist, stellen Sie sicher, dass die Konfiguration die Ausführung mehrerer ausführbarer Dateien ermöglicht. Andernfalls sollte diese Eigenschaft vorsichtig verwendet werden, um Schreibkonflikte zu vermeiden. Weitere Informationen finden Sie im Abschnitt Parallele Ausführung.
Boolean Nein. Der Standardwert lautet False.
batchCount Batchanzahl, die zum Steuern der Anzahl der parallelen Ausführungen verwendet werden soll (wenn „isSequential“ auf „false“ festgelegt ist). Dies ist das obere Parallelitätslimit, aber die ForEach-Aktivität wird nicht immer mit dieser Zahl ausgeführt. Ganze Zahl (maximal 50) Nein. Der Standardwert ist 20.
Items Ein Ausdruck, der ein JSON-Array zurückgibt, das durchlaufen werden soll. Ausdruck (der ein JSON-Array zurückgibt) Ja
activities Die Aktivitäten, die ausgeführt werden sollen. Liste der Aktivitäten Ja

Parallele Ausführung

Wenn isSequential auf FALSE festgelegt ist, werden maximal 50 Iterationen der Aktivität gleichzeitig ausgeführt. Diese Einstellung sollte vorsichtig verwendet werden. Wenn die gleichzeitigen Iterationen in den gleichen Ordner, aber in andere Dateien schreiben, ist dieser Ansatz gut. Wenn die gleichzeitigen Iterationen gleichzeitig in genau dieselbe Datei schreiben, verursacht dieser Ansatz wahrscheinlich einen Fehler.

Sprache für Iterationsausdrücke

Geben Sie in der ForEach-Aktivität ein Array an, das für die Eigenschaft items durchlaufen werden soll. Mit @item() kann eine einzelne Enumeration in der ForEach-Aktivität durchlaufen werden. Beispiel: Wenn items ein Array wie [1, 2, 3] ist, gibt @item() in der ersten Iteration 1 zurück, in der zweiten Iteration 2 und in der dritten Iteration 3. Sie können @range(0,10) den auch „Gefällt mir“ Ausdruck verwenden, um zehnmal zu iterieren, beginnend mit 0 bis 9.

Durchlaufen einer einzelnen Aktivität

Szenario: Kopieren aus einer Quelldatei im Azure-Blob in mehrere Zieldateien im Azure-Blob.

Definition der Pipeline

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Definition des Blobdatasets

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Verwendete Parameterwerte

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Durchlaufen mehrerer Aktivitäten

Es ist möglich, mehrere Aktivitäten in einer ForEach-Aktivität zu durchlaufen (zum Beispiel: Kopier- und Webaktivitäten). In diesem Szenario wird empfohlen, dass Sie mehrere Aktivitäten in eine separate Pipeline abstrahieren. Anschließend können Sie die ExecutePipeline-Aktivität in der Pipeline mit der ForEach-Aktivität verwenden, um die separate Pipeline mit mehreren Aktivitäten aufzurufen.

Syntax

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Beispiel

Szenario: Durchlaufen einer inneren Pipeline in einer ForEach-Aktivität mit der Aktivität „Pipeline ausführen“. Die innere Pipeline wird mit parametrisierten Schemadefinitionen kopiert.

Definition der Masterpipeline

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Definition der inneren Pipeline

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Definition des Quelldatasets

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Definition des Senkendatasets

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Parameter der Masterpipeline

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Aggregieren von Ausgaben

Um Ausgaben der foreach-Aktivität zu aggregieren, verwenden Sie Variables und die Aktivität Variable anfügen.

Deklarieren Sie zunächst eine array-Variable in der Pipeline. Rufen Sie anschließend die Aktivität Variable anfügen innerhalb jeder foreach-Schleife auf. Anschließend können Sie die Aggregation aus Ihrem Array abrufen.

Einschränkungen und Problemumgehungen

Hier finden Sie einige Einschränkungen der ForEach-Aktivität sowie empfohlene Problemumgehungen.

Einschränkung Problemumgehung
Eine ForEach-Schleife kann nicht in einer anderen ForEach-Schleife (oder in einer Until-Schleife) geschachtelt werden. Entwerfen Sie eine Pipeline mit zwei Ebenen, bei der die äußere Pipeline mit der äußeren ForEach-Schleife eine innere Pipeline mit der geschachtelten Schleife durchläuft.
Bei der ForEach-Aktivität gilt eine batchCount-Obergrenze von 50 für die parallele Verarbeitung sowie eine Obergrenze von 100.000 Elementen. Entwerfen Sie eine Pipeline mit zwei Ebenen, bei der die äußere Pipeline mit der ForEach-Aktivität eine innere Pipeline durchläuft.
„SetVariable“ kann nicht innerhalb einer parallel ausgeführten ForEach-Aktivität verwendet werden, da die Variablen nicht für eine ForEach-Aktivität oder für eine andere Aktivität spezifisch, sondern für die gesamte Pipeline global sind. Verwenden Sie ggf. ein sequenzielles ForEach-Element, oder verwenden Sie „Pipeline ausführen“ innerhalb von „ForEach“ (mit Behandlung der Variable/des Parameters in einer untergeordneten Pipeline).

Informationen zu weiteren unterstützten Ablaufsteuerungsaktivitäten: