通过 .NET 应用管理 Azure Data Lake Analytics

重要

Azure Data Lake Analytics于 2024 年 2 月 29 日停用。 通过此公告了解更多信息。

对于数据分析,组织可以使用 Azure Synapse AnalyticsMicrosoft Fabric

本文介绍如何通过使用 Azure .NET SDK 编写的应用管理 Azure Data Lake Analytics 帐户、数据源、用户和作业。

先决条件

  • Visual Studio 2015、Visual Studio 2013 Update 4 或安装有 Visual C++ 的 Visual Studio 2012
  • 用于 .NET 的 Microsoft Azure SDK 2.5 或更高版本。 可以使用 Web 平台安装程序安装它。
  • 所需的 NuGet 包

安装 NuGet 包

程序包 版本
Microsoft.Rest.ClientRuntime.Azure.Authentication 2.3.1
Microsoft.Azure.Management.DataLake.Analytics 3.0.0
Microsoft.Azure.Management.DataLake.Store 2.2.0
Microsoft.Azure.Management.ResourceManager 1.6.0-preview
Microsoft.Azure.Graph.RBAC 3.4.0-preview

可以使用以下命令安装通过 NuGet 命令行安装这些包:

Install-Package -Id Microsoft.Rest.ClientRuntime.Azure.Authentication  -Version 2.3.1
Install-Package -Id Microsoft.Azure.Management.DataLake.Analytics  -Version 3.0.0
Install-Package -Id Microsoft.Azure.Management.DataLake.Store  -Version 2.2.0
Install-Package -Id Microsoft.Azure.Management.ResourceManager  -Version 1.6.0-preview
Install-Package -Id Microsoft.Azure.Graph.RBAC -Version 3.4.0-preview

常见变量

string subid = "<Subscription ID>"; // Subscription ID (a GUID)
string tenantid = "<Tenant ID>"; // AAD tenant ID or domain. For example, "contoso.onmicrosoft.com"
string rg == "<value>"; // Resource  group name
string clientid = "abcdef01-2345-6789-0abc-def012345678"; // Sample client ID

身份验证

可以通过多种方法登录到 Azure Data Lake Analytics。 下面的代码片段显示使用弹出窗口通过交互用户身份验证进行身份验证的示例。

对于 ClientID,可以使用用户的 ID,也可以使用 服务主体的应用程序 (客户端) ID。

using System;
using System.IO;
using System.Threading;
using System.Security.Cryptography.X509Certificates;

using Microsoft.Rest;
using Microsoft.Rest.Azure.Authentication;
using Microsoft.Azure.Management.DataLake.Analytics;
using Microsoft.Azure.Management.DataLake.Analytics.Models;
using Microsoft.Azure.Management.DataLake.Store;
using Microsoft.Azure.Management.DataLake.Store.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Azure.Graph.RBAC;

public static Program
{
   public static string TENANT = "microsoft.onmicrosoft.com";
   public static string CLIENTID = "abcdef01-2345-6789-0abc-def012345678";
   public static System.Uri ARM_TOKEN_AUDIENCE = new System.Uri( @"https://management.core.windows.net/");
   public static System.Uri ADL_TOKEN_AUDIENCE = new System.Uri( @"https://datalake.azure.net/" );
   public static System.Uri GRAPH_TOKEN_AUDIENCE = new System.Uri( @"https://graph.windows.net/" );

   static void Main(string[] args)
   {
      string MY_DOCUMENTS= System.Environment.GetFolderPath( System.Environment.SpecialFolder.MyDocuments);
      string TOKEN_CACHE_PATH = System.IO.Path.Combine(MY_DOCUMENTS, "my.tokencache");

      var tokenCache = GetTokenCache(TOKEN_CACHE_PATH);
      var armCreds = GetCreds_User_Popup(TENANT, ARM_TOKEN_AUDIENCE, CLIENTID, tokenCache);
      var adlCreds = GetCreds_User_Popup(TENANT, ADL_TOKEN_AUDIENCE, CLIENTID, tokenCache);
      var graphCreds = GetCreds_User_Popup(TENANT, GRAPH_TOKEN_AUDIENCE, CLIENTID, tokenCache);
   }
}

GetCreds_User_Popup 的源代码和用于身份验证其他选项的代码会在 Data Lake Analytics .NET 身份验证选项中进行介绍

创建客户端管理对象

var resourceManagementClient = new ResourceManagementClient(armCreds) { SubscriptionId = subid };

var adlaAccountClient = new DataLakeAnalyticsAccountManagementClient(armCreds);
adlaAccountClient.SubscriptionId = subid;

var adlsAccountClient = new DataLakeStoreAccountManagementClient(armCreds);
adlsAccountClient.SubscriptionId = subid;

var adlaCatalogClient = new DataLakeAnalyticsCatalogManagementClient(adlCreds);
var adlaJobClient = new DataLakeAnalyticsJobManagementClient(adlCreds);

