Manage Azure Data Lake Analytics using Python

This article describes how to manage Azure Data Lake Analytics accounts, data sources, users, and jobs by using Python.

Supported Python versions

  • Use a 64-bit version of Python.
  • You can use the standard Python distribution found at Python.org downloads.
  • Many developers find it convenient to use the Anaconda Python distribution.
  • This article was written using Python version 3.6 from the standard Python distribution.
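
To confirm that the interpreter you are running meets these requirements, you can run a check like the following before anything else. This is a minimal sketch. `check_interpreter` is a hypothetical helper, and the 3.6 minimum simply mirrors the version this article was written with. It is not a documented SDK requirement.

```python
import struct
import sys


def check_interpreter(min_version=(3, 6)):
    """Return a list of problems with the running interpreter (empty if none)."""
    problems = []
    # Size of a C pointer in bits: 64 on a 64-bit build, 32 on a 32-bit build.
    bits = struct.calcsize("P") * 8
    if bits != 64:
        problems.append("Python is %d-bit; a 64-bit build is required" % bits)
    # Compare (major, minor) against the assumed minimum version.
    if sys.version_info[:2] < tuple(min_version):
        problems.append(
            "Python %d.%d found; %d.%d or later assumed"
            % (sys.version_info[0], sys.version_info[1],
               min_version[0], min_version[1])
        )
    return problems


if __name__ == "__main__":
    issues = check_interpreter()
    print("\n".join(issues) if issues else "Interpreter looks fine.")
```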

Install the Azure Python SDK

Install the following modules:

  • The azure-mgmt-resource module includes the Azure Resource Manager operations, such as managing resource groups.
  • The azure-datalake-store module includes the Azure Data Lake Store filesystem operations.
  • The azure-mgmt-datalake-store module includes the Azure Data Lake Store account management operations.
  • The azure-mgmt-datalake-analytics module includes the Azure Data Lake Analytics account, job, and catalog operations.

First, make sure you have the latest version of pip by running the following command:

python -m pip install --upgrade pip

This document was written using pip version 9.0.1.

Use the following pip commands to install the modules from the command line:

pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Create a new Python script

Paste the following code into the script:

# Uncomment for Azure AD service-to-service (service principal) authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Uncomment for Azure AD end-user (user name and password) authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Run this script to verify that the modules can be imported.
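
If you want to see exactly which modules are missing instead of reading a traceback, the following sketch imports each module path and reports the ones that fail. The helper name `missing_modules` is hypothetical. The module list is copied from the import statements in the script above.

```python
import importlib

# Module paths taken from the import statements in the script above.
REQUIRED_MODULES = [
    "azure.mgmt.resource.resources",
    "azure.mgmt.datalake.store",
    "azure.datalake.store",
    "azure.mgmt.datalake.analytics.account",
    "azure.mgmt.datalake.analytics.job",
    "azure.mgmt.datalake.analytics.catalog",
]


def missing_modules(names):
    """Return the subset of module names that cannot be imported."""
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:  # also covers ModuleNotFoundError
            missing.append(name)
    return missing


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing modules (re-run the pip install commands):")
        for name in missing:
            print("  " + name)
    else:
        print("All required modules imported successfully.")
```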

Authentication

Interactive user authentication with a pop-up

This method is not supported.

Interactive user authentication with a user name and password

# Requires the UserPassCredentials import above to be uncommented.
user = input(
    'Enter a user name that has permission to the subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Noninteractive authentication with a service principal (SPI) and a secret

# Requires the ServicePrincipalCredentials import above to be uncommented.
credentials = ServicePrincipalCredentials(
    client_id='FILL-IN-HERE', secret='FILL-IN-HERE', tenant='FILL-IN-HERE')
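
Hardcoding a client secret in the script makes it easy to leak. One alternative, sketched below under stated assumptions, is to read the three values from environment variables. The names AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID are the ones read by the newer azure-identity library's EnvironmentCredential. Reusing them here is a convention chosen for this sketch, not something the older azure-common credential classes require. The helper `service_principal_settings` is hypothetical.

```python
import os

# Assumed variable names (the azure-identity EnvironmentCredential convention),
# mapped to the keyword arguments of ServicePrincipalCredentials.
SP_ENV_VARS = {
    "client_id": "AZURE_CLIENT_ID",
    "secret": "AZURE_CLIENT_SECRET",
    "tenant": "AZURE_TENANT_ID",
}


def service_principal_settings(environ=None):
    """Collect ServicePrincipalCredentials keyword arguments from the environment."""
    environ = os.environ if environ is None else environ
    settings = {key: environ.get(var) for key, var in SP_ENV_VARS.items()}
    missing = [SP_ENV_VARS[key] for key, value in settings.items() if not value]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return settings

# Usage (requires the ServicePrincipalCredentials import to be uncommented):
# credentials = ServicePrincipalCredentials(**service_principal_settings())
```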

Noninteractive authentication with a service principal (SPI) and a certificate

This method is not supported.

Common script variables

These variables are used in the samples.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # e.g. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'
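
The placeholder values above are easy to forget, so a quick validation pass can catch them before any Azure call is made. In this sketch, `validate_settings` is a hypothetical helper. The account-name rule it enforces (3 to 24 lowercase letters and digits) is an assumption about Data Lake naming requirements, so check it against the service documentation.

```python
import re
import uuid

# Assumed naming rule for Data Lake Store and Data Lake Analytics accounts.
ACCOUNT_NAME = re.compile(r"^[a-z0-9]{3,24}$")


def validate_settings(subid, rg, location, adls, adla):
    """Return human-readable problems with the common script variables."""
    problems = []
    # Flag empty values and untouched '<...>' placeholders.
    for label, value in [("subid", subid), ("rg", rg), ("location", location),
                         ("adls", adls), ("adla", adla)]:
        if not value or (value.startswith("<") and value.endswith(">")):
            problems.append(label + " still holds a placeholder")
    # A subscription ID is a GUID; uuid.UUID raises ValueError otherwise.
    try:
        uuid.UUID(subid)
    except ValueError:
        problems.append("subid is not a GUID")
    for label, value in [("adls", adls), ("adla", adla)]:
        if not ACCOUNT_NAME.match(value):
            problems.append(label + " should be 3-24 lowercase letters or digits")
    return problems

# Usage: problems = validate_settings(subid, rg, location, adls, adla)
```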

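Create the clients

resourceClient = ResourceManagementClient(credentials, subid)
adlsAcctClient = DataLakeStoreAccountManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
# The job client takes the Data Lake Analytics job DNS suffix instead of a subscription ID.
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')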

Create an Azure resource group

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Create a Data Lake Analytics account

First, create a Data Lake Store account.

# result() blocks until provisioning finishes and returns the new account.
adlsAcctResult = adlsAcctClient.account.create(
    rg,
    adls,
    DataLakeStoreAccount(
        location=location)
).result()

Then create a Data Lake Analytics account that uses that store.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).result()

Submit a job

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)
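
The script above hardcodes its rows. If the rows come from Python data instead, they have to be rendered as U-SQL literals. The sketch below assumes that U-SQL string literals follow C# escaping rules (a backslash before `"` and before `\`). The helpers `usql_literal` and `build_values_script` are hypothetical and handle only strings, integers, and finite floats.

```python
import math


def usql_literal(value):
    """Render a Python str, int, or float as a U-SQL literal (C# escaping assumed)."""
    if isinstance(value, bool):
        raise TypeError("bool values are not handled in this sketch")
    if isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return '"' + escaped + '"'
    if isinstance(value, int):
        return str(value)
    if isinstance(value, float):
        if not math.isfinite(value):
            raise ValueError("NaN and infinity have no literal form here")
        return repr(value)  # e.g. 1500.0 -> '1500.0'
    raise TypeError("unsupported type: " + type(value).__name__)


def build_values_script(rows, columns, output_path):
    """Build a U-SQL script that writes an inline rowset to a CSV file."""
    row_lines = ",\n".join(
        "            (" + ", ".join(usql_literal(v) for v in row) + ")"
        for row in rows
    )
    lines = [
        "@a  =",
        "    SELECT * FROM",
        "        (VALUES",
        row_lines,
        "        ) AS",
        "              D( " + ", ".join(columns) + " );",
        "OUTPUT @a",
        "    TO " + usql_literal(output_path),
        "    USING Outputters.Csv();",
    ]
    return "\n".join(lines) + "\n"


# Rebuilds the sample script from Python data.
script = build_values_script(
    [("Contoso", 1500.0), ("Woodgrove", 2700.0)],
    ["customer", "amount"],
    "/data.csv",
)
```

The returned string can then be passed as the `script` argument of `USqlJobProperties`.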

Wait for a job to end

jobResult = adlaJobClient.job.get(adla, jobId)
while jobResult.state != JobState.ended:
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)
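
The loop above polls every 3 seconds with no upper bound, so a stuck job keeps the script waiting indefinitely. The sketch below adds exponential backoff and an overall timeout. `wait_until` is a hypothetical helper. The clock and sleep functions are parameters only so that the loop can be tested without actually waiting.

```python
import time


def wait_until(get_status, is_done, interval=3.0, max_interval=30.0,
               timeout=3600.0, sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until is_done(status) is true and return the final status.

    The delay doubles after each poll, capped at max_interval.
    Raises TimeoutError if the status is not done within `timeout` seconds.
    """
    deadline = clock() + timeout
    status = get_status()
    while not is_done(status):
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError("still waiting after %.0f seconds" % timeout)
        sleep(min(interval, remaining))
        interval = min(interval * 2, max_interval)
        status = get_status()
    return status

# Usage with the clients from this article:
# jobResult = wait_until(lambda: adlaJobClient.job.get(adla, jobId),
#                        lambda job: job.state == JobState.ended)
```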

List pipelines and recurrences

Depending on whether your jobs have pipeline or recurrence metadata attached, you can list pipelines and recurrences.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: {} {}'.format(p.pipeline_name, p.pipeline_id))

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: {} {}'.format(r.recurrence_name, r.recurrence_id))

Manage compute policies

The DataLakeAnalyticsAccountManagementClient object provides methods for managing the compute policies for a Data Lake Analytics account.

List compute policies

The following code retrieves a list of compute policies for a Data Lake Analytics account.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: {} Type: {} Max AUs / job: {} Min priority / job: {}'.format(
        p.name, p.object_type, p.max_degree_of_parallelism_per_job,
        p.min_priority_per_job))

Create a new compute policy

The following code creates a new compute policy for a Data Lake Analytics account. The policy limits the specified user to at most 50 AUs per job and sets the minimum job priority to 250.

# Assumes an SDK version that provides this model name
from azure.mgmt.datalake.analytics.account.models import ComputePolicyCreateOrUpdateParameters

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
# Arguments: Azure AD object ID, object type, max AUs per job, min priority per job
newPolicyParams = ComputePolicyCreateOrUpdateParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)
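
The service rejects a malformed object ID or object type only after the request is sent, so checking the inputs locally first can save a round trip. In this sketch, `compute_policy_problems` is a hypothetical helper. The accepted object types ('User' and 'Group') and the 1 to 1000 priority range are assumptions, so confirm them against the Data Lake Analytics documentation for your SDK version.

```python
import uuid

# Assumed values; verify them against the service documentation.
ALLOWED_OBJECT_TYPES = {"User", "Group"}
PRIORITY_RANGE = (1, 1000)


def compute_policy_problems(object_id, object_type, max_aus, min_priority):
    """Return a list of problems with compute-policy inputs (empty if none)."""
    problems = []
    # The Azure AD object ID must parse as a GUID.
    try:
        uuid.UUID(object_id)
    except ValueError:
        problems.append("object_id must be an Azure AD object ID (GUID)")
    if object_type not in ALLOWED_OBJECT_TYPES:
        problems.append("object_type must be one of "
                        + ", ".join(sorted(ALLOWED_OBJECT_TYPES)))
    if not (isinstance(max_aus, int) and max_aus >= 1):
        problems.append("max_aus must be a positive integer")
    low, high = PRIORITY_RANGE
    if not (isinstance(min_priority, int) and low <= min_priority <= high):
        problems.append("min_priority must be an integer between %d and %d"
                        % (low, high))
    return problems

# Usage: check before building the policy parameters.
# assert not compute_policy_problems(userAadObjectId, "User", 50, 250)
```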

Next steps