Zarządzanie usługą Azure Data Lake Analytics przy użyciu języka Python

Ważne

Usługa Azure Data Lake Analytics wycofana 29 lutego 2024 r. Dowiedz się więcej z tego ogłoszenia.

Na potrzeby analizy danych organizacja może używać usługi Azure Synapse Analytics lub Microsoft Fabric.

W tym artykule opisano sposób zarządzania kontami usługi Azure Data Lake Analytics, źródłami danych, użytkownikami i zadaniami przy użyciu języka Python.

Obsługiwane wersje języka Python

  • Użyj 64-bitowej wersji języka Python.
  • Możesz użyć standardowej dystrybucji języka Python znajdującej się w Python.org pobierania.
  • Wielu deweloperów uważa, że jest wygodne korzystanie z dystrybucji anaconda Python.
  • Ten artykuł został napisany przy użyciu języka Python w wersji 3.6 ze standardowej dystrybucji języka Python

Instalowanie zestawu Azure Python SDK

Zainstaluj następujące moduły:

  • Moduł azure-mgmt-resource zawiera inne moduły platformy Azure dla usługi Active Directory itp.
  • Moduł azure-datalake-store obejmuje operacje systemu plików usługi Azure Data Lake Store.
  • Moduł azure-mgmt-datalake-store obejmuje operacje zarządzania kontami usługi Azure Data Lake Store.
  • Moduł azure-mgmt-datalake-analytics zawiera operacje Data Lake Analytics platformy Azure.

Najpierw upewnij się, że masz najnowszą wersję pip , uruchamiając następujące polecenie:

python -m pip install --upgrade pip

Ten dokument został napisany przy użyciu polecenia pip version 9.0.1.

Użyj następujących pip poleceń, aby zainstalować moduły z poziomu wiersza polecenia:

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-datalake-store
pip install azure-mgmt-datalake-store
pip install azure-mgmt-datalake-analytics

Tworzenie nowego skryptu języka Python

Wklej następujący kod do skryptu:

# Use this only for Azure AD service-to-service authentication
#from azure.common.credentials import ServicePrincipalCredentials

# Use this only for Azure AD end-user authentication
#from azure.common.credentials import UserPassCredentials

# Required for Azure Identity
from azure.identity import DefaultAzureCredential

# Required for Azure Resource Manager
from azure.mgmt.resource.resources import ResourceManagementClient
from azure.mgmt.resource.resources.models import ResourceGroup

# Required for Azure Data Lake Store account management
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.mgmt.datalake.store.models import DataLakeStoreAccount

# Required for Azure Data Lake Store filesystem management
from azure.datalake.store import core, lib, multithread

# Required for Azure Data Lake Analytics account management
from azure.mgmt.datalake.analytics.account import DataLakeAnalyticsAccountManagementClient
from azure.mgmt.datalake.analytics.account.models import DataLakeAnalyticsAccount, DataLakeStoreAccountInformation

# Required for Azure Data Lake Analytics job management
from azure.mgmt.datalake.analytics.job import DataLakeAnalyticsJobManagementClient
from azure.mgmt.datalake.analytics.job.models import JobInformation, JobState, USqlJobProperties

# Required for Azure Data Lake Analytics catalog management
from azure.mgmt.datalake.analytics.catalog import DataLakeAnalyticsCatalogManagementClient

# Required for Azure Data Lake Analytics Model
from azure.mgmt.datalake.analytics.account.models import CreateOrUpdateComputePolicyParameters

# Use these as needed for your application
import logging
import getpass
import pprint
import uuid
import time

Uruchom ten skrypt, aby sprawdzić, czy moduły można zaimportować.

Authentication

Interaktywne uwierzytelnianie użytkowników z wyskakującym okienkiem

Ta metoda nie jest obsługiwana.

Interaktywne uwierzytelnianie użytkowników przy użyciu kodu urządzenia

user = input(
    'Enter the user to authenticate with that has permission to subscription: ')
password = getpass.getpass()
credentials = UserPassCredentials(user, password)

Uwierzytelnianie nieinteraktywne przy użyciu usługi SPI i wpisu tajnego

# Acquire a credential object for the app identity. When running in the cloud,
# DefaultAzureCredential uses the app's managed identity (MSI) or user-assigned service principal.
# When run locally, DefaultAzureCredential relies on environment variables named
# AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.

