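End-to-end blob ingestion into Azure Data Explorer
Azure Data Explorer is a fast and scalable data exploration service for log and telemetry data. This article gives an end-to-end example of how to ingest data from Azure Blob storage into Azure Data Explorer.
You'll learn how to programmatically create a resource group, a storage account and container, an event hub, and an Azure Data Explorer cluster and database. You'll also learn how to programmatically configure Azure Data Explorer to ingest data from the new storage account.
Prerequisites
- An Azure subscription. If you don't have one, create a free Azure account.
- An Azure AD application and service principal that can access resources. Save the values for the Directory (tenant) ID, Application ID, and Client Secret.
Install packages
This article contains examples in C# and Python. Install the required packages for your preferred language.
Azure Resource Manager template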
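In this article, you use an Azure Resource Manager (ARM) template to create a storage account and container, an event hub, and an Azure Data Explorer cluster and database, and to add principals. The resource group itself is created by the code example, not by the template. Save the following content in a file named template.json. You use this file to run the code example.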
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"eventHubNamespaceName": {
"type": "string",
"metadata": {
"description": "Specifies the event hub namespace name."
}
},
"eventHubName": {
"type": "string",
"metadata": {
"description": "Specifies the event hub name."
}
},
"storageAccountType": {
"type": "string",
"defaultValue": "Standard_LRS",
"allowedValues": ["Standard_LRS", "Standard_GRS", "Standard_ZRS", "Premium_LRS"],
"metadata": {
"description": "Storage Account type"
}
},
"storageAccountName": {
"type": "string",
"defaultValue": "[concat('storage', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the storage account to create"
}
},
"containerName": {
"type": "string",
"defaultValue": "[concat('storagecontainer', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the container in storage account to create"
}
},
"eventHubSku": {
"type": "string",
"allowedValues": ["Basic", "Standard"],
"defaultValue": "Standard",
"metadata": {
"description": "Specifies the messaging tier for the event hub namespace."
}
},
"kustoClusterName": {
"type": "string",
"defaultValue": "[concat('kusto', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the cluster to create"
}
},
"kustoDatabaseName": {
"type": "string",
"defaultValue": "kustodb",
"metadata": {
"description": "Name of the database to create"
}
},
"clusterPrincipalAssignmentName": {
"type": "string",
"defaultValue": "clusterPrincipalAssignment1",
"metadata": {
"description": "Specifies the name of the principal assignment"
}
},
"principalIdForCluster": {
"type": "string",
"metadata": {
"description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
}
},
"roleForClusterPrincipal": {
"type": "string",
"defaultValue": "AllDatabasesViewer",
"metadata": {
"description": "Specifies the cluster principal role. It can be 'AllDatabasesAdmin', 'AllDatabasesMonitor' or 'AllDatabasesViewer'"
}
},
"tenantIdForClusterPrincipal": {
"type": "string",
"metadata": {
"description": "Specifies the tenantId of the cluster principal"
}
},
"principalTypeForCluster": {
"type": "string",
"defaultValue": "App",
"metadata": {
"description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
}
},
"databasePrincipalAssignmentName": {
"type": "string",
"defaultValue": "databasePrincipalAssignment1",
"metadata": {
"description": "Specifies the name of the principal assignment"
}
},
"principalIdForDatabase": {
"type": "string",
"metadata": {
"description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
}
},
"roleForDatabasePrincipal": {
"type": "string",
"defaultValue": "Admin",
"metadata": {
"description": "Specifies the database principal role. It can be 'Admin', 'Ingestor', 'Monitor', 'User', 'UnrestrictedViewers', 'Viewer'"
}
},
"tenantIdForDatabasePrincipal": {
"type": "string",
"metadata": {
"description": "Specifies the tenantId of the database principal"
}
},
"principalTypeForDatabase": {
"type": "string",
"defaultValue": "App",
"metadata": {
"description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
}
},
"location": {
"type": "string",
"defaultValue": "[resourceGroup().location]",
"metadata": {
"description": "Location for all resources."
}
}
},
"variables": {
},
"resources": [{
"apiVersion": "2017-04-01",
"type": "Microsoft.EventHub/namespaces",
"name": "[parameters('eventHubNamespaceName')]",
"location": "[parameters('location')]",
"sku": {
"name": "[parameters('eventHubSku')]",
"tier": "[parameters('eventHubSku')]",
"capacity": 1
},
"properties": {
"isAutoInflateEnabled": false,
"maximumThroughputUnits": 0
}
}, {
"apiVersion": "2017-04-01",
"type": "Microsoft.EventHub/namespaces/eventhubs",
"name": "[concat(parameters('eventHubNamespaceName'), '/', parameters('eventHubName'))]",
"location": "[parameters('location')]",
"dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', parameters('eventHubNamespaceName'))]"],
"properties": {
"messageRetentionInDays": 7,
"partitionCount": 1
}
}, {
"type": "Microsoft.Storage/storageAccounts",
"name": "[parameters('storageAccountName')]",
"location": "[parameters('location')]",
"apiVersion": "2018-07-01",
"sku": {
"name": "[parameters('storageAccountType')]"
},
"kind": "StorageV2",
"resources": [
{
"name": "[concat('default/', parameters('containerName'))]",
"type": "blobServices/containers",
"apiVersion": "2018-07-01",
"dependsOn": [
"[parameters('storageAccountName')]"
],
"properties": {
"publicAccess": "None"
}
}
],
"properties": {}
}, {
"name": "[parameters('kustoClusterName')]",
"type": "Microsoft.Kusto/clusters",
"sku": {
"name": "Standard_E8ads_v5",
"tier": "Standard",
"capacity": 2
},
"apiVersion": "2019-09-07",
"location": "[parameters('location')]",
"tags": {
"Created By": "GitHub quickstart template"
}
}, {
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'))]",
"type": "Microsoft.Kusto/clusters/databases",
"apiVersion": "2019-09-07",
"location": "[parameters('location')]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
"properties": {
"softDeletePeriodInDays": 365,
"hotCachePeriodInDays": 31
}
}, {
"type": "Microsoft.Kusto/Clusters/principalAssignments",
"apiVersion": "2019-11-09",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('clusterPrincipalAssignmentName'))]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
"properties": {
"principalId": "[parameters('principalIdForCluster')]",
"role": "[parameters('roleForClusterPrincipal')]",
"tenantId": "[parameters('tenantIdForClusterPrincipal')]",
"principalType": "[parameters('principalTypeForCluster')]"
}
}, {
"type": "Microsoft.Kusto/Clusters/Databases/principalAssignments",
"apiVersion": "2019-11-09",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'), '/', parameters('databasePrincipalAssignmentName'))]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters/databases', parameters('kustoClusterName'), parameters('kustoDatabaseName'))]"],
"properties": {
"principalId": "[parameters('principalIdForDatabase')]",
"role": "[parameters('roleForDatabasePrincipal')]",
"tenantId": "[parameters('tenantIdForDatabasePrincipal')]",
"principalType": "[parameters('principalTypeForDatabase')]"
}
}
]
}
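Before deploying, it can help to see which template parameters have no default and therefore must be supplied by the caller. The following is a minimal Python sketch of that check. The helper name `required_parameters` and the trimmed-down `sample_template` are illustrative assumptions, not part of any Azure SDK.

```python
import json


def required_parameters(template: dict) -> list[str]:
    """Return the names of template parameters that declare no defaultValue."""
    return [
        name
        for name, spec in template.get("parameters", {}).items()
        if "defaultValue" not in spec
    ]


# A trimmed-down stand-in for template.json, for illustration only.
sample_template = json.loads("""
{
  "parameters": {
    "eventHubName": {"type": "string"},
    "kustoDatabaseName": {"type": "string", "defaultValue": "kustodb"},
    "principalIdForCluster": {"type": "string"}
  }
}
""")

missing = required_parameters(sample_template)
print(missing)
```

Reading the full template by eye, the parameters without a default are eventHubNamespaceName, eventHubName, principalIdForCluster, tenantIdForClusterPrincipal, principalIdForDatabase, and tenantIdForDatabasePrincipal, so a deployment has to pass at least those.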
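Code example
The following code example walks through the steps that result in data being ingested into Azure Data Explorer.
You first create a resource group. You then create Azure resources such as a storage account and container, an event hub, and an Azure Data Explorer cluster and database, and add principals. Next, you create an Azure Event Grid subscription, and a table and column mapping in the Azure Data Explorer database. Finally, you create the data connection that configures Azure Data Explorer to ingest data from the new storage account.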
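// Namespaces assumed from the SDK packages this sample relies on; they can differ between SDK versions.
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Ingestion;
using Kusto.Data.Net.Client;
using Microsoft.Azure.Management.EventGrid;
using Microsoft.Azure.Management.EventGrid.Models;
using Microsoft.Azure.Management.Kusto;
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.Azure.Management.ResourceManager;
using Microsoft.Azure.Management.ResourceManager.Models;
using Microsoft.Rest.Azure.Authentication;
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID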
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID
var clientSecret = "PlaceholderClientSecret"; //Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var credentials = await ApplicationTokenProvider.LoginSilentAsync(tenantId, clientId, clientSecret);
var resourceManagementClient = new ResourceManagementClient(credentials) { SubscriptionId = subscriptionId };
var deploymentName = "e2eexample";
Console.WriteLine("Step 1: Create a new resource group in your Azure subscription to manage all the resources for using Azure Data Explorer.");
var resourceGroupName = deploymentName + "resourcegroup";
var location = "West Europe";
var resourceGroupData = new ResourceGroup(location);
await resourceManagementClient.ResourceGroups.CreateOrUpdateAsync(resourceGroupName, resourceGroupData);
Console.WriteLine("Step 2: Create a Blob Storage, a container in the Storage account, an event hub, an Azure Data Explorer cluster, database, and add principals by using an Azure Resource Manager template.");
var azureResourceTemplatePath = @"xxxxxxxxx\template.json"; //Path to the Azure Resource Manager template JSON from the previous section
var eventHubName = deploymentName + "eventhub";
var eventHubNamespaceName = eventHubName + "ns";
var storageAccountName = deploymentName + "storage";
var storageContainerName = deploymentName + "storagecontainer";
var eventGridSubscriptionName = deploymentName + "eventgrid";
var kustoClusterName = deploymentName + "kustocluster";
var kustoDatabaseName = deploymentName + "kustodatabase";
var kustoTableName = "Events";
var kustoColumnMappingName = "Events_CSV_Mapping";
var kustoDataConnectionName = deploymentName + "kustoeventgridconnection";
var deploymentData = new Deployment(
new DeploymentProperties(
DeploymentMode.Incremental,
template: File.ReadAllText(azureResourceTemplatePath, Encoding.UTF8),
parameters: new Dictionary<string, Dictionary<string, object>>
{
["eventHubNamespaceName"] = new(capacity: 1) { { "value", eventHubNamespaceName } },
["eventHubName"] = new(capacity: 1) { { "value", eventHubName } },
["storageAccountName"] = new(capacity: 1) { { "value", storageAccountName } },
["containerName"] = new(capacity: 1) { { "value", storageContainerName } },
["kustoClusterName"] = new(capacity: 1) { { "value", kustoClusterName } },
["kustoDatabaseName"] = new(capacity: 1) { { "value", kustoDatabaseName } },
["principalIdForCluster"] = new(capacity: 1) { { "value", "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" } }, //Application ID
["roleForClusterPrincipal"] = new(capacity: 1) { { "value", "AllDatabasesAdmin" } },
["tenantIdForClusterPrincipal"] = new(capacity: 1) { { "value", tenantId } },
["principalTypeForCluster"] = new(capacity: 1) { { "value", "App" } },
["principalIdForDatabase"] = new(capacity: 1) { { "value", "xxxxxxxx@xxxxxxxx.com" } }, //User Email
["roleForDatabasePrincipal"] = new(capacity: 1) { { "value", "Admin" } },
["tenantIdForDatabasePrincipal"] = new(capacity: 1) { { "value", tenantId } },
["principalTypeForDatabase"] = new(capacity: 1) { { "value", "User" } }
}
)
);
await resourceManagementClient.Deployments.CreateOrUpdateAsync(resourceGroupName, deploymentName, deploymentData);
Console.WriteLine("Step 3: Create an Event Grid subscription to publish blob events created in a specific container to an event hub.");
var storageResourceId = $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Storage/storageAccounts/{storageAccountName}";
var eventHubResourceId = $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{eventHubNamespaceName}/eventhubs/{eventHubName}";
var eventGridClient = new EventGridManagementClient(credentials) { SubscriptionId = subscriptionId };
var eventSubscriptionData = new EventSubscription
{
Destination = new EventHubEventSubscriptionDestination(eventHubResourceId),
Filter = new EventSubscriptionFilter
{
SubjectBeginsWith = $"/blobServices/default/containers/{storageContainerName}",
IncludedEventTypes = new List<string> { "Microsoft.Storage.BlobCreated" }
}
};
await eventGridClient.EventSubscriptions.CreateOrUpdateAsync(storageResourceId, eventGridSubscriptionName, eventSubscriptionData);
Console.WriteLine("Step 4: Create a table (with three columns: EventTime, EventId, and EventSummary) and column mapping in your Azure Data Explorer database.");
var kustoUri = $"https://{kustoClusterName}.{location.ToLower().Replace(" ", "")}.kusto.windows.net";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
InitialCatalog = kustoDatabaseName,
FederatedSecurity = true,
ApplicationClientId = clientId,
ApplicationKey = clientSecret,
Authority = tenantId
};
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
kustoClient.ExecuteControlCommand(
CslCommandGenerator.GenerateTableCreateCommand(
kustoTableName,
new[]
{
Tuple.Create("EventTime", "System.DateTime"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("EventSummary", "System.String"),
}
)
);
kustoClient.ExecuteControlCommand(
CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
kustoTableName,
kustoColumnMappingName,
new ColumnMapping[]
{
new() { ColumnName = "EventTime", ColumnType = "dateTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EventId", ColumnType = "int", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EventSummary", ColumnType = "string", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
}
)
);
}
Console.WriteLine("Step 5: Add an Event Grid data connection. Azure Data Explorer will automatically ingest the data when new blobs are created.");
var kustoManagementClient = new KustoManagementClient(credentials) { SubscriptionId = subscriptionId };
var eventGridDataConnectionData = new EventGridDataConnection(
storageResourceId, eventHubResourceId, "$Default",
location: location, tableName: kustoTableName, mappingRuleName: kustoColumnMappingName, dataFormat: "csv"
);
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(
resourceGroupName, kustoClusterName, kustoDatabaseName, kustoDataConnectionName, eventGridDataConnectionData
);
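Step 4 of the sample above builds two management commands through CslCommandGenerator. As a rough illustration of what such commands look like as text, the Python sketch below assembles a `.create table` command and a CSV ingestion mapping command for the same three columns. The helper names are hypothetical, and the exact text the C# generator emits may differ. The mapping JSON shape (`column` plus `Properties.Ordinal`) is assumed from the Kusto ingestion mapping syntax.

```python
import json

# Column names and Kusto types used by the Events table in the sample.
COLUMNS = [("EventTime", "datetime"), ("EventId", "int"), ("EventSummary", "string")]


def create_table_command(table: str, columns: list[tuple[str, str]]) -> str:
    """Build a '.create table' command from (name, kusto_type) pairs."""
    schema = ", ".join(f"{name}: {kusto_type}" for name, kusto_type in columns)
    return f".create table {table} ({schema})"


def create_csv_mapping_command(table: str, mapping_name: str,
                               columns: list[tuple[str, str]]) -> str:
    """Build a CSV ingestion mapping command; each column maps to its position."""
    mapping = [
        {"column": name, "Properties": {"Ordinal": str(position)}}
        for position, (name, _kusto_type) in enumerate(columns)
    ]
    return (
        f".create table {table} ingestion csv mapping "
        f"\"{mapping_name}\" '{json.dumps(mapping)}'"
    )


table_cmd = create_table_command("Events", COLUMNS)
mapping_cmd = create_csv_mapping_command("Events", "Events_CSV_Mapping", COLUMNS)
print(table_cmd)
print(mapping_cmd)
```

The ordinals 0, 1, and 2 correspond to the field positions in each CSV line, which is why the column order in the mapping has to match the order of fields in the uploaded blob.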
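Setting | Field description |
---|---|
tenantId | Your tenant ID. It's also known as the directory ID. |
subscriptionId | The subscription ID that you use for resource creation. |
clientId | The client ID of the application that can access resources in your tenant. |
clientSecret | The client secret of the application that can access resources in your tenant. |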
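The sample derives every resource name from one deployment name, wraps each template parameter in a `value` envelope, and builds resource IDs and the cluster URI by string formatting. The Python sketch below mirrors those derivations so the resulting strings are easy to inspect. The function names and the all-zero subscription ID are placeholders for illustration, not SDK calls.

```python
def deployment_parameters(values: dict) -> dict:
    """Wrap each value in the {"value": ...} envelope used for ARM template parameters."""
    return {name: {"value": value} for name, value in values.items()}


def storage_resource_id(subscription_id: str, resource_group: str, account: str) -> str:
    """Resource ID of the storage account, as used for the Event Grid subscription scope."""
    return (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.Storage/storageAccounts/{account}"
    )


def event_hub_resource_id(subscription_id: str, resource_group: str,
                          namespace: str, event_hub: str) -> str:
    """Resource ID of the event hub that receives the blob-created events."""
    return (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.EventHub/namespaces/{namespace}/eventhubs/{event_hub}"
    )


def kusto_uri(cluster: str, location: str) -> str:
    """Cluster URI; the region segment is the location lowercased with spaces removed."""
    region = location.lower().replace(" ", "")
    return f"https://{cluster}.{region}.kusto.windows.net"


# Placeholder values for illustration only.
subscription_id = "00000000-0000-0000-0000-000000000000"
deployment_name = "e2eexample"
resource_group = deployment_name + "resourcegroup"
event_hub = deployment_name + "eventhub"

params = deployment_parameters({
    "eventHubName": event_hub,
    "eventHubNamespaceName": event_hub + "ns",
    "storageAccountName": deployment_name + "storage",
})
storage_id = storage_resource_id(subscription_id, resource_group, deployment_name + "storage")
hub_id = event_hub_resource_id(subscription_id, resource_group, event_hub + "ns", event_hub)
cluster_uri = kusto_uri(deployment_name + "kustocluster", "West Europe")

print(params)
print(storage_id)
print(hub_id)
print(cluster_uri)
```

Keeping the derivations in small functions makes it easier to spot a mismatch, for example a resource group name in a resource ID that differs from the one the deployment actually used.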
Test the code example
Upload a file into the storage account.
string storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxxxxxxx;AccountKey=xxxxxxxxxxxxxx;EndpointSuffix=core.windows.net";
var cloudStorageAccount = CloudStorageAccount.Parse(storageConnectionString);
CloudBlobClient blobClient = cloudStorageAccount.CreateCloudBlobClient();
CloudBlobContainer container = blobClient.GetContainerReference(storageContainerName);
CloudBlockBlob blockBlob = container.GetBlockBlobReference("test.csv");
// Two CSV rows, one per line, in the column order EventTime, EventId, EventSummary.
var blobContent = @"2007-01-01 00:00:00.0000000,2592,Several trees down
2007-01-01 00:00:00.0000000,4171,Winter Storm";
await blockBlob.UploadTextAsync(blobContent);
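Setting | Field description |
---|---|
storageConnectionString | The connection string of the programmatically created storage account. |
Run a test query in Azure Data Explorer.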
// Reuses location, kustoClusterName, kustoDatabaseName, kustoTableName, clientId, clientSecret, and tenantId from the code example.
// If you run this in the same scope as the code example, reuse its kustoUri and kustoConnectionStringBuilder instead of redeclaring them.
var kustoUri = $"https://{kustoClusterName}.{location.ToLower().Replace(" ", "")}.kusto.windows.net";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
    InitialCatalog = kustoDatabaseName,
    FederatedSecurity = true,
    ApplicationClientId = clientId,
    ApplicationKey = clientSecret,
    Authority = tenantId
};
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
{
    var query = $"{kustoTableName} | take 10";
    using (var reader = kustoClient.ExecuteQuery(query))
    {
        // Print the three columns of each row in the result set.
        while (reader.Read())
        {
            Console.WriteLine($"{reader[0]}, {reader[1]}, {reader[2]}");
        }
    }
}
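To know what the test query should return, it can be useful to apply the same ordinal mapping to the test blob locally. The Python sketch below does that with the standard csv module. It doesn't contact Azure, and the `parse_rows` helper is a hypothetical name used only for this illustration.

```python
import csv
import io
from datetime import datetime

# The two rows uploaded as test.csv, one record per line.
BLOB_CONTENT = (
    "2007-01-01 00:00:00.0000000,2592,Several trees down\n"
    "2007-01-01 00:00:00.0000000,4171,Winter Storm"
)

# Column order follows the mapping ordinals 0, 1, 2.
COLUMNS = ["EventTime", "EventId", "EventSummary"]


def parse_rows(text: str) -> list[dict]:
    """Map each CSV field to its column by position and convert the types."""
    rows = []
    for record in csv.reader(io.StringIO(text)):
        row = dict(zip(COLUMNS, record))
        # The timestamps carry 7 fractional digits; %f accepts at most 6,
        # so the last digit is trimmed before parsing.
        row["EventTime"] = datetime.strptime(row["EventTime"][:26], "%Y-%m-%d %H:%M:%S.%f")
        row["EventId"] = int(row["EventId"])
        rows.append(row)
    return rows


rows = parse_rows(BLOB_CONTENT)
for row in rows:
    print(f"{row['EventTime']}, {row['EventId']}, {row['EventSummary']}")
```

If the query returns no rows shortly after the upload, allow some time before querying again, because ingestion through the data connection is not instantaneous.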
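Next steps
- To learn about other ways to create a cluster, see Create an Azure Data Explorer cluster and database.
- To learn more about ingestion methods, see Azure Data Explorer data ingestion.
- Learn about the common operators of Kusto Query Language (KQL).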