クイック スタート:Python を使用してデータ ファクトリとパイプラインを作成するQuickstart: Create a data factory and pipeline using Python

Azure Data Factory は、データドリブン型のワークフローをクラウドに作成することでデータの移動と変換を制御し、自動化することができるクラウドベースのデータ統合サービスです。Azure Data Factory is a cloud-based data integration service that allows you to create data-driven workflows in the cloud for orchestrating and automating data movement and data transformation. Azure Data Factory を使えば、データ主導型のワークフロー (パイプライン) を作成し、スケジューリングできます。具体的には、各種データ ストアからデータを取り込む、そのデータを各種コンピューティング サービス (Azure HDInsight Hadoop、Spark、Azure Data Lake Analytics、Azure Machine Learning など) で処理/変換する、データ ストア (Azure SQL Data Warehouse など) に出力データを公開して、それを利用するビジネス インテリジェンス (BI) アプリケーションに提供するという一連の処理を行えるワークフローです。Using Azure Data Factory, you can create and schedule data-driven workflows (called pipelines) that can ingest data from disparate data stores, process/transform the data by using compute services such as Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics, and Azure Machine Learning, and publish output data to data stores such as Azure SQL Data Warehouse for business intelligence (BI) applications to consume.

このクイックスタートでは、Python を使用して Azure データ ファクトリを作成する方法について説明します。This quickstart describes how to use Python to create an Azure data factory. このデータ ファクトリのパイプラインは、データを Azure BLOB ストレージ内の 1 つのフォルダーから別のフォルダーにコピーします。The pipeline in this data factory copies data from one folder to another folder in an Azure blob storage.

Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。If you don't have an Azure subscription, create a free account before you begin.

前提条件Prerequisites

  • Azure Storage アカウントAzure Storage account. BLOB ストレージを、ソースシンクのデータ ストアとして使用します。You use the blob storage as source and sink data store. Azure ストレージ アカウントがない場合、ストレージ アカウントの作成手順については、「ストレージ アカウントの作成」を参照してください。If you don't have an Azure storage account, see the Create a storage account article for steps to create one.
  • この手順に従って、Azure Active Directory にアプリケーションを作成しますCreate an application in Azure Active Directory following this instruction. アプリケーション ID認証キーテナント ID の値をメモしておいてください。後の手順で使用します。Make note of the following values that you use in later steps: application ID, authentication key, and tenant ID. 同じ記事の手順に従って、アプリケーションを "共同作成者" ロールに割り当てます。Assign application to "Contributor" role by following instructions in the same article.

入力ファイルを作成およびアップロードするCreate and upload an input file

  1. メモ帳を起動します。Launch Notepad. 次のテキストをコピーし、input.txt ファイルとしてディスクに保存します。Copy the following text and save it as input.txt file on your disk.

    John|Doe
    Jane|Doe
    
  2. Azure Storage Explorer などのツールを使用して adfv2tutorial コンテナーを作成し、このコンテナーに input フォルダーを作成します。Use tools such as Azure Storage Explorer to create the adfv2tutorial container, and input folder in the container. 次に、input フォルダーに input.txt ファイルをアップロードします。Then, upload the input.txt file to the input folder.

Python パッケージをインストールするInstall the Python package

  1. 管理者特権でターミナルまたはコマンド プロンプトを開きます。Open a terminal or command prompt with administrator privileges. 

  2. まず、Azure 管理リソースの Python パッケージをインストールします。First, install the Python package for Azure management resources:

    pip install azure-mgmt-resource
    
  3. Data Factory 用の Python パッケージをインストールするには、次のコマンドを実行します。To install the Python package for Data Factory, run the following command:

    pip install azure-mgmt-datafactory
    

    Data Factory 用の Python SDK では、Python 2.7、3.3、3.4、3.5、3.6、および 3.7 がサポートされています。The Python SDK for Data Factory supports Python 2.7, 3.3, 3.4, 3.5, 3.6 and 3.7.

