Pipeline e attività in Azure Data Factory

Questo articolo fornisce informazioni sulle pipeline e sulle attività in Azure Data Factory e su come usarle per creare flussi di lavoro completi basati sui dati per gli scenari di elaborazione e trasferimento dei dati.

Nota

Questo articolo è da leggersi dopo aver consultato l' Introduzione a Data factory di Azure. Se non si ha esperienza diretta nella creazione di data factory, l'esercitazione sulla trasformazione dei dati e/o quella sullo spostamento dei dati può essere utile per comprendere meglio questo articolo.

Panoramica

Una data factory può comprendere una o più pipeline. Una pipeline è un raggruppamento logico di attività che insieme eseguono un compito. Le attività in una pipeline definiscono le azioni da eseguire sui dati. Ad esempio, è possibile usare un'attività di copia per copiare i dati da un Server SQL locale a un'archiviazione BLOB di Azure. Quindi, usare un'attività Hive che esegue uno script Hive in un cluster HDInsight di Azure per elaborare o trasformare i dati dall'archivio BLOB per produrre dati di output. Infine, usare una seconda attività di copia per copiare i dati di output in un Azure SQL Data Warehouse in cui vengono compilate le soluzioni di report di business intelligence, BI.

Un'attività può non avere alcun set di dati di input o può averne più di uno e generare uno o più set di dati di output. Nel diagramma seguente viene illustrata la relazione tra attività, set di dati e pipeline in Data Factory:

Relazione tra pipeline, attività e set di dati

Una pipeline consente di gestire le attività come un set anziché singolarmente. Ad esempio, è possibile distribuire, pianificare, sospendere e riprendere una pipeline, anziché gestire le attività della pipeline singolarmente.

Data Factory supporta due tipi di attività: attività di spostamento dei dati e attività di trasformazione dei dati. Ogni attività può non avere alcun set di dati di input o può averne più di uno e generare uno o più set di dati di output.

Un set di dati di input rappresenta l'input per un'attività nella pipeline, un set di dati di output rappresenta l'output dell'attività. I set di dati identificano i dati all'interno dei diversi archivi dati, come tabelle, file, cartelle e documenti. Dopo aver creato un set di dati, è possibile usarlo con le attività in una pipeline. Ad esempio, un set di dati può essere configurato come set di dati di input o di output di un'attività di copia o un'attività HDInsightHive. Per altre informazioni sui set di dati, vedere l'articolo Set di dati in Azure Data Factory.

Attività di spostamento dei dati

L'attività di copia in Data Factory esegue la copia dei dati da un archivio dati di origine a un archivio dati sink. Data Factory supporta gli archivi dati seguenti. I dati da qualsiasi origine possono essere scritti in qualsiasi sink. Fare clic su un archivio dati per informazioni su come copiare dati da e verso tale archivio.

Categoria Archivio dati Supportato come origine Supportato come sink
Azure Archivio BLOB di Azure
  Azure Cosmos DB (API di DocumentDB)
  Archivio Data Lake di Azure
  Database SQL di Azure
  Azure SQL Data Warehouse
  Indice di Ricerca di Azure
  Archivio tabelle di Azure
Database Amazon Redshift
  DB2*
  MySQL*
  Oracle*
  PostgreSQL*
  SAP Business Warehouse*
  SAP HANA*
  SQL Server*
  Sybase*
  Teradata*
NoSQL Cassandra*
  MongoDB*
File Amazon S3
  File system*
  FTP
  HDFS*
  SFTP
Altro HTTP generico
  OData generico
  ODBC generico*
  Salesforce
  Tabella Web (tabella da HTML)
  GE Historian*
Nota

Gli archivi dati contrassegnati da un asterisco (*) possono essere locali o in IaaS di Azure e richiederanno l'installazione del Gateway di gestione dati in un computer IaaS locale o in Azure.

Per altre informazioni, vedere l'articolo Attività di spostamento dei dati.

Attività di trasformazione dei dati

Azure Data Factory supporta le seguenti attività di trasformazione che possono essere aggiunte alle pipeline singolarmente o con un'altra attività concatenata.

