Quickstart: Create a data factory and pipeline using Python

APPLIES TO: Azure Data Factory, Azure Synapse Analytics

In this quickstart, you create a data factory by using Python. The pipeline in this data factory copies data from one folder to another folder in Azure Blob storage.

Azure Data Factory is a cloud-based data integration service that allows you to create data-driven workflows for orchestrating and automating data movement and data transformation. Using Azure Data Factory, you can create and schedule data-driven workflows, called pipelines.

Pipelines can ingest data from disparate data stores. Pipelines process or transform data by using compute services such as Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics, and Azure Machine Learning. Pipelines publish output data to data stores such as Azure Synapse Analytics for business intelligence (BI) applications.

Prerequisites

At minimum, you need an Azure subscription, an Azure Storage account (used as both the copy source and sink), Python, and a service principal (client ID, client secret, and tenant ID) with permission to create resources in the subscription.

Create and upload an input file

  1. Launch Notepad. Copy the following text, and save it as an input.txt file on your disk.

    John|Doe
    Jane|Doe
    
  2. Use tools such as Azure Storage Explorer to create the adfv2tutorial container and an input folder in the container. Then, upload the input.txt file to the input folder. (If you'd rather script this step, see the sketch after this list.)
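
As a scripted alternative to Azure Storage Explorer, here's a minimal Python sketch for this step. It assumes the azure-storage-blob package, which this quickstart doesn't otherwise install (run pip install azure-storage-blob first):

    from azure.storage.blob import BlobServiceClient

    # Assumption: substitute your Azure Storage account connection string.
    blob_service = BlobServiceClient.from_connection_string('<connection string>')
    container = blob_service.create_container('adfv2tutorial')

    # Blob name prefixes act as folders, so this creates input/input.txt.
    with open('input.txt', 'rb') as data:
        container.upload_blob(name='input/input.txt', data=data)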

Install the Python package

  1. Open a terminal or command prompt with administrator privileges.

  2. First, install the Python package for Azure management resources:

    pip install azure-mgmt-resource
    
  3. To install the Python package for Data Factory, run the following command:

    pip install azure-mgmt-datafactory
    

    The Python SDK for Data Factory supports Python 2.7, 3.3, 3.4, 3.5, 3.6, and 3.7.

  4. To install the Python package for Azure Identity authentication, run the following command:

    pip install azure-identity
    

    Note

    The "azure-identity" package might conflict with "azure-cli" on some common dependencies. If you encounter any authentication issues, remove "azure-cli" and its dependencies, or use a clean machine that doesn't have the "azure-cli" package installed.
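
    If you hit such a conflict, a Python virtual environment is a lighter alternative to a clean machine (a suggested workaround, not part of the original instructions):

        python -m venv adf-quickstart
        # On Windows, activate with: adf-quickstart\Scripts\activate
        source adf-quickstart/bin/activate
        pip install azure-mgmt-resource azure-mgmt-datafactory azure-identity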

Create a data factory client

  1. Create a file named datafactory.py. Add the following statements to add references to namespaces.

    from azure.identity import ClientSecretCredential 
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. Add the following functions that print information.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. Add the following code to the Main method that creates an instance of the DataFactoryManagementClient class. You use this object to create the data factory, linked service, datasets, and pipeline, and to monitor the pipeline run details. Set the subscription_id variable to the ID of your Azure subscription. For a list of Azure regions in which Data Factory is currently available, see Products available by region: select the regions that interest you, and then expand Analytics to locate Data Factory. The data stores (Azure Storage, Azure SQL Database, and so on) and computes (HDInsight and so on) used by the data factory can be in other regions.

    def main():
    
        # Azure subscription ID
        subscription_id = '<subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = '<resource group>'
    
        # The data factory name. It must be globally unique.
        df_name = '<factory name>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'westus'}
        df_params = {'location':'westus'}
    

Create a data factory

Add the following code to the Main method that creates a data factory. If your resource group already exists, comment out the first create_or_update statement.

    # create the resource group
    # comment out if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

Create a linked service

Add the following code to the Main method that creates an Azure Storage linked service.

You create linked services in a data factory to link your data stores and compute services to the data factory. In this quickstart, you only need to create one Azure Storage linked service to serve as both the copy source and the sink store; it's named storageLinkedService001 in the sample. In the connection string, replace <account name> and <account key> with the name and key of your Azure Storage account.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

Create datasets

In this section, you create two datasets: one for the source and the other for the sink.

Create a dataset for the source Azure Blob

Add the following code to the Main method that creates an Azure blob dataset. For information about the properties of an Azure Blob dataset, see the Azure Blob connector article.

You define a dataset that represents the source data in Azure Blob storage. This Blob dataset refers to the Azure Storage linked service that you created in the previous step.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

Create a dataset for the sink Azure Blob

Add the following code to the Main method that creates an Azure blob dataset. For information about the properties of an Azure Blob dataset, see the Azure Blob connector article.

You define a dataset that represents the data that the pipeline copies to the sink Azure Blob storage. This Blob dataset refers to the Azure Storage linked service that you created in the previous step.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

Create a pipeline

Add the following code to the Main method that creates a pipeline with a copy activity.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

Create a pipeline run

Add the following code to the Main method that triggers a pipeline run.

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
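
The empty parameters dict is where run-time pipeline parameters would go. As a hedged sketch, the outputPath name below is hypothetical and works only if the pipeline declares that parameter:

    # Hypothetical: requires declaring the parameter on the pipeline first, e.g.
    # params_for_pipeline = {'outputPath': ParameterSpecification(type='String')}
    run_response = adf_client.pipelines.create_run(
        rg_name, df_name, p_name, parameters={'outputPath': 'output/folder'})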

Monitor a pipeline run

To monitor the pipeline run, add the following code to the Main method:

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])
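
The fixed time.sleep(30) is a simplification and can report a run that's still in progress. Here's a sketch that polls until the run reaches a terminal state instead (the 10-second interval and the set of status names are assumptions):

    # Poll instead of sleeping a fixed 30 seconds.
    terminal_states = {'Succeeded', 'Failed', 'Cancelled'}
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    while pipeline_run.status not in terminal_states:
        time.sleep(10)
        pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)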

Now, add the following statement to invoke the main method when the program is run:

# Start the main method
main()

Full script

Here is the full Python code:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<resource group>'

    # The data factory name. It must be globally unique.
    df_name = '<factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}
 
    # create the resource group
    # comment out if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

Run the code

Build and start the application, and then verify the pipeline execution.

The console prints the progress of creating the data factory, linked service, datasets, pipeline, and pipeline run. Wait until you see the copy activity run details, including the size of the data read and written. Then, use tools such as Azure Storage Explorer to verify that the blobs were copied from blob_path to output_blobpath, as you specified in the variables.
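
To verify from Python instead of Storage Explorer, here's a minimal sketch that lists the blobs under the output path (assumes the azure-storage-blob package and the same placeholders as the script):

    from azure.storage.blob import ContainerClient

    # Assumption: reuse the connection string, container, and folder path from the script.
    container = ContainerClient.from_connection_string(
        '<connection string>', container_name='<container>')
    for blob in container.list_blobs(name_starts_with='<folder path>'):
        print(blob.name, blob.size)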

Here is the sample output:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: westus
Tags: {}

Name: storageLinkedService001
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService001

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

Clean up resources

To delete the data factory, add the following code to the program:

adf_client.factories.delete(rg_name, df_name)
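
If you created the resource group solely for this quickstart, you can delete the whole group instead. A sketch, assuming the long-running begin_delete form found in recent azure-mgmt-resource versions:

# Deletes the resource group and everything in it; irreversible, so use with care.
resource_client.resource_groups.begin_delete(rg_name).wait()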

Next steps

The pipeline in this sample copies data from one location to another location in Azure Blob storage. Go through the tutorials to learn about using Data Factory in more scenarios.