var adlsFileSystemClient = new DataLakeStoreFileSystemManagementClient(adlCreds);

var  graphClient = new GraphRbacManagementClient(graphCreds);
graphClient.TenantID = domain;

管理帐户

创建 Azure 资源组

如果尚未创建 Azure 资源组,必须创建一个,这样才能创建 Data Lake Analytics 组件。 需要身份验证凭据、订阅 ID 和位置。 以下代码演示如何创建资源组:

var resourceGroup = new ResourceGroup { Location = location };
resourceManagementClient.ResourceGroups.CreateOrUpdate(groupName, rg);

有关详细信息,请参阅 Azure 资源组和 Data Lake Analytics。

创建 Data Lake Store 帐户

ADLA 帐户曾经需要 ADLS 帐户。 如果尚未具有可供使用的帐户,则可以使用以下代码创建一个:

var new_adls_params = new DataLakeStoreAccount(location: _location);
adlsAccountClient.Account.Create(rg, adls, new_adls_params);

创建 Data Lake Analytics 帐户

下面的代码会创建 ADLS 帐户

var new_adla_params = new DataLakeAnalyticsAccount()
{
   DefaultDataLakeStoreAccount = adls,
   Location = location
};

adlaClient.Account.Create(rg, adla, new_adla_params);

列出 Data Lake Store 帐户

var adlsAccounts = adlsAccountClient.Account.List().ToList();
foreach (var adls in adlsAccounts)
{
   Console.WriteLine($"ADLS: {0}", adls.Name);
}

列出 Data Lake Analytics 帐户

var adlaAccounts = adlaClient.Account.List().ToList();

for (var adla in AdlaAccounts)
{
   Console.WriteLine($"ADLA: {0}, adla.Name");
}

检查帐户是否存在

bool exists = adlaClient.Account.Exists(rg, adla));

获取有关帐户的信息

bool exists = adlaClient.Account.Exists(rg, adla));
if (exists)
{
   var adla_accnt = adlaClient.Account.Get(rg, adla);
}

删除帐户

if (adlaClient.Account.Exists(rg, adla))
{
   adlaClient.Account.Delete(rg, adla);
}

获取默认的 Data Lake Store 帐户

每个 Data Lake Analytics 帐户都需要一个默认的 Data Lake Store 帐户。 使用此代码可以确定 Analytics 帐户的默认 Store 帐户。

if (adlaClient.Account.Exists(rg, adla))
{
  var adla_accnt = adlaClient.Account.Get(rg, adla);
  string def_adls_account = adla_accnt.DefaultDataLakeStoreAccount;
}

管理数据源

Data Lake Analytics 当前支持以下数据源:

可以创建指向 Azure 存储帐户的链接。

string storage_key = "xxxxxxxxxxxxxxxxxxxx";
string storage_account = "mystorageaccount";
var addParams = new AddStorageAccountParameters(storage_key);            
adlaClient.StorageAccounts.Add(rg, adla, storage_account, addParams);

列出 Azure 存储数据源

var stg_accounts = adlaAccountClient.StorageAccounts.ListByAccount(rg, adla);

if (stg_accounts != null)
{
  foreach (var stg_account in stg_accounts)
  {
      Console.WriteLine($"Storage account: {0}", stg_account.Name);
  }
}

列出 Data Lake Store 数据源

var adls_accounts = adlsClient.Account.List();

if (adls_accounts != null)
{
  foreach (var adls_accnt in adls_accounts)
  {
      Console.WriteLine($"ADLS account: {0}", adls_accnt.Name);
  }
}

上传和下载文件夹和文件

可以通过以下方法使用 Data Lake Store 文件系统客户端管理对象从 Azure 将单个文件或文件夹上传和下载到本地计算机:

  • UploadFolder
  • UploadFile
  • DownloadFolder
  • DownloadFile

这些方法的第一个参数是 Data Lake Store 帐户的名称,后跟源路径和目标路径的参数。

以下示例演示如何下载 Data Lake Store 中的文件夹。

adlsFileSystemClient.FileSystem.DownloadFolder(adls, sourcePath, destinationPath);

在 Data Lake Store 帐户中创建文件

using (var memstream = new MemoryStream())
{
   using (var sw = new StreamWriter(memstream, UTF8Encoding.UTF8))
   {
      sw.WriteLine("Hello World");
      sw.Flush();
      
      memstream.Position = 0;

      adlsFileSystemClient.FileSystem.Create(adls, "/Samples/Output/randombytes.csv", memstream);
   }
}

验证 Azure 存储帐户路径

以下代码检查某个 Data Lake Analytics 帐户 (analyticsAccountName) 中是否存在某个 Azure 存储帐户 (storageAccntName),以及该 Azure 存储帐户中是否存在某个容器 (containerName)。

