Mulai Cepat: Buat pabrik data dan alur menggunakan Python

BERLAKU UNTUK: Azure Data Factory Azure Synapse Analytics

Dalam mulai cepat ini, Anda membuat pabrik data dengan menggunakan Python. Alur dalam pabrik data ini menyalin data dari satu folder ke folder lain dalam penyimpanan Azure Blob.

Azure Data Factory adalah layanan integrasi data berbasis cloud yang memungkinkan Anda membuat alur kerja berbasis data untuk mengatur dan mengotomatiskan pemindahan data dan transformasi data. Dengan menggunakan Azure Data Factory, Anda dapat membuat dan menjadwalkan alur kerja berbasis data, yang disebut alur.

Alur dapat menyerap data dari penyimpanan data yang berbeda. Alur memproses atau mengubah data dengan menggunakan layanan komputasi seperti Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics, dan Azure Machine Learning. Alur menerbitkan data output ke penyimpanan data seperti aplikasi Azure Synapse Analytics untuk kecerdasan bisnis (BI).

Prasyarat

Membuat dan mengunggah file input

  1. Luncurkan Notepad. Salin teks berikut dan simpan sebagai file input.txt di disk Anda.

    John|Doe
    Jane|Doe
    
  2. Gunakan alat seperti Azure Storage Explorer untuk membuat kontainer adfv2tutorial, dan folder input dalam kontainer. Lalu, unggah file input.txt ke folder input.

Menginstal paket Python

  1. Buka terminal atau perintah dengan hak istimewa admin.

  2. Pertama, instal paket Python untuk sumber daya pengelolaan Azure:

    pip install azure-mgmt-resource
    
  3. Untuk menginstal paket Python untuk Data Factory, jalankan perintah berikut:

    pip install azure-mgmt-datafactory
    

    Python SDK untuk Data Factory mendukung Python 2.7 dan 3.6+.

  4. Untuk menginstal paket Python untuk autentikasi Azure Identity, jalankan perintah berikut:

    pip install azure-identity
    

    Catatan

    Paket "azure-identity" mungkin memiliki konflik dengan "azure-cli" pada beberapa dependensi umum. Jika Anda memenuhi masalah autentikasi apa pun, hapus "azure-cli" dan dependensinya, atau gunakan mesin bersih tanpa menginstal paket "azure-cli" untuk membuatnya berfungsi. Untuk Sovereign cloud, Anda harus menggunakan konstanta khusus cloud yang sesuai. Harap lihat Menghubungkan ke semua wilayah menggunakan pustaka Azure untuk Python Multi-cloud | Microsoft Docs untuk mendapatkan petunjuk guna terhubung dengan Python di Sovereign cloud.

Membuat klien pabrik data

  1. Buat file bernama datafactory.py. Tambahkan pernyataan berikut untuk menambahkan referensi ke namespace.

    from azure.identity import ClientSecretCredential 
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. Tambahkan fungsi berikut yang mencetak informasi.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. Tambahkan kode berikut ke metode Utama yang membuat instans kelas DataFactoryManagementClient. Anda menggunakan objek ini untuk membuat pabrik data, layanan tertaut, himpunan data, dan alur. Anda juga menggunakan obyek ini untuk memantau detail eksekusi alur. Atur variabel subscription_id ke ID langganan Azure Anda. Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda pada halaman berikut, lalu perluas Analitik untuk menemukan Data Factory: Produk yang tersedia menurut wilayah. Penyimpanan data (Azure Storage, Azure SQL Database, dll.) dan komputasi (HDInsight, dll.) yang digunakan oleh pabrik data dapat berada di wilayah lain.

    def main():
    
        # Azure subscription ID
        subscription_id = '<subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = '<resource group>'
    
        # The data factory name. It must be globally unique.
        df_name = '<factory name>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') 
    
        # Specify following for Soverign Clouds, import right cloud constant and then use it to connect.
        # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD
        # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id)
    
        resource_client = ResourceManagementClient(credentials, subscription_id)
        adf_client = DataFactoryManagementClient(credentials, subscription_id)
    
        rg_params = {'location':'westus'}
        df_params = {'location':'westus'}
    

