إدارة تحليلات بحيرة بيانات Azure باستخدام Python

توضح هذه المقالة كيفية إدارة حسابات Azure Data Lake Analytics ومصادر البيانات والمستخدمين والمهام باستخدام Python.

إصدارات بايثون المدعومة

  • استخدم إصدار 64 بت من بايثون.
  • يمكنك استخدام توزيع Python القياسي الموجود في Python.org التنزيلات.
  • يجد العديد من المطورين أنه من المناسب استخدام توزيع Anaconda Python.
  • تمت كتابة هذه المقالة باستخدام إصدار Python 3.6 من توزيع Python القياسي

Install Azure Python SDK

قم بتثبيت الوحدات التالية:

  • تتضمن الوحدة النمطية azure-mgmt-resource وحدات Azure النمطية الأخرى ل Active Directory، وما إلى ذلك.
  • تتضمن الوحدة النمطية azure-datalake-store عمليات نظام ملفات Azure Data Lake Store.
  • تتضمن الوحدة النمطية azure-mgmt-datalake-store عمليات إدارة حساب Azure Data Lake Store.
  • تتضمن الوحدة النمطية azure-mgmt-datalake-analytics عمليات Azure Data Lake Analytics.

أولا، تأكد من أن لديك الأحدث pip عن طريق تشغيل الأمر التالي:

python -m pip install --upgrade pip

تمت كتابة هذه الوثيقة باستخدام pip version 9.0.1.

استخدم الأوامر التالية pip لتثبيت الوحدات النمطية من سطر الأوامر:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

إنشاء برنامج نصي جديد ل Python

لصق التعليمة البرمجية التالية في البرنامج النصي:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

قم بتشغيل هذا البرنامج النصي للتحقق من إمكانية استيراد الوحدات النمطية.

المصادقة

مصادقة المستخدم التفاعلية مع نافذة منبثقة

هذه الطريقة غير مدعومة.

مصادقة المستخدم التفاعلية باستخدام رمز الجهاز

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

مصادقة غير تفاعلية مع SPI وسر

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

مصادقة غير تفاعلية باستخدام واجهة برمجة التطبيقات وشهادة

هذه الطريقة غير مدعومة.

متغيرات البرنامج النصي الشائعة

يتم استخدام هذه المتغيرات في العينات.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

إنشاء العملاء

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

إنشاء مجموعة موارد Azure

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

إنشاء حساب تحليلات بحيرة البيانات

قم أولا بإنشاء حساب متجر.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

ثم قم بإنشاء حساب ADLA يستخدم هذا المتجر.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

تقديم وظيفة

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

انتظر حتى تنتهي الوظيفة

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

قائمة خطوط الأنابيب والتكرارات

استنادا إلى ما إذا كانت وظائفك تحتوي على بيانات تعريف لخطوط الأنابيب أو التكرار، يمكنك سرد خطوط الأنابيب والتكرارات.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

إدارة سياسات الحوسبة

يوفر كائن DataLakeAnalyticsAccountManagementClient طرقا لإدارة نهج الحوسبة لحساب Data Lake Analytics.

نهج حساب القائمة

تسترد التعليمة البرمجية التالية قائمة بنهج الحوسبة لحساب Data Lake Analytics.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

إنشاء نهج حوسبة جديد

تقوم التعليمة البرمجية التالية بإنشاء نهج حوسبة جديد لحساب Data Lake Analytics، وتعيين الحد الأقصى لوحدات AU المتاحة للمستخدم المحدد إلى 50، والحد الأدنى لأولوية المهمة إلى 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

الخطوات التالية