Attività di trasformazione dei dati Ambiente di calcolo
Hive HDInsight [Hadoop]
Pig HDInsight [Hadoop]
MapReduce HDInsight [Hadoop]
Hadoop Streaming HDInsight [Hadoop]
Spark HDInsight [Hadoop]
Attività di Machine Learning: esecuzione batch e aggiornamento risorse Macchina virtuale di Azure
Stored procedure Azure SQL, Azure SQL Data Warehouse o SQL Server
Attività U-SQL di Data Lake Analytics Azure Data Lake Analytics.
DotNet HDInsight [Hadoop] o Batch di Azure
Nota

È possibile usare l'attività MapReduce per eseguire i programmi Spark nel cluster HDInsight Spark. Per informazioni dettagliate, vedere Chiamare i programmi Spark da Data Factory . È possibile creare un'attività personalizzata per eseguire gli script R nel cluster HDInsight con R installato. Vedere RunRScriptUsingADFSample(Esempio relativo all'esecuzione di script R con Azure Data Factory).

Per altre informazioni, vedere l'articolo Attività di trasformazione dei dati.

Attività .NET personalizzate

Per spostare i dati da e verso un archivio dati che non è supportato dall'attività di copia o per trasformare i dati usando la propria logica, creare un'attività .NET personalizzata. Per i dettagli sulla creazione e l'uso di un'attività personalizzata, vedere l'articolo Usare attività personalizzate in una pipeline di Azure Data Factory.

Pianificare le pipeline

Una pipeline è attiva solo tra l'ora di inizio e l'ora di fine. Non viene eseguita prima dell'ora di inizio o dopo l'ora di fine. Se la pipeline viene sospesa, non viene eseguita indipendentemente dall'ora di inizio e di fine. Per essere eseguita, una pipeline non deve essere in pausa. Vedere Pianificazione ed esecuzione con Data factory per comprendere il funzionamento della pianificazione e dell'esecuzione in Data factory di Azure.

Pipeline JSON

Osserviamo più da vicino come viene definita una pipeline nel formato JSON. La struttura generica di una pipeline è simile a quella indicata di seguito:

{
    "name": "PipelineName",
    "properties": 
    {
        "description" : "pipeline description",
        "activities":
        [

        ],
        "start": "<start date-time>",
        "end": "<end date-time>",
        "isPaused": true/false,
        "pipelineMode": "scheduled/onetime",
        "expirationTime": "15.00:00:00",
        "datasets": 
        [
        ]
    }
}
Tag Descrizione Obbligatorio
name Nome della pipeline. Specificare un nome che rappresenti l'azione eseguita dalla pipeline.
  • Numero massimo di caratteri: 260
  • Deve iniziare con una lettera, un numero o un carattere di sottolineatura (_)
  • Non sono ammessi i caratteri seguenti: ".", "+", "?", "/", "<", ">", "*", "%", "&", ":", "\"
Descrizione Specificare il testo descrittivo che illustra lo scopo della pipeline.
attività Nella sezione delle attività possono essere definite una o più attività. Vedere la sezione successiva per informazioni dettagliate sulle attività dell'elemento JSON.
start Data e ora di inizio per la pipeline. Devono essere nel formato ISO, Ad esempio: 2016-10-14T16:32:41Z.

È possibile specificare un'ora locale, ad esempio in base al fuso orario EST. Di seguito viene riportato un esempio: 2016-02-27T06:00:00-05:00, che indica le 06:00 EST.

Le proprietà start ed end insieme specificano il periodo attivo per la pipeline. Le sezioni di output vengono generate solo in questo periodo attivo.
No

Se si specifica un valore per la proprietà di fine, è necessario specificare un valore anche per la proprietà di avvio.

L'ora di inizio e l'ora di fine possono essere entrambe vuote per creare una pipeline. È necessario specificare entrambi i valori per impostare un periodo attivo per l'esecuzione della pipeline. Se non si specificano le ore di inizio e fine durante la creazione di una pipeline, è possibile impostare tali valori in un secondo momento usando il cmdlet Set-AzureRmDataFactoryPipelineActivePeriod.
end Data e ora di fine per la pipeline. Se specificate, devono essere in formato ISO. Ad esempio: 2016-10-14T17:32:41Z

È possibile specificare un'ora locale, ad esempio in base al fuso orario EST. Di seguito è fornito l'esempio 2016-02-27T06:00:00-05:00, che indica le 6 EST.

Per eseguire la pipeline illimitatamente, specificare 9999-09-09 come valore per la proprietà end.

Una pipeline è attiva solo tra l'ora di inizio e l’ora di fine. Non viene eseguita prima dell'ora di inizio o dopo l'ora di fine. Se la pipeline viene sospesa, non viene eseguita indipendentemente dall'ora di inizio e di fine. Per essere eseguita, una pipeline non deve essere in pausa. Vedere Pianificazione ed esecuzione con Data factory per comprendere il funzionamento della pianificazione e dell'esecuzione in Data factory di Azure.
No

Se si specifica un valore per la proprietà di avvio, è necessario specificare un valore anche per la proprietà di fine.

Vedere le note della proprietà start .
isPaused Se impostata su true, la pipeline non viene eseguita. È in stato di sospensione. Valore predefinito = false. È possibile usare questa proprietà per abilitare o disabilitare una pipeline. No
pipelineMode Metodo di pianificazione delle esecuzioni per la pipeline. I valori consentiti sono scheduled (predefinito) e onetime.

"Scheduled" indica che la pipeline viene eseguita a intervalli di tempo specificati in base al periodo di attività, ovvero all'ora di inizio e di fine. "Onetime" indica che la pipeline viene eseguita una sola volta. Al momento, dopo aver creato una pipeline monouso non è possibile modificarla o aggiornarla. Per informazioni dettagliate sull'impostazione onetime, vedere la sezione Pipeline monouso .
No
expirationTime Periodo di tempo dopo la creazione in cui la pipeline monouso è valida e deve rimanere con provisioning eseguito. Se non ha esecuzioni attive, non riuscite o in sospeso, quando viene raggiunta la scadenza, la pipeline viene eliminata automaticamente. Il valore predefinito: "expirationTime": "3.00:00:00" No
set di dati Elenco dei set di dati usati dalle attività definite nella pipeline. Questa proprietà può essere usata per definire i set di dati specifici di questa pipeline e non definiti all'interno della data factory. I set di dati definiti all'interno di questa pipeline possono essere usati solo da questa pipeline e non possono essere condivisi. Per informazioni dettagliate, vedere la sezione Set di dati con ambito . No

Attività JSON

Nella sezione delle attività possono essere definite una o più attività. Ogni attività presenta la seguente struttura di livello superiore:

{
    "name": "ActivityName",
    "description": "description", 
    "type": "<ActivityType>",
    "inputs":  "[]",
    "outputs":  "[]",
    "linkedServiceName": "MyLinkedService",
    "typeProperties":
    {

    },
    "policy":
    {
    },
    "scheduler":
    {
    }
}

La tabella seguente descrive le proprietà all'interno della definizione JSON dell'attività:

Tag Descrizione Obbligatorio
name Nome dell'attività. Specificare un nome che rappresenti l'azione eseguita dall'attività.
  • Numero massimo di caratteri: 260
  • Deve iniziare con una lettera, un numero o un carattere di sottolineatura (_)
  • Non sono ammessi i caratteri seguenti: ".", "+", "?", "/", "<", ">", "*", "%", "&", ":", "\"
Descrizione Testo descrittivo per il tipo o lo scopo dell'attività
type Tipo di attività. Per informazioni sui diversi tipi di attività, vedere le sezioni Attività di spostamento dei dati e Attività di trasformazione dei dati.
inputs Tabelle di input usate dall'attività

// one input table
"inputs": [ { "name": "inputtable1" } ],

// two input tables
"inputs": [ { "name": "inputtable1" }, { "name": "inputtable2" } ],
outputs Tabelle di output usate dall'attività.

// one output table
"outputs": [ { "name": "outputtable1" } ],

//two output tables
"outputs": [ { "name": "outputtable1" }, { "name": "outputtable2" } ],
linkedServiceName Nome del servizio collegato usato dall'attività.

Per un'attività può essere necessario specificare il servizio collegato che collega all'ambiente di calcolo richiesto.
Sì per Attività di HDInsight e Attività di assegnazione punteggio batch di Azure Machine Learning

No per tutto il resto
typeProperties Le proprietà nella sezione typeProperties dipendono dal tipo di attività. Per visualizzare le proprietà del tipo per un'attività, fare clic sui collegamenti all'attività nella sezione precedente. No
policy Criteri che influiscono sul comportamento di runtime dell'attività. Se vengono omessi, vengono usati i criteri predefiniti. No
scheduler La proprietà "scheduler" viene usata per definire la pianificazione per l'attività. Le relative proprietà secondarie sono quelle indicate nella sezione Disponibilità dei set di dati. No

Criteri

I criteri influiscono sul comportamento in fase di esecuzione di un'attività, in particolare quando viene elaborata la sezione di una tabella. La tabella seguente fornisce informazioni dettagliate.

Proprietà Valori consentiti Valore predefinito Descrizione
Concorrenza Integer

Valore massimo: 10
1 Numero di esecuzioni simultanee dell'attività.

Determina il numero di esecuzioni di attività parallele che possono verificarsi in sezioni diverse. Ad esempio, se un'attività deve passare attraverso grandi set di dati disponibili, con un valore di concorrenza maggiore che consente di velocizzare l'elaborazione dei dati.
executionPriorityOrder NewestFirst

OldestFirst
OldestFirst Determina l'ordine delle sezioni di dati che vengono elaborate.

Ad esempio nel caso in cui si abbiano 2 sezioni, una alle 16.00 e l'altra alle 17.00, ed entrambe siano in attesa di esecuzione. Se si imposta executionPriorityOrder su NewestFirst, viene elaborata per prima la sezione delle 17:00. Allo stesso modo, se si imposta executionPriorityORder su OldestFIrst, verrà elaborata per prima la sezione delle 16:00.
retry Integer

Valore massimo: 10
0 Numero di tentativi prima che l'elaborazione dei dati per la sezione sia contrassegnata come errore. L'esecuzione dell’attività per una sezione di dati viene ritentata fino al numero di tentativi specificato. Il tentativo viene eseguito appena possibile dopo l'errore.
timeout TimeSpan 00:00:00 Timeout per l'attività. Esempio: 00:10:00 (implica un timeout di 10 minuti)

Se un valore non è specificato o è 0, il timeout è infinito.

Se il tempo di elaborazione dei dati in una sezione supera il valore di timeout, viene annullato e il sistema prova a ripetere l'elaborazione. Il numero di tentativi dipende dalla proprietà retry. Quando si verifica il timeout, lo stato viene impostato su TimedOut.
delay TimeSpan 00:00:00 Specificare il ritardo prima che abbia inizio l'elaborazione dei dati della sezione.

L'esecuzione dell'attività per una sezione di dati viene avviata non appena il ritardo supera il tempo di esecuzione previsto.

Esempio: 00:10:00 (implica un ritardo di 10 minuti)
longRetry Integer

Valore massimo: 10
1 Numero di tentativi estesi prima che l'esecuzione della sezione dia esito negativo.

I tentativi longRetry sono distanziati da longRetryInterval. Pertanto, se è necessario specificare un tempo tra i tentativi, utilizzare longRetry. Se si specificano sia Retry che longRetry, ogni tentativo longRetry include tentativi Retry e il numero massimo di tentativi corrisponde a Retry * longRetry.

Ad esempio, se si hanno le seguenti impostazioni nel criterio attività:
Retry: 3
longRetry: 2
longRetryInterval: 01:00:00

si presume che la sezione da eseguire sia solo una, con stato Waiting, e che l'esecuzione dell'attività abbia ogni volta esito negativo. All’inizio vi saranno tre tentativi di esecuzione consecutivi. Dopo ogni tentativo, lo stato della sezione sarà Retry. Una volta terminati i 3 tentativi sulla sezione, lo stato sarà LongRetry.

Dopo un'ora (vale a dire il valore di longRetryInteval), verrà eseguita un'altra serie di 3 tentativi di esecuzione consecutivi. Al termine, lo stato della sezione sarà Failed e non verranno eseguiti altri tentativi. Quindi, sono stati eseguiti 6 tentativi.

Se un'esecuzione ha esito positivo, lo stato della sezione sarà Ready e non saranno ripetuti altri tentativi.

longRetry può essere usato nelle situazioni in cui i dati dipendenti arrivano in orari non deterministici o l'ambiente complessivo in cui si verifica l'elaborazione dei dati è debole. In tali casi, l'esecuzione di tentativi consecutivi potrebbe non essere utile, mentre l'applicazione di un intervallo consente di ottenere il risultato voluto.

Attenzione: non impostare valori elevati per longRetry o longRetryInterval. In genere, valori più elevati comportano altri problemi sistemici.
longRetryInterval TimeSpan 00:00:00 Il ritardo tra tentativi longRetry

Esempio di una pipeline di copia

In questa pipeline di esempio è presente un'attività di tipo Copy in the attività . In questo esempio, l'attività di copia consente di copiare i dati da un'archiviazione BLOB di Azure a un database SQL di Azure.

{
  "name": "CopyPipeline",
  "properties": {
    "description": "Copy data from a blob to Azure SQL table",
    "activities": [
      {
        "name": "CopyFromBlobToSQL",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputDataset"
          }
        ],
        "outputs": [
          {
            "name": "OutputDataset"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "SqlSink",
            "writeBatchSize": 10000,
            "writeBatchTimeout": "60:00:00"
          }
        },
        "Policy": {
          "concurrency": 1,
          "executionPriorityOrder": "NewestFirst",
          "retry": 0,
          "timeout": "01:00:00"
        }
      }
    ],
    "start": "2016-07-12T00:00:00Z",
    "end": "2016-07-13T00:00:00Z"
  }
} 