Buat pabrik data

Tambahkan kode berikut ke metode Utama yang membuat pabrik data. Jika grup sumber daya Anda sudah ada, komentari pernyataan create_or_update pertama.

    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

Membuat layanan tertaut

Tambahkan kode berikut ke metode Utama yang membuat layanan tertaut Azure Storage.

Anda membuat layanan tertaut di pabrik data untuk menautkan penyimpanan data dan layanan komputasi ke pabrik data. Dalam mulai cepat ini, Anda hanya perlu membuat satu layanan tertaut Azure Storage sebagai sumber salinan dan penyimpanan sink, bernama "AzureStorageLinkedService" dalam sampel. Ganti <storageaccountname> dan <storageaccountkey> dengan nama dan kunci akun Azure Storage Anda.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

Membuat himpunan data

Di bagian ini, Anda membuat dua himpunan data: satu untuk sumber dan yang lain untuk sink.

Membuat himpunan data untuk Azure Blob sumber

Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk mengetahui informasi tentang properti himpunan data Azure Blob, lihat artikel Konektor blob Azure.

Anda menentukan himpunan data yang mewakili data sumber di Azure Blob. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)) 
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

Membuat himpunan data untuk Azure Blob sink

Tambahkan kode berikut ke metode Utama yang membuat himpunan data blob Azure. Untuk mengetahui informasi tentang properti himpunan data Azure Blob, lihat artikel Konektor blob Azure.

Anda menentukan himpunan data yang mewakili data sumber di Azure Blob. Himpunan data Blob ini mengacu pada layanan tertaut Azure Storage yang Anda buat di langkah sebelumnya.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

Membuat alur

Tambahkan kode berikut ke metode Utama yang membuat alur dengan aktivitas salin.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    
    #Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
    #Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
    
    p_name = 'copyPipeline'
    params_for_pipeline = {}

    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

Membuat eksekusi alur

Tambahkan kode berikut ke metode Utama yang memicu eksekusi alur.

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

Memantau eksekusi alur

Untuk memantau eksekusi alur, tambahkan kode berikut ke metode Utama:

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])

Sekarang, tambahkan pernyataan berikut untuk memanggil metode Utama saat program dijalankan:

# Start the main method
main()

Skrip lengkap

Berikut adalah kode Python lengkap:

from azure.identity import ClientSecretCredential 
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<resource group>'

    # The data factory name. It must be globally unique.
    df_name = '<factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>') 
    resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)

    rg_params = {'location':'westus'}
    df_params = {'location':'westus'}
 
    # create the resource group
    # comment out if the resource group already exits
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='westus')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService001'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')

    ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string)) 
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path = '<container>/<folder path>'
    blob_filename = '<file name>'
    ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = '<container>/<folder path>'
    dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
                                 dsOut_ref], source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

Menjalankan kode

Bangun dan mulai aplikasi, lalu verifikasi eksekusi alur.

Konsol mencetak kemajuan pembuatan pabrik data, layanan tertaut, himpunan data, alur, dan eksekusi alur. Tunggu hingga Anda melihat detail eksekusi aktivitas salin dengan data berukuran dibaca/ditulis. Kemudian, gunakan alat seperti Penjelajah Azure Storage untuk memeriksa blob disalin ke "outputBlobPath" dari "inputBlobPath" seperti yang Anda tentukan dalam variabel.

Berikut sampel outputnya:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

Membersihkan sumber daya

Untuk menghapus pabrik data, tambahkan kode berikut ke program:

adf_client.factories.delete(rg_name, df_name)

Langkah berikutnya

Alur dalam sampel ini menyalin data dari satu lokasi ke lokasi lain dalam penyimpanan blob Azure. Ikuti tutorial untuk mempelajari tentang penggunaan Data Factory dalam skenario lainnya.