データ ファクトリ クライアントを作成するCreate a data factory client

  1. datafactory.py という名前のファイルを作成します。Create a file named datafactory.py. 次のステートメントを追加して、名前空間への参照を追加します。Add the following statements to add references to namespaces.

    from azure.common.credentials import ServicePrincipalCredentials
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. 情報を出力する以下の関数を追加します。Add the following functions that print information.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. DataFactoryManagementClient クラスのインスタンスを作成する次のコードを Main メソッドに追加します。Add the following code to the Main method that creates an instance of DataFactoryManagementClient class. このオブジェクトを使用して、データ ファクトリ、リンクされたサービス、データセット、パイプラインを作成します。You use this object to create the data factory, linked service, datasets, and pipeline. また、このオブジェクトを使用して、パイプラインの実行の詳細を監視します。You also use this object to monitor the pipeline run details. subscription_id 変数を、ご使用の Azure サブスクリプションの ID に設定します。Set subscription_id variable to the ID of your Azure subscription. 現在 Data Factory が利用できる Azure リージョンの一覧については、次のページで目的のリージョンを選択し、 [分析] を展開して [Data Factory] を探してください。(「リージョン別の利用可能な製品」)。For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. データ ファクトリで使用するデータ ストア (Azure Storage、Azure SQL Database など) やコンピューティング (HDInsight など) は他のリージョンに配置できます。The data stores (Azure Storage, Azure SQL Database, etc.) and computes (HDInsight, etc.) used by data factory can be in other regions.

    def main():
    
        # Azure subscription ID
        subscription_id = '<Specify your Azure Subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = 'ADFTutorialResourceGroup'
    
        # The data factory name. It must be globally unique.
        df_name = '<Specify a name for the data factory. It must be globally unique>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ServicePrincipalCredentials(client_id='<Active Directory application/client ID>', secret='<client secret>', tenant='<Active Directory tenant ID>')
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'eastus'}
        df_params = {'location':'eastus'}
    

Data Factory の作成Create a data factory

データ ファクトリを作成する次のコードを Main メソッドに追加します。Add the following code to the Main method that creates a data factory. リソース グループが既に存在する場合は、最初の create_or_update ステートメントをコメント アウトします。If your resource group already exists, comment out the first create_or_update statement.

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='eastus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

リンクされたサービスを作成するCreate a linked service

Azure Storage のリンクされたサービスを作成する次のコードを Main メソッドに追加します。Add the following code to the Main method that creates an Azure Storage linked service.

データ ストアおよびコンピューティング サービスをデータ ファクトリにリンクするには、リンクされたサービスをデータ ファクトリに作成します。You create linked services in a data factory to link your data stores and compute services to the data factory. このクイックスタートでは、コピー ソースとシンク ストアの両方として、Azure Storage のリンクされたサービスを 1 つ作成するだけで済みます。このサービスは、サンプルでは "AzureStorageLinkedService" という名前です。In this quickstart, you only need create one Azure Storage linked service as both copy source and sink store, named "AzureStorageLinkedService" in the sample. <storageaccountname><storageaccountkey> を、Azure ストレージ アカウントの名前とキーで置き換えます。Replace <storageaccountname> and <storageaccountkey> with name and key of your Azure Storage account.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString('DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;AccountKey=<storageaccountkey>')

    ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

データセットを作成するCreate datasets

このセクションでは、ソース用とシンク用の 2 つのデータセットを作成します。In this section, you create two datasets: one for the source and the other for the sink.

ソース Azure BLOB のためのデータセットを作成するCreate a dataset for source Azure Blob

Azure BLOB データセットを作成する次のコードを Main メソッドに追加します。Add the following code to the Main method that creates an Azure blob dataset. Azure BLOB データセットのプロパティの詳細については、Azure BLOB コネクタに関する記事を参照してください。For information about properties of Azure Blob dataset, see Azure blob connector article.

Azure BLOB 内のソース データを表すデータセットを定義します。You define a dataset that represents the source data in Azure Blob. この BLOB データセットは、前の手順で作成した Azure Storage のリンクされたサービスを参照します。This Blob dataset refers to the Azure Storage linked service you create in the previous step.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path= 'adfv2tutorial/input'
    blob_filename = 'input.txt'
    ds_azure_blob= AzureBlobDataset(linked_service_name=ds_ls, folder_path=blob_path, file_name = blob_filename)
    ds = adf_client.datasets.create_or_update(rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

シンク Azure BLOB のためのデータセットを作成するCreate a dataset for sink Azure Blob

Azure BLOB データセットを作成する次のコードを Main メソッドに追加します。Add the following code to the Main method that creates an Azure blob dataset. Azure BLOB データセットのプロパティの詳細については、Azure BLOB コネクタに関する記事を参照してください。For information about properties of Azure Blob dataset, see Azure blob connector article.

Azure BLOB 内のソース データを表すデータセットを定義します。You define a dataset that represents the source data in Azure Blob. この BLOB データセットは、前の手順で作成した Azure Storage のリンクされたサービスを参照します。This Blob dataset refers to the Azure Storage linked service you create in the previous step.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adfv2tutorial/output'
    dsOut_azure_blob = AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

パイプラインを作成する。Create a pipeline

コピー アクティビティが含まれているパイプラインを作成する次のコードを Main メソッドに追加します。Add the following code to the Main method that creates a pipeline with a copy activity.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

パイプラインの実行を作成するCreate a pipeline run

パイプラインの実行をトリガーする次のコードを Main メソッドに追加します。Add the following code to the Main method that triggers a pipeline run.

    #Create a pipeline run.
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name,
        {
        }
    )

