Azure Data Explorer 엔드 투 엔드 Blob 수집
Azure Data Explorer는 로그 및 원격 분석 데이터에 사용 가능한 빠르고 확장이 가능한 데이터 탐색 서비스로서, 이 문서에서는 Azure Blob Storage에서 Azure Data Explorer로 데이터를 수집하는 방법에 대한 엔드투엔드 예제를 제공합니다.
리소스 그룹, 스토리지 계정 및 컨테이너, 이벤트 허브, Azure Data Explorer 클러스터 및 데이터베이스를 프로그래밍 방식으로 만드는 방법을 알아봅니다. 또한 새 스토리지 계정에서 데이터를 수집하도록 Azure Data Explorer를 프로그래밍 방식으로 구성하는 방법도 알아봅니다.
이전 SDK 버전을 기반으로 하는 코드 샘플은 보관된 문서를 참조하세요.
사전 요구 사항
- Azure 구독 평가판 Azure 계정을 만듭니다.
- 리소스에 액세스할 수 있는 Microsoft Entra 애플리케이션 및 서비스 주체입니다. 디렉터리(테넌트) ID, 애플리케이션 ID 및 클라이언트 암호를 저장합니다.
패키지 설치
이 문서에는 C# 및 Python의 예제가 포함되어 있습니다. 원하는 언어에 대한 탭을 선택하고 필요한 패키지를 설치합니다.
Azure Resource Manager 템플릿
이 문서에서는 ARM(Azure Resource Manager) 템플릿을 사용하여 리소스 그룹, 스토리지 계정 및 컨테이너, 이벤트 허브, Azure Data Explorer 클러스터 및 데이터베이스를 만듭니다. 이름이 template.json
인 파일에 다음 콘텐츠를 저장합니다. 이 파일을 사용하여 코드 예제를 실행합니다.
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"eventHubNamespaceName": {
"type": "string",
"metadata": {
"description": "Specifies a the event hub Namespace name."
}
},
"eventHubName": {
"type": "string",
"metadata": {
"description": "Specifies a event hub name."
}
},
"storageAccountType": {
"type": "string",
"defaultValue": "Standard_LRS",
"allowedValues": ["Standard_LRS", "Standard_GRS", "Standard_ZRS", "Premium_LRS"],
"metadata": {
"description": "Storage Account type"
}
},
"storageAccountName": {
"type": "string",
"defaultValue": "[concat('storage', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the storage account to create"
}
},
"containerName": {
"type": "string",
"defaultValue": "[concat('storagecontainer', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the container in storage account to create"
}
},
"eventHubSku": {
"type": "string",
"allowedValues": ["Basic", "Standard"],
"defaultValue": "Standard",
"metadata": {
"description": "Specifies the messaging tier for service Bus namespace."
}
},
"kustoClusterName": {
"type": "string",
"defaultValue": "[concat('kusto', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the cluster to create"
}
},
"kustoDatabaseName": {
"type": "string",
"defaultValue": "kustodb",
"metadata": {
"description": "Name of the database to create"
}
},
"clusterPrincipalAssignmentName": {
"type": "string",
"defaultValue": "clusterPrincipalAssignment1",
"metadata": {
"description": "Specifies the name of the principal assignment"
}
},
"principalIdForCluster": {
"type": "string",
"metadata": {
"description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
}
},
"roleForClusterPrincipal": {
"type": "string",
"defaultValue": "AllDatabasesViewer",
"metadata": {
"description": "Specifies the cluster principal role. It can be 'AllDatabasesAdmin',
'AllDatabasesMonitor' or 'AllDatabasesViewer'"
}
},
"tenantIdForClusterPrincipal": {
"type": "string",
"metadata": {
"description": "Specifies the tenantId of the cluster principal"
}
},
"principalTypeForCluster": {
"type": "string",
"defaultValue": "App",
"metadata": {
"description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
}
},
"databasePrincipalAssignmentName": {
"type": "string",
"defaultValue": "databasePrincipalAssignment1",
"metadata": {
"description": "Specifies the name of the principal assignment"
}
},
"principalIdForDatabase": {
"type": "string",
"metadata": {
"description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
}
},
"roleForDatabasePrincipal": {
"type": "string",
"defaultValue": "Admin",
"metadata": {
"description": "Specifies the database principal role. It can be 'Admin', 'Ingestor', 'Monitor', 'User', 'UnrestrictedViewers', 'Viewer'"
}
},
"tenantIdForDatabasePrincipal": {
"type": "string",
"metadata": {
"description": "Specifies the tenantId of the database principal"
}
},
"principalTypeForDatabase": {
"type": "string",
"defaultValue": "App",
"metadata": {
"description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
}
},
"location": {
"type": "string",
"defaultValue": "[resourceGroup().location]",
"metadata": {
"description": "Location for all resources."
}
}
},
"variables": {
},
"resources": [{
"apiVersion": "2017-04-01",
"type": "Microsoft.EventHub/namespaces",
"name": "[parameters('eventHubNamespaceName')]",
"location": "[parameters('location')]",
"sku": {
"name": "[parameters('eventHubSku')]",
"tier": "[parameters('eventHubSku')]",
"capacity": 1
},
"properties": {
"isAutoInflateEnabled": false,
"maximumThroughputUnits": 0
}
}, {
"apiVersion": "2017-04-01",
"type": "Microsoft.EventHub/namespaces/eventhubs",
"name": "[concat(parameters('eventHubNamespaceName'), '/', parameters('eventHubName'))]",
"location": "[parameters('location')]",
"dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', parameters('eventHubNamespaceName'))]"],
"properties": {
"messageRetentionInDays": 7,
"partitionCount": 1
}
}, {
"type": "Microsoft.Storage/storageAccounts",
"name": "[parameters('storageAccountName')]",
"location": "[parameters('location')]",
"apiVersion": "2018-07-01",
"sku": {
"name": "[parameters('storageAccountType')]"
},
"kind": "StorageV2",
"resources": [
{
"name": "[concat('default/', parameters('containerName'))]",
"type": "blobServices/containers",
"apiVersion": "2018-07-01",
"dependsOn": [
"[parameters('storageAccountName')]"
],
"properties": {
"publicAccess": "None"
}
}
],
"properties": {}
}, {
"name": "[parameters('kustoClusterName')]",
"type": "Microsoft.Kusto/clusters",
"sku": {
"name": "Standard_E8ads_v5",
"tier": "Standard",
"capacity": 2
},
"apiVersion": "2019-09-07",
"location": "[parameters('location')]",
"tags": {
"Created By": "GitHub quickstart template"
}
}, {
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'))]",
"type": "Microsoft.Kusto/clusters/databases",
"apiVersion": "2019-09-07",
"location": "[parameters('location')]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
"properties": {
"softDeletePeriodInDays": 365,
"hotCachePeriodInDays": 31
}
}, {
"type": "Microsoft.Kusto/Clusters/principalAssignments",
"apiVersion": "2019-11-09",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('clusterPrincipalAssignmentName'))]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
"properties": {
"principalId": "[parameters('principalIdForCluster')]",
"role": "[parameters('roleForClusterPrincipal')]",
"tenantId": "[parameters('tenantIdForClusterPrincipal')]",
"principalType": "[parameters('principalTypeForCluster')]"
}
}, {
"type": "Microsoft.Kusto/Clusters/Databases/principalAssignments",
"apiVersion": "2019-11-09",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'), '/', parameters('databasePrincipalAssignmentName'))]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters/databases', parameters('kustoClusterName'), parameters('kustoDatabaseName'))]"],
"properties": {
"principalId": "[parameters('principalIdForDatabase')]",
"role": "[parameters('roleForDatabasePrincipal')]",
"tenantId": "[parameters('tenantIdForDatabasePrincipal')]",
"principalType": "[parameters('principalTypeForDatabase')]"
}
}
]
}
코드 예제
다음 코드 예제는 Azure Data Explorer로 데이터를 수집하는 단계별 프로세스를 제공합니다.
먼저 리소스 그룹을 만듭니다. 또한 스토리지 계정 및 컨테이너, 이벤트 허브, Azure Data Explorer 클러스터 및 데이터베이스와 같은 Azure 리소스를 만들고 보안 주체를 추가합니다. 그런 다음, Azure Data Explorer 데이터베이스에서 테이블 및 열 매핑과 함께 Azure Event Grid 구독을 만듭니다. 마지막으로 데이터 연결을 만들어 새 스토리지 계정에서 데이터를 수집하도록 Azure Data Explorer를 구성합니다.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID
var clientSecret = "PlaceholderClientSecret"; //Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var credentials = new ClientSecretCredential(tenantId, clientId, clientSecret);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var deploymentName = "e2eexample";
Console.WriteLine("Step 1: Create a new resource group in your Azure subscription to manage all the resources for using Azure Data Explorer.");
var subscriptions = resourceManagementClient.GetSubscriptions();
var subscription = (await subscriptions.GetAsync(subscriptionId)).Value;
var resourceGroups = subscription.GetResourceGroups();
var resourceGroupName = deploymentName + "resourcegroup";
var location = AzureLocation.WestEurope;
var resourceGroupData = new ResourceGroupData(location);
var resourceGroup = (await resourceGroups.CreateOrUpdateAsync(WaitUntil.Completed, resourceGroupName, resourceGroupData)).Value;
Console.WriteLine("Step 2: Create a Blob Storage, a container in the Storage account, an event hub, an Azure Data Explorer cluster, database, and add principals by using an Azure Resource Manager template.");
var deployments = resourceGroup.GetArmDeployments();
var azureResourceTemplatePath = @"xxxxxxxxx\template.json"; //Path to the Azure Resource Manager template JSON from the previous section
var eventHubName = deploymentName + "eventhub";
var eventHubNamespaceName = eventHubName + "ns";
var storageAccountName = deploymentName + "storage";
var storageContainerName = deploymentName + "storagecontainer";
var eventGridSubscriptionName = deploymentName + "eventgrid";
var kustoClusterName = deploymentName + "kustocluster";
var kustoDatabaseName = deploymentName + "kustodatabase";
var kustoTableName = "Events";
var kustoColumnMappingName = "Events_CSV_Mapping";
var kustoDataConnectionName = deploymentName + "kustoeventgridconnection";
var armDeploymentContent = new ArmDeploymentContent(
new ArmDeploymentProperties(ArmDeploymentMode.Incremental)
{
Template = BinaryData.FromString(File.ReadAllText(azureResourceTemplatePath, Encoding.UTF8)),
Parameters = BinaryData.FromObjectAsJson(
JsonConvert.SerializeObject(
new Dictionary<string, Dictionary<string, string>>
{
["eventHubNamespaceName"] = new(capacity: 1) { { "value", eventHubNamespaceName } },
["eventHubName"] = new(capacity: 1) { { "value", eventHubName } },
["storageAccountName"] = new(capacity: 1) { { "value", storageAccountName } },
["containerName"] = new(capacity: 1) { { "value", storageContainerName } },
["kustoClusterName"] = new(capacity: 1) { { "value", kustoClusterName } },
["kustoDatabaseName"] = new(capacity: 1) { { "value", kustoDatabaseName } },
["principalIdForCluster"] = new(capacity: 1) { { "value", "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" } }, //Application ID
["roleForClusterPrincipal"] = new(capacity: 1) { { "value", "AllDatabasesAdmin" } },
["tenantIdForClusterPrincipal"] = new(capacity: 1) { { "value", tenantId } },
["principalTypeForCluster"] = new(capacity: 1) { { "value", "App" } },
["principalIdForDatabase"] = new(capacity: 1) { { "value", "xxxxxxxx@xxxxxxxx.com" } }, //User Email
["roleForDatabasePrincipal"] = new(capacity: 1) { { "value", "Admin" } },
["tenantIdForDatabasePrincipal"] = new(capacity: 1) { { "value", tenantId } },
["principalTypeForDatabase"] = new(capacity: 1) { { "value", "User" } }
}
)
)
}
);
await deployments.CreateOrUpdateAsync(WaitUntil.Completed, deploymentName, armDeploymentContent);
Console.WriteLine("Step 3: Create an Event Grid subscription to publish blob events created in a specific container to an event hub.");
var storageResourceId = new ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Storage/storageAccounts/{storageAccountName}");
var eventHubResourceId = new ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{eventHubNamespaceName}/eventhubs/{eventHubName}");
var eventSubscriptions = resourceManagementClient.GetEventSubscriptions(storageResourceId);
var eventSubscriptionData = new EventGridSubscriptionData
{
Destination = new EventHubEventSubscriptionDestination { ResourceId = eventHubResourceId },
Filter = new EventSubscriptionFilter
{
SubjectBeginsWith = $"/blobServices/default/containers/{storageContainerName}",
}
};
eventSubscriptionData.Filter.IncludedEventTypes.Add(BlobStorageEventType.MicrosoftStorageBlobCreated.ToString());
await eventSubscriptions.CreateOrUpdateAsync(WaitUntil.Completed, eventGridSubscriptionName, eventSubscriptionData);
Console.WriteLine("Step 4: Create a table (with three columns: EventTime, EventId, and EventSummary) and column mapping in your Azure Data Explorer database.");
var kustoUri = $"https://{kustoClusterName}.{location}.kusto.windows.net";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
InitialCatalog = kustoDatabaseName,
FederatedSecurity = true,
ApplicationClientId = clientId,
ApplicationKey = clientSecret,
Authority = tenantId
};
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
kustoClient.ExecuteControlCommand(
CslCommandGenerator.GenerateTableCreateCommand(
kustoTableName,
new[]
{
Tuple.Create("EventTime", "System.DateTime"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("EventSummary", "System.String"),
}
)
);
kustoClient.ExecuteControlCommand(
CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
kustoTableName,
kustoColumnMappingName,
new ColumnMapping[]
{
new() { ColumnName = "EventTime", ColumnType = "dateTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EventId", ColumnType = "int", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EventSummary", ColumnType = "string", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
}
)
);
}
Console.WriteLine("Step 5: Add an Event Grid data connection. Azure Data Explorer will automatically ingest the data when new blobs are created.");
var cluster = (await resourceGroup.GetKustoClusterAsync(kustoClusterName)).Value;
var database = (await cluster.GetKustoDatabaseAsync(kustoDatabaseName)).Value;
var dataConnections = database.GetKustoDataConnections();
var eventGridDataConnectionData = new KustoEventGridDataConnection
{
StorageAccountResourceId = storageResourceId,
EventGridResourceId = eventHubResourceId,
ConsumerGroup = "$Default",
Location = location,
TableName = kustoTableName,
MappingRuleName = kustoColumnMappingName,
DataFormat = KustoEventGridDataFormat.Csv
};
await dataConnections.CreateOrUpdateAsync(WaitUntil.Completed, kustoDataConnectionName, eventGridDataConnectionData);
설정 | 필드 설명 |
---|---|
tenantId | 테넌트 ID 디렉터리 ID라고도 합니다. |
subscriptionId | 리소스를 만드는 데 사용하는 구독 ID입니다. |
clientId | 테넌트의 리소스에 액세스할 수 있는 애플리케이션의 클라이언트 ID입니다. |
clientSecret | 테넌트의 리소스에 액세스할 수 있는 애플리케이션의 클라이언트 암호입니다. |
코드 예제 테스트
스토리지 계정에 파일을 업로드합니다.
var container = new BlobContainerClient( "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxxxxxxx;AccountKey=xxxxxxxxxxxxxx;EndpointSuffix=core.windows.net", storageContainerName ); var blobContent = "2007-01-01 00:00:00.0000000,2592,Several trees down\n2007-01-01 00:00:00.0000000,4171,Winter Storm"; await container.UploadBlobAsync("test.csv", BinaryData.FromString(blobContent));
설정 필드 설명 storageConnectionString 프로그래밍 방식으로 만든 스토리지 계정의 연결 문자열입니다. Azure Data Explorer에서 테스트 쿼리를 실행합니다.
var kustoUri = $"https://{kustoClusterName}.{locationSmallCase}.kusto.windows.net"; var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri) { InitialCatalog = kustoDatabaseName, FederatedSecurity = true, ApplicationClientId = clientId, ApplicationKey = clientSecret, Authority = tenantId }; using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder)) { var query = $"{kustoTableName} | take 10"; using var reader = kustoClient.ExecuteQuery(query) as DataTableReader2; // Print the contents of each of the result sets. while (reader != null && reader.Read()) { Console.WriteLine($"{reader[0]}, {reader[1]}, {reader[2]}"); } }
관련 콘텐츠
피드백
https://aka.ms/ContentUserFeedback
출시 예정: 2024년 내내 콘텐츠에 대한 피드백 메커니즘으로 GitHub 문제를 단계적으로 폐지하고 이를 새로운 피드백 시스템으로 바꿀 예정입니다. 자세한 내용은 다음을 참조하세요.다음에 대한 사용자 의견 제출 및 보기