Tenere presente quanto segue:

  • Nella sezione delle attività esiste una sola attività con l'oggetto type impostato su Copy.
  • L'input per l'attività è impostato su InputDataset e l'output è impostato su OutputDataset. Vedere l'articolo Set di dati per la definizione di set di dati in JSON.
  • Nella sezione typeProperties vengono specificati BlobSource come tipo di origine e SqlSink come tipo di sink. Nella sezione Attività di spostamento dati, scegliere l'archivio dati che si desidera usare come origine o un sink per avere altre informazioni sullo spostamento dei dati da e verso tale archivio dati.

Per la procedura completa di creazione di questa pipeline, vedere Esercitazione: copiare i dati dall'archivio BLOB al database SQL.

Esempio di una pipeline di trasformazione

In questa pipeline di esempio è presente un'attività di tipo HDInsightHive in the attività . In questo esempio, l' attività Hive di HDInsight trasforma i dati da un archivio BLOB di Azure tramite l'esecuzione di un file di script Hive in un cluster Hadoop di HDInsight.

{
    "name": "TransformPipeline",
    "properties": {
        "description": "My first Azure Data Factory pipeline",
        "activities": [
            {
                "type": "HDInsightHive",
                "typeProperties": {
                    "scriptPath": "adfgetstarted/script/partitionweblogs.hql",
                    "scriptLinkedService": "AzureStorageLinkedService",
                    "defines": {
                        "inputtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/inputdata",
                        "partitionedtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/partitioneddata"
                    }
                },
                "inputs": [
                    {
                        "name": "AzureBlobInput"
                    }
                ],
                "outputs": [
                    {
                        "name": "AzureBlobOutput"
                    }
                ],
                "policy": {
                    "concurrency": 1,
                    "retry": 3
                },
                "scheduler": {
                    "frequency": "Month",
                    "interval": 1
                },
                "name": "RunSampleHiveActivity",
                "linkedServiceName": "HDInsightOnDemandLinkedService"
            }
        ],
        "start": "2016-04-01T00:00:00Z",
        "end": "2016-04-02T00:00:00Z",
        "isPaused": false
    }
}