パイプラインの実行を監視するMonitor a pipeline run

パイプラインの実行を監視するには、次のコードを Main メソッドに追加します。To monitor the pipeline run, add the following code the Main method:

    #Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    activity_runs_paged = list(adf_client.activity_runs.list_by_pipeline_run(rg_name, df_name, pipeline_run.run_id, datetime.now() - timedelta(1),  datetime.now() + timedelta(1)))
    print_activity_run_details(activity_runs_paged[0])

プログラムの実行時に main メソッドを呼び出す次のステートメントを追加します。Now, add the following statement to invoke the main method when the program is run:

# Start the main method
main()

完全なスクリプトFull script

完全な Python コードを次に示します。Here is the full Python code:

from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time


def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)
    print("\n")


def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n")


def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(
            activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(
            activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(
            activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<your Azure subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<Azure resource group name>'

    # The data factory name. It must be globally unique.
    df_name = '<Your data factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ServicePrincipalCredentials(
        client_id='<Active Directory client ID>', secret='<client secret>', tenant='<tenant ID>')
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location': 'eastus'}
    df_params = {'location': 'eastus'}

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='eastus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'

    # Specify the name and key of your Azure Storage account
    storage_string = SecureString(
        'DefaultEndpointsProtocol=https;AccountName=<storage account name>;AccountKey=<storage account key>')

    ls_azure_storage = AzureStorageLinkedService(
        connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(
        rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(ls_name)
    blob_path = 'adfv2tutorial/input'
    blob_filename = 'input.txt'
    ds_azure_blob = AzureBlobDataset(
        ds_ls, folder_path=blob_path, file_name=blob_filename)
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adfv2tutorial/output'
    dsOut_azure_blob = AzureBlobDataset(ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(ds_name)
    dsOut_ref = DatasetReference(dsOut_name)
    copy_activity = CopyActivity(act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name,
                                                   {
                                                   }
                                                   )

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    activity_runs_paged = list(adf_client.activity_runs.list_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, datetime.now() - timedelta(1), datetime.now() + timedelta(1)))
    print_activity_run_details(activity_runs_paged[0])


# Start the main method
main()

コードの実行Run the code

アプリケーションをビルドして起動し、パイプラインの実行を確認します。Build and start the application, then verify the pipeline execution.

コンソールは、データ ファクトリ、リンクされたサービス、データセット、パイプライン、およびパイプラインの実行の作成の進捗状況を表示します。The console prints the progress of creating data factory, linked service, datasets, pipeline, and pipeline run. コピー アクティビティの実行の詳細と、データの読み取り/書き込みのサイズが表示されるまで待ちます。Wait until you see the copy activity run details with data read/written size. 次に、Azure Storage Explorer などのツールを使用して、変数で指定したように BLOB が "inputBlobPath" から "outputBlobPath" にコピーされていることを確認します。Then, use tools such as Azure Storage explorer to check the blob(s) is copied to "outputBlobPath" from "inputBlobPath" as you specified in variables.

出力例を次に示します。Here is the sample output:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

リソースのクリーンアップClean up resources

データ ファクトリを削除するには、プログラムに次のコードを追加します。To delete the data factory, add the following code to the program:

adf_client.factories.delete(rg_name, df_name)

次の手順Next steps

このサンプルのパイプラインは、Azure BLOB ストレージ内のある場所から別の場所にデータをコピーするものです。The pipeline in this sample copies data from one location to another location in an Azure blob storage. より多くのシナリオで Data Factory を使用する方法については、チュートリアルを参照してください。Go through the tutorials to learn about using Data Factory in more scenarios.