string storage_account = "mystorageaccount";
string storage_container = "mycontainer";
bool accountExists = adlaClient.Account.StorageAccountExists(rg, adla, storage_account));
bool containerExists = adlaClient.Account.StorageContainerExists(rg, adla, storage_account, storage_container));

管理目录和作业

DataLakeAnalyticsCatalogManagementClient 对象提供用于管理 SQL 数据库(为每个 Azure Data Lake Analytics 帐户提供)的方法。 DataLakeAnalyticsJobManagementClient 提供使用 U-SQL 脚本来提交和管理数据库中运行的作业的方法。

列出数据库和架构

在可以列出的多种对象中,最常见的对象是数据库及其架构。 以下代码获取数据库的集合,并枚举每个数据库的架构。

var databases = adlaCatalogClient.Catalog.ListDatabases(adla);
foreach (var db in databases)
{
  Console.WriteLine($"Database: {db.Name}");
  Console.WriteLine(" - Schemas:");
  var schemas = adlaCatalogClient.Catalog.ListSchemas(adla, db.Name);
  foreach (var schm in schemas)
  {
      Console.WriteLine($"\t{schm.Name}");
  }
}

列出表列

以下代码演示如何使用 Data Lake Analytics 目录管理客户端访问数据库,以列出指定表中的列。

var tbl = adlaCatalogClient.Catalog.GetTable(adla, "master", "dbo", "MyTableName");
IEnumerable<USqlTableColumn> columns = tbl.ColumnList;

foreach (USqlTableColumn utc in columns)
{
  Console.WriteLine($"\t{utc.Name}");
}

提交 U-SQL 作业

以下代码演示如何使用 Data Lake Analytics 作业管理客户端提交作业。

string scriptPath = "/Samples/Scripts/SearchResults_Wikipedia_Script.txt";
Stream scriptStrm = adlsFileSystemClient.FileSystem.Open(_adlsAccountName, scriptPath);
string scriptTxt = string.Empty;
using (StreamReader sr = new StreamReader(scriptStrm))
{
    scriptTxt = sr.ReadToEnd();
}

var jobName = "SR_Wikipedia";
var jobId = Guid.NewGuid();
var properties = new USqlJobProperties(scriptTxt);
var parameters = new JobInformation(jobName, JobType.USql, properties, priority: 1, degreeOfParallelism: 1, jobId: jobId);
var jobInfo = adlaJobClient.Job.Create(adla, jobId, parameters);
Console.WriteLine($"Job {jobName} submitted.");

列出失败的作业

以下代码列出有关失败的作业的信息。

var odq = new ODataQuery<JobInformation> { Filter = "result eq 'Failed'" };
var jobs = adlaJobClient.Job.List(adla, odq);
foreach (var j in jobs)
{
   Console.WriteLine($"{j.Name}\t{j.JobId}\t{j.Type}\t{j.StartTime}\t{j.EndTime}");
}

列出管道

下面的代码列出有关提交给帐户的作业的每个管道的信息。

var pipelines = adlaJobClient.Pipeline.List(adla);
foreach (var p in pipelines)
{
   Console.WriteLine($"Pipeline: {p.Name}\t{p.PipelineId}\t{p.LastSubmitTime}");
}

列出重复周期

下面的代码列出有关提交给帐户的作业的每个重复周期的信息。

var recurrences = adlaJobClient.Recurrence.List(adla);
foreach (var r in recurrences)
{
   Console.WriteLine($"Recurrence: {r.Name}\t{r.RecurrenceId}\t{r.LastSubmitTime}");
}

常见图形方案

在Microsoft Entra ID目录中查找用户

var userinfo = graphClient.Users.Get( "bill@contoso.com" );

获取 Microsoft Entra ID 目录中用户的 ObjectId

var userinfo = graphClient.Users.Get( "bill@contoso.com" );
Console.WriteLine( userinfo.ObjectId )

管理计算策略

DataLakeAnalyticsAccountManagementClient 对象提供用于为 Data Lake Analytics 帐户管理计算策略的方法。

列出计算策略

下面的代码检索 Data Lake Analytics 帐户的计算策略列表。

var policies = adlaAccountClient.ComputePolicies.ListByAccount(rg, adla);
foreach (var p in policies)
{
   Console.WriteLine($"Name: {p.Name}\tType: {p.ObjectType}\tMax AUs / job: {p.MaxDegreeOfParallelismPerJob}\tMin priority / job: {p.MinPriorityPerJob}");
}

创建新计算策略

下面的代码为 Data Lake Analytics 帐户创建新的计算策略,将对指定用户可用的最大 AU 设置为 50,并将最小作业优先级设置为 250。

var userAadObjectId = "3b097601-4912-4d41-b9d2-78672fc2acde";
var newPolicyParams = new ComputePolicyCreateOrUpdateParameters(userAadObjectId, "User", 50, 250);
adlaAccountClient.ComputePolicies.CreateOrUpdate(rg, adla, "GaryMcDaniel", newPolicyParams);

后续步骤