Tenere presente quanto segue:

  • Nella sezione attività esiste una sola attività con l'oggetto type impostato su HDInsightHive.
  • Il file di script Hive, partitionweblogs.hql, è archiviato nell'account di archiviazione di Azure (specificato da scriptLinkedService, denominato AzureStorageLinkedService) e nella cartella script nel contenitore adfgetstarted.
  • La sezione defines viene usata per specificare le impostazioni di runtime che vengono passate allo script Hive come valori di configurazione Hive, ad esempio ${hiveconf:inputtable} e ${hiveconf:partitionedtable}.

La sezione typeProperties è diversa per ogni attività di trasformazione. Per altre informazioni sulle proprietà del tipo supportate per un'attività di trasformazione, fare clic sull'attività di trasformazione nella tabella Data transformation activities (Attività di trasformazione dati).

Per la procedura completa della creazione di questa pipeline, vedere Esercitazione: creare la prima pipeline per elaborare i dati usando il cluster Hadoop.

Attività multiple in una pipeline

Le due pipeline di due esempio precedenti contengono una sola attività. È possibile avere più di un'attività in una pipeline.

Se si hanno più attività in una pipeline e l'output di un'attività non è l'input di un'altra attività, le attività possono essere eseguite in parallelo se le sezioni di dati di input per le attività sono pronte.

È possibile concatenare due attività usando il set di dati di output di un'attività come set di dati di input di altre attività. La seconda attività viene eseguita solo quando la prima viene completata correttamente.

Concatenamento di attività nella stessa pipeline

In questo esempio, la pipeline ha due attività: Activity1 e Activity2. Activity1 accetta Dataset1 come input e produce un output Dataset2. Activity2 accetta Dataset2 come input e produce un output Dataset3. Poiché l'output di Activity1 (Dataset2) è l'input di Activity2, Activity2 viene eseguita solo in seguito al corretto completamento dell'attività precedente e genera la sezione Dataset2. Se il completamento di Activity1 per qualche motivo non riesce e non produce la sezione Dataset2, Activity2 non viene eseguita per tale sezione, ad esempio dalle 09:00 alle 10:00.

È anche possibile concatenare le attività che si trovano in pipeline diverse.

Concatenamento di attività in due pipeline

In questo esempio, Pipeline1 ha una sola attività che accetta come input Dataset1 e genera come output Dataset2. Pipeline2 ha una sola attività che accetta come input Dataset2 e genera come output Dataset3.

Per altre informazioni, vedere Pianificazione ed esecuzione.

Creare e monitorare le pipeline

È possibile creare pipeline usando uno di questi strumenti o SDK.

  • Copia guidata.
  • Portale di Azure
  • Visual Studio
  • Azure PowerShell
  • Modello di Azure Resource Manager
  • API REST
  • API .NET

Vedere le esercitazioni seguenti per istruzioni dettagliate sulla creazione di pipeline tramite uno di questi strumenti o SDK.

Dopo aver creato o distribuito una pipeline, è possibile gestire e monitorare le pipeline usando i pannelli del portale di Azure o con un'app di gestione e monitoraggio. Vedere l'argomento successivo per le istruzioni dettagliate.

Pipeline monouso

È possibile creare e pianificare una pipeline da eseguire periodicamente, ad esempio ogni ora o ogni giorno, tra le ore di inizio e di fine specificate nella definizione della pipeline. Per informazioni dettagliate, vedere la sezione Pianificazione delle attività . È anche possibile creare una pipeline che viene eseguita una sola volta. A tale scopo, impostare la proprietà pipelineMode nella definizione della pipeline su onetime, come illustrato nell'esempio JSON seguente. Il valore predefinito per questa proprietà è scheduled.

{
    "name": "CopyPipeline",
    "properties": {
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "recursive": false
                    },
                    "sink": {
                        "type": "BlobSink",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "InputDataset"
                    }
                ],
                "outputs": [
                    {
                        "name": "OutputDataset"
                    }
                ]
                "name": "CopyActivity-0"
            }
        ]
        "pipelineMode": "OneTime"
    }
}

Tenere presente quanto segue:

  • Le ore di inizio e fine della pipeline non vengono specificate.
  • La disponibilità dei set di dati di input e output viene specificata (frequenza e intervallo) anche se i valori non vengono usati da Data Factory.
  • Le pipeline monouso non vengono visualizzate nella vista Diagramma. Questo comportamento dipende dalla progettazione.
  • Le pipeline monouso non possono essere aggiornate. È possibile clonare una pipeline monouso, rinominarla, aggiornarne le proprietà e distribuirla per crearne un'altra.

Passaggi successivi