Gegevens Flow activiteit in Azure Data Factory en Azure Synapse Analytics

VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics

Gebruik de Data Flow om gegevens te transformeren en te verplaatsen via toewijzingsgegevensstromen. Als u geen tijd hebt voor gegevensstromen, zie Toewijzingsgegevens Flow overzicht

Syntax

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Typeeigenschappen

Eigenschap Beschrijving Toegestane waarden Vereist
gegevensstroom De verwijzing naar de Data Flow wordt uitgevoerd DataFlowReference Ja
integrationRuntime De rekenomgeving waarin de gegevensstroom wordt uitgevoerd. Als dit niet wordt opgegeven, wordt de Azure Integration Runtime automatisch oplossen gebruikt. IntegrationRuntimeReference Nee
compute.coreCount Het aantal kernen dat wordt gebruikt in het Spark-cluster. Kan alleen worden opgegeven als de Azure Integration Runtime automatisch wordt opgelost 8, 16, 32, 48, 80, 144, 272 Nee
compute.computeType Het type rekenkracht dat wordt gebruikt in het Spark-cluster. Kan alleen worden opgegeven als de Azure Integration Runtime automatisch wordt opgelost "Algemeen", "ComputeOptimized", "MemoryOptimized" Nee
staging.linkedService Als u een bron of sink Azure Synapse Analytics, geeft u het opslagaccount op dat wordt gebruikt voor polybase-fasering.

Als uw Azure Storage is geconfigureerd met het VNet-service-eindpunt, moet u verificatie van beheerde identiteiten gebruiken met 'vertrouwde Microsoft-service toestaan' ingeschakeld voor het opslagaccount. Raadpleeg Impact of using VNet Service Endpoints with Azure storage(Gevolgen van het gebruik van VNet-service-eindpunten met Azure Storage). Leer ook de benodigde configuraties voor respectievelijk Azure Blob en Azure Data Lake Storage Gen2.
LinkedServiceReference Alleen als de gegevensstroom naar een Azure Synapse Analytics
staging.folderPath Als u een bron of sink Azure Synapse Analytics, het mappad in het Blob Storage-account dat wordt gebruikt voor fasering van PolyBase Tekenreeks Alleen als de gegevensstroom naar de Azure Synapse Analytics
traceLevel Het logboekregistratieniveau van de uitvoering van uw gegevensstroomactiviteit instellen Fine, Coarse, None Nee

Gegevens Flow

Dynamisch gegevensstroomrekengrootte tijdens runtime

De eigenschappen Core Count en Compute Type kunnen dynamisch worden ingesteld om aan te passen aan de grootte van uw binnenkomende brongegevens tijdens runtime. Gebruik pijplijnactiviteiten zoals Opzoek of Metagegevens opschonen om de grootte van de gegevens van de brongegevensset te vinden. Gebruik vervolgens Dynamische inhoud toevoegen in de activiteitseigenschappen Flow Data Flow.

Notitie

Wanneer u stuurprogramma- en werkknooppuntkernen kiest in Azure Synapse gegevensstromen, worden altijd minimaal 3 knooppunten gebruikt.

Dynamische Flow

Hier is een korte videozelfstudie waarin deze techniek wordt uitgelegd

Data Flow Integration Runtime

Kies welke Integration Runtime moet worden gebruikt voor de uitvoering van uw Flow activiteit. De service gebruikt standaard de Azure Integration-runtime voor automatisch oplossen met vier werkkernen. Deze IR heeft een rekentype voor algemeen gebruik en wordt uitgevoerd in dezelfde regio als uw service-exemplaar. Voor operationele pijplijnen is het raadzaam om uw eigen Azure Integration Runtimes te maken die specifieke regio's, rekentype, kernen en TTL definiëren voor het uitvoeren van uw gegevensstroomactiviteit.

Een minimaal rekentype van Algemeen (geoptimaliseerd voor rekenkracht wordt niet aanbevolen voor grote workloads) met een configuratie van 8+8 (16 in totaal v-cores) en een 10 minuten is de minimale aanbeveling voor de meeste productieworkloads. Door een kleine TTL in te stellen, kan Azure IR een warm cluster onderhouden dat niet de enkele minuten van de starttijd voor een niet-koude cluster in stand houdt. U kunt de uitvoering van uw gegevensstromen nog sneller uitvoeren door 'Snel opnieuw gebruiken' te selecteren in de configuraties Azure IR gegevensstroom. Zie Azure Integration Runtime voor meer informatie.

Azure Integration Runtime

Belangrijk

De Integration Runtime in de Data Flow-activiteit is alleen van toepassing op geactiveerde uitvoeringen van uw pijplijn. Foutopsporing van uw pijplijn met gegevensstromen wordt uitgevoerd op het cluster dat is opgegeven in de foutopsporingssessie.

PolyBase

Als u een testlocatie gebruikt Azure Synapse Analytics sink of bron, moet u een faseringslocatie kiezen voor uw PolyBase-batchbelasting. PolyBase maakt het bulksgewijs laden van gegevens mogelijk in plaats van de gegevens rij voor rij te laden. PolyBase vermindert de laadtijd in de Azure Synapse Analytics.

Niveau van logboekregistratie