credentials = DefaultAzureCredential()

Uwierzytelnianie nieinteraktywne przy użyciu interfejsu API i certyfikatu

Ta metoda nie jest obsługiwana.

Typowe zmienne skryptu

Te zmienne są używane w przykładach.

subid = '<Azure Subscription ID>'
rg = '<Azure Resource Group Name>'
location = '<Location>'  # i.e. 'eastus2'
adls = '<Azure Data Lake Store Account Name>'
adla = '<Azure Data Lake Analytics Account Name>'

Tworzenie klientów

resourceClient = ResourceManagementClient(credentials, subid)
adlaAcctClient = DataLakeAnalyticsAccountManagementClient(credentials, subid)
adlaJobClient = DataLakeAnalyticsJobManagementClient(
    credentials, 'azuredatalakeanalytics.net')

Tworzenie grupy zasobów platformy Azure

armGroupResult = resourceClient.resource_groups.create_or_update(
    rg, ResourceGroup(location=location))

Tworzenie konta usługi Data Lake Analytics

Najpierw utwórz konto sklepu.

adlsAcctResult = adlsAcctClient.account.begin_create(
	rg,
	adls,
	DataLakeStoreAccount(
		location=location)
	)
).wait()

Następnie utwórz konto usługi ADLA, które używa tego magazynu.

adlaAcctResult = adlaAcctClient.account.create(
    rg,
    adla,
    DataLakeAnalyticsAccount(
        location=location,
        default_data_lake_store_account=adls,
        data_lake_store_accounts=[DataLakeStoreAccountInformation(name=adls)]
    )
).wait()

Przesyłanie zadania

script = """
@a  = 
    SELECT * FROM 
        (VALUES
            ("Contoso", 1500.0),
            ("Woodgrove", 2700.0)
        ) AS 
              D( customer, amount );
OUTPUT @a
    TO "/data.csv"
    USING Outputters.Csv();
"""

jobId = str(uuid.uuid4())
jobResult = adlaJobClient.job.create(
    adla,
    jobId,
    JobInformation(
        name='Sample Job',
        type='USql',
        properties=USqlJobProperties(script=script)
    )
)

Oczekiwanie na zakończenie zadania

jobResult = adlaJobClient.job.get(adla, jobId)
while(jobResult.state != JobState.ended):
    print('Job is not yet done, waiting for 3 seconds. Current state: ' +
          jobResult.state.value)
    time.sleep(3)
    jobResult = adlaJobClient.job.get(adla, jobId)

print('Job finished with result: ' + jobResult.result.value)

Wyświetlanie listy potoków i cykli

W zależności od tego, czy zadania mają dołączone metadane potoku lub cyklu, można wyświetlić listę potoków i cykli.

pipelines = adlaJobClient.pipeline.list(adla)
for p in pipelines:
    print('Pipeline: ' + p.name + ' ' + p.pipelineId)

recurrences = adlaJobClient.recurrence.list(adla)
for r in recurrences:
    print('Recurrence: ' + r.name + ' ' + r.recurrenceId)

Zarządzanie zasadami obliczeniowymi

Obiekt DataLakeAnalyticsAccountManagementClient udostępnia metody zarządzania zasadami obliczeniowymi dla konta Data Lake Analytics.

Wyświetlanie listy zasad obliczeniowych

Poniższy kod pobiera listę zasad obliczeniowych dla konta Data Lake Analytics.

policies = adlaAcctClient.compute_policies.list_by_account(rg, adla)
for p in policies:
    print('Name: ' + p.name + 'Type: ' + p.object_type + 'Max AUs / job: ' +
          p.max_degree_of_parallelism_per_job + 'Min priority / job: ' + p.min_priority_per_job)

Tworzenie nowych zasad obliczeniowych

Poniższy kod tworzy nowe zasady obliczeniowe dla konta Data Lake Analytics, ustawiając maksymalną liczbę jednostek AU dostępnych dla określonego użytkownika na 50, a minimalny priorytet zadania na 250.

userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde"
newPolicyParams = CreateOrUpdateComputePolicyParameters(
    userAadObjectId, "User", 50, 250)
adlaAcctClient.compute_policies.create_or_update(
    rg, adla, "GaryMcDaniel", newPolicyParams)

Następne kroki