Quickstart: Create a data factory and pipeline using Python

Azure Data Factory is a cloud-based data integration service that allows you to create data-driven workflows in the cloud for orchestrating and automating data movement and data transformation. Using Azure Data Factory, you can create and schedule data-driven workflows (called pipelines) that can ingest data from disparate data stores, process/transform the data by using compute services such as Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics, and Azure Machine Learning, and publish output data to data stores such as Azure SQL Data Warehouse for business intelligence (BI) applications to consume.

This quickstart describes how to use Python to create an Azure data factory. The pipeline in this data factory copies data from one folder to another folder in Azure Blob storage.

If you don't have an Azure subscription, create a free account before you begin.

Prerequisites

  • Azure Storage account. You use the blob storage as the source and sink data store. If you don't have an Azure storage account, see the Create a storage account article for steps to create one.
  • Create an application in Azure Active Directory by following these instructions. Make note of the following values, which you use in later steps: application ID, authentication key, and tenant ID. Assign the application to the "Contributor" role by following the instructions in the same article. A command-line alternative is sketched after this list.
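
If you prefer the command line, you can also create the service principal and grant it the Contributor role with the Azure CLI. This is only a sketch, assuming the Azure CLI is installed and you are signed in; the application name below is an example, and the exact flags (for example, whether --scopes is required) vary between CLI versions. The output includes the application ID (appId), authentication key (password), and tenant ID (tenant).

    az ad sp create-for-rbac --name "ADFQuickstartApp" --role Contributor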

Create and upload an input file

  1. Launch Notepad. Copy the following text and save it as input.txt on your disk.

    John|Doe
    Jane|Doe
    
  2. Use tools such as Azure Storage Explorer to create the adfv2tutorial container and an input folder in the container. Then, upload the input.txt file to the input folder. A programmatic alternative is sketched after this list.
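
If you prefer to create the container and upload the file from Python, the following is a minimal sketch that assumes the azure-storage-blob package (v12 or later) is installed with pip install azure-storage-blob; replace the placeholder connection string with your storage account's values. Azure Storage Explorer works just as well.

    # Minimal sketch (assumes azure-storage-blob v12+): create the container
    # and upload input.txt into its "input" folder.
    from azure.storage.blob import BlobServiceClient

    conn_str = 'DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;AccountKey=<storageaccountkey>'
    blob_service = BlobServiceClient.from_connection_string(conn_str)

    # Create the container used throughout this quickstart (fails if it already exists)
    container = blob_service.create_container('adfv2tutorial')

    # Upload input.txt from the current directory into the "input" folder
    with open('input.txt', 'rb') as data:
        container.upload_blob(name='input/input.txt', data=data)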

Install the Python package

  1. Open a terminal or command prompt with administrator privileges.

  2. First, install the Python package for Azure management resources:

    pip install azure-mgmt-resource
    
  3. To install the Python package for Data Factory, run the following command:

    pip install azure-mgmt-datafactory
    

    The Python SDK for Data Factory supports Python 2.7, 3.3, 3.4, 3.5, 3.6, and 3.7.

Create a data factory client

  1. Create a file named datafactory.py. Add the following statements to add references to namespaces.

    from azure.common.credentials import ServicePrincipalCredentials
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. Add the following functions that print information.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. Add the following code to the Main method that creates an instance of the DataFactoryManagementClient class. You use this object to create the data factory, linked service, datasets, and pipeline. You also use this object to monitor the pipeline run details. Set the subscription_id variable to the ID of your Azure subscription. For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. The data stores (Azure Storage, Azure SQL Database, and so on) and computes (HDInsight, and so on) used by the data factory can be in other regions.

    def main():
    
        # Azure subscription ID
        subscription_id = '<Specify your Azure Subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = 'ADFTutorialResourceGroup'
    
        # The data factory name. It must be globally unique.
        df_name = '<Specify a name for the data factory. It must be globally unique>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ServicePrincipalCredentials(client_id='<Active Directory application/client ID>', secret='<client secret>', tenant='<Active Directory tenant ID>')
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'eastus'}
        df_params = {'location':'eastus'}
    

Create a data factory

Add the following code to the Main method that creates a data factory. If your resource group already exists, comment out the first create_or_update statement.

    # create the resource group
    # comment out if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='eastus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

Create a linked service

Add the following code to the Main method that creates an Azure Storage linked service.

You create linked services in a data factory to link your data stores and compute services to the data factory. In this quickstart, you only need to create one Azure Storage linked service as both the copy source and sink store, named "AzureStorageLinkedService" in the sample. Replace <storageaccountname> and <storageaccountkey> with the name and key of your Azure Storage account.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;AccountKey=<storageaccountkey>')

    ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

Create datasets

In this section, you create two datasets: one for the source and the other for the sink.

Create a dataset for source Azure Blob

Add the following code to the Main method that creates an Azure blob dataset. For information about the properties of the Azure Blob dataset, see the Azure Blob connector article.

You define a dataset that represents the source data in Azure Blob. This Blob dataset refers to the Azure Storage linked service you created in the previous step.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path= 'adfv2tutorial/input'
    blob_filename = 'input.txt'
    ds_azure_blob= AzureBlobDataset(linked_service_name=ds_ls, folder_path=blob_path, file_name = blob_filename)
    ds = adf_client.datasets.create_or_update(rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

Create a dataset for sink Azure Blob

Add the following code to the Main method that creates an Azure blob dataset. For information about the properties of the Azure Blob dataset, see the Azure Blob connector article.

You define a dataset that represents the data to be copied to the sink Azure Blob. This Blob dataset refers to the Azure Storage linked service you created in the previous step.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adfv2tutorial/output'
    dsOut_azure_blob = AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

Create a pipeline

Add the following code to the Main method that creates a pipeline with a copy activity.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

Create a pipeline run

Add the following code to the Main method that triggers a pipeline run.

    #Create a pipeline run.
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

Monitor a pipeline run

To monitor the pipeline run, add the following code to the Main method:

    #Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

Now, add the following statement to invoke the main method when the program is run:

# Start the main method
main()

Full script

Here is the full Python code:

from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time


def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)
    print("\n")


def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n")


def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(
            activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(
            activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(
            activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<your Azure subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<Azure resource group name>'

    # The data factory name. It must be globally unique.
    df_name = '<Your data factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ServicePrincipalCredentials(
        client_id='<Active Directory client ID>', secret='<client secret>', tenant='<tenant ID>')
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location': 'eastus'}
    df_params = {'location': 'eastus'}

    # create the resource group
    # comment out if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='eastus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'

    # Specify the name and key of your Azure Storage account
    storage_string = SecureString(
        value='DefaultEndpointsProtocol=https;AccountName=<storage account name>;AccountKey=<storage account key>')

    ls_azure_storage = AzureStorageLinkedService(
        connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(
        rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path = 'adfv2tutorial/input'
    blob_filename = 'input.txt'
    ds_azure_blob = AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adfv2tutorial/output'
    dsOut_azure_blob = AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[dsOut_ref],
                                 source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

Run the code

Build and start the application, then verify the pipeline execution.
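
For example, run the script from the terminal you used to install the packages (use python3 instead of python if that is how your interpreter is invoked):

    python datafactory.py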

The console prints the progress of creating the data factory, linked service, datasets, pipeline, and pipeline run. Wait until you see the copy activity run details with the data read/written size. Then, use tools such as Azure Storage Explorer to check that the blob(s) were copied from the input path to the output path as you specified in the variables.
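
Alternatively, here is a minimal sketch that lists the blobs in the output folder programmatically, assuming the azure-storage-blob package (v12 or later) is installed; the connection string placeholder must be replaced with your own account details:

    # Minimal sketch (assumes azure-storage-blob v12+): list blobs written to the output folder
    from azure.storage.blob import ContainerClient

    conn_str = 'DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;AccountKey=<storageaccountkey>'
    container = ContainerClient.from_connection_string(conn_str, container_name='adfv2tutorial')

    for blob in container.list_blobs(name_starts_with='output/'):
        print(blob.name, blob.size)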

Here is the sample output:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

Clean up resources

To delete the data factory, add the following code to the program:

adf_client.factories.delete(rg_name, df_name)

Next steps

The pipeline in this sample copies data from one location to another location in Azure Blob storage. Go through the tutorials to learn about using Data Factory in more scenarios.