Als u niet elke pijplijnuitvoering van uw gegevensstroomactiviteiten nodig hebt om alle uitgebreide telemetrielogboeken volledig te registreren, kunt u eventueel uw logboekregistratieniveau instellen op Basic of Geen. Wanneer u uw gegevensstromen in de modus Uitgebreid (standaard) wilt uitvoeren, vraagt u de service om activiteiten op elk afzonderlijk partitieniveau volledig te laten inloggen tijdens uw gegevenstransformatie. Dit kan een dure bewerking zijn, zodat u de algehele prestaties van uw gegevensstroom en pijplijn alleen kunt verbeteren door uitgebreide problemen op te lossen. In de modus Basic worden alleen de duur van transformaties in een logboek bij gegeven, terwijl 'Geen' alleen een samenvatting van de duur biedt.

Niveau van logboekregistratie

Sinkeigenschappen

Met de groeperingsfunctie in gegevensstromen kunt u zowel de volgorde van uitvoering van uw sinks instellen als sinks groeperen met hetzelfde groepsnummer. Als u groepen wilt beheren, kunt u de service vragen om sinks parallel in dezelfde groep uit te voeren. U kunt de sinkgroep ook instellen om door te gaan, zelfs nadat een van de sinks een fout heeft aangetroffen.

Het standaardgedrag van gegevensstrooms sinks is om elke sink opeenvolgend, serieel uit te voeren en om de gegevensstroom uit te voeren wanneer er een fout in de sink wordt aangetroffen. Bovendien worden alle sinks standaard ingesteld op dezelfde groep, tenzij u naar de eigenschappen van de gegevensstroom gaat en verschillende prioriteiten voor de sinks in stelt.

Sinkeigenschappen

Alleen eerste rij

Deze optie is alleen beschikbaar voor gegevensstromen met cache-sinks die zijn ingeschakeld voor Uitvoer naar activiteit. De uitvoer van de gegevensstroom die rechtstreeks in uw pijplijn wordt geïnjecteerd, is beperkt tot 2 MB. Als u Alleen eerste rij instelt, kunt u de gegevensuitvoer van de gegevensstroom beperken wanneer u de uitvoer van de gegevensstroomactiviteit rechtstreeks aan uw pijplijn injecteert.

Parameters voor gegevensstromen

Geparameteriseerde gegevenssets

Als uw gegevensstroom gebruikmaakt van geparameteriseerde gegevenssets, stelt u de parameterwaarden in op Instellingen tabblad.

Data Flow parameters uitvoeren

Geparameteriseerde gegevensstromen

Als uw gegevensstroom is geparameteriseerd, stelt u de dynamische waarden van de gegevensstroomparameters in op het tabblad Parameters. U kunt de taal van de pijplijnexpressie of de expressietaal van de gegevensstroom gebruiken om dynamische of letterlijke parameterwaarden toe te wijzen. Zie Data Flow Parameters voor meer informatie.

Geparameteriseerde rekeneigenschappen.

U kunt het aantal kernen of het rekentype parameteriseren als u de Azure Integration-runtime automatisch omgeeft en waarden opgeeft voor compute.coreCount en compute.computeType.

Voorbeeld van data Flow parameter uitvoeren

Foutopsporing van data Flow pijplijnactiviteit

Als u een pijplijn voor foutopsporing wilt uitvoeren met een Data Flow-activiteit, moet u de foutopsporingsmodus van de gegevensstroom inschakelen via de schuifregelaar Data Flow Debug op de bovenste balk. Met de foutopsporingsmodus kunt u de gegevensstroom uitvoeren op een actief Spark-cluster. Zie Foutopsporingsmodus voor meer informatie.

Schermopname die laat zien waar de knop Foutopsporing is

De foutopsporingspijplijn wordt uitgevoerd op het actieve foutopsporingscluster, niet de integratieruntime-omgeving die is opgegeven in de activiteitsinstellingen Flow Data Flow. U kunt de foutopsporingsrekenomgeving kiezen bij het starten van de foutopsporingsmodus.

De Data Flow bewaken

De Data Flow-activiteit heeft een speciale bewakingservaring waar u partitionering, fasetijd en gegevensgegevens voor gegevensvereeding kunt bekijken. Open het bewakingsvenster via het pictogram van een bril onder Acties. Zie Gegevensstromen bewaken voor meer informatie.

Gegevens gebruiken Flow activiteitsresultaten in een volgende activiteit

Met de gegevensstroomactiviteit worden metrische gegevens uitgevoerd met betrekking tot het aantal rijen dat naar elke sink is geschreven en rijen die uit elke bron worden gelezen. Deze resultaten worden geretourneerd in de output sectie van het resultaat van de activiteitsuit werking. De geretourneerde metrische gegevens hebben de indeling van de onderstaande json.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

Als u bijvoorbeeld het aantal rijen wilt weergeven dat is geschreven naar een sink met de naam 'sink1' in een activiteit met de naam 'dataflowActivity', gebruikt u @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten .

Gebruik om het aantal rijen op te halen dat is gelezen uit een bron met de naam 'source1' die in die sink is @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead gebruikt.

Notitie

Als een sink nul rijen heeft geschreven, wordt deze niet in metrische gegevens weergeven. Het bestaan kan worden geverifieerd met behulp van de contains functie . Controleert bijvoorbeeld contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') of er rijen naar sink1 zijn geschreven.

Volgende stappen

Zie ondersteunde controlestroomactiviteiten: