Azure Data Explorer 엔드 투 엔드 Blob 수집

Azure Data Explorer는 로그 및 원격 분석 데이터를 위한 빠르고 확장성이 뛰어난 데이터 탐색 서비스입니다. 이 문서에서는 Azure Blob Storage에서 Azure Data Explorer로 데이터를 수집하는 방법에 대한 엔드 투 엔드 예제를 제공합니다.

리소스 그룹, 스토리지 계정 및 컨테이너, 이벤트 허브, Azure Data Explorer 클러스터 및 데이터베이스를 프로그래밍 방식으로 만드는 방법을 알아봅니다. 또한 새 스토리지 계정에서 데이터를 수집하도록 Azure Data Explorer를 프로그래밍 방식으로 구성하는 방법도 알아봅니다.

이전 SDK 버전을 기반으로 하는 코드 샘플은 보관된 문서를 참조하세요.

사전 요구 사항

패키지 설치

이 문서에는 C# 및 Python의 예제가 포함되어 있습니다. 원하는 언어에 대한 탭을 선택하고 필요한 패키지를 설치합니다.

Azure Resource Manager 템플릿

이 문서에서는 ARM(Azure Resource Manager) 템플릿을 사용하여 리소스 그룹, 스토리지 계정 및 컨테이너, 이벤트 허브, Azure Data Explorer 클러스터 및 데이터베이스를 만듭니다. 이름이 template.json인 파일에 다음 콘텐츠를 저장합니다. 이 파일을 사용하여 코드 예제를 실행합니다.

{
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "eventHubNamespaceName": {
            "type": "string",
            "metadata": {
                "description": "Specifies the event hub namespace name."
            }
        },
        "eventHubName": {
            "type": "string",
            "metadata": {
                "description": "Specifies the event hub name."
            }
        },
        "storageAccountType": {
            "type": "string",
            "defaultValue": "Standard_LRS",
            "allowedValues": ["Standard_LRS", "Standard_GRS", "Standard_ZRS", "Premium_LRS"],
            "metadata": {
                "description": "Storage Account type"
            }
        },
        "storageAccountName": {
            "type": "string",
            "defaultValue": "[concat('storage', uniqueString(resourceGroup().id))]",
            "metadata": {
                "description": "Name of the storage account to create"
            }
        },
        "containerName": {
            "type": "string",
            "defaultValue": "[concat('storagecontainer', uniqueString(resourceGroup().id))]",
            "metadata": {
                "description": "Name of the container in storage account to create"
            }
        },
        "eventHubSku": {
            "type": "string",
            "allowedValues": ["Basic", "Standard"],
            "defaultValue": "Standard",
            "metadata": {
                "description": "Specifies the messaging tier for the Event Hubs namespace."
            }
        },
        "kustoClusterName": {
            "type": "string",
            "defaultValue": "[concat('kusto', uniqueString(resourceGroup().id))]",
            "metadata": {
                "description": "Name of the cluster to create"
            }
        },
        "kustoDatabaseName": {
            "type": "string",
            "defaultValue": "kustodb",
            "metadata": {
                "description": "Name of the database to create"
            }
        },
        "clusterPrincipalAssignmentName": {
            "type": "string",
            "defaultValue": "clusterPrincipalAssignment1",
            "metadata": {
                "description": "Specifies the name of the principal assignment"
            }
        },
        "principalIdForCluster": {
            "type": "string",
            "metadata": {
                "description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
            }
        },
        "roleForClusterPrincipal": {
            "type": "string",
            "defaultValue": "AllDatabasesViewer",
            "metadata": {
                "description": "Specifies the cluster principal role. It can be 'AllDatabasesAdmin', 'AllDatabasesMonitor' or 'AllDatabasesViewer'"
            }
        },
        "tenantIdForClusterPrincipal": {
            "type": "string",
            "metadata": {
                "description": "Specifies the tenantId of the cluster principal"
            }
        },
        "principalTypeForCluster": {
            "type": "string",
            "defaultValue": "App",
            "metadata": {
                "description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
            }
        },
        "databasePrincipalAssignmentName": {
            "type": "string",
            "defaultValue": "databasePrincipalAssignment1",
            "metadata": {
                "description": "Specifies the name of the principal assignment"
            }
        },
        "principalIdForDatabase": {
            "type": "string",
            "metadata": {
                "description": "Specifies the principal id. It can be user email, application (client) ID, security group name"
            }
        },
        "roleForDatabasePrincipal": {
            "type": "string",
            "defaultValue": "Admin",
            "metadata": {
                "description": "Specifies the database principal role. It can be 'Admin', 'Ingestor', 'Monitor', 'User', 'UnrestrictedViewers', 'Viewer'"
            }
        },
        "tenantIdForDatabasePrincipal": {
            "type": "string",
            "metadata": {
                "description": "Specifies the tenantId of the database principal"
            }
        },
        "principalTypeForDatabase": {
            "type": "string",
            "defaultValue": "App",
            "metadata": {
                "description": "Specifies the principal type. It can be 'User', 'App', 'Group'"
            }
        },
        "location": {
            "type": "string",
            "defaultValue": "[resourceGroup().location]",
            "metadata": {
                "description": "Location for all resources."
            }
        }
    },
    "variables": {
    },
    "resources": [{
            "apiVersion": "2017-04-01",
            "type": "Microsoft.EventHub/namespaces",
            "name": "[parameters('eventHubNamespaceName')]",
            "location": "[parameters('location')]",
            "sku": {
                "name": "[parameters('eventHubSku')]",
                "tier": "[parameters('eventHubSku')]",
                "capacity": 1
            },
            "properties": {
                "isAutoInflateEnabled": false,
                "maximumThroughputUnits": 0
            }
        }, {
            "apiVersion": "2017-04-01",
            "type": "Microsoft.EventHub/namespaces/eventhubs",
            "name": "[concat(parameters('eventHubNamespaceName'), '/', parameters('eventHubName'))]",
            "location": "[parameters('location')]",
            "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', parameters('eventHubNamespaceName'))]"],
            "properties": {
                "messageRetentionInDays": 7,
                "partitionCount": 1
            }
        }, {
            "type": "Microsoft.Storage/storageAccounts",
            "name": "[parameters('storageAccountName')]",
            "location": "[parameters('location')]",
            "apiVersion": "2018-07-01",
            "sku": {
                "name": "[parameters('storageAccountType')]"
            },
            "kind": "StorageV2",
            "resources": [
                {
                    "name": "[concat('default/', parameters('containerName'))]",
                    "type": "blobServices/containers",
                    "apiVersion": "2018-07-01",
                    "dependsOn": [
                        "[parameters('storageAccountName')]"
                    ],
                    "properties": {
                        "publicAccess": "None"
                    }
                }
            ],
            "properties": {}
        }, {
            "name": "[parameters('kustoClusterName')]",
            "type": "Microsoft.Kusto/clusters",
            "sku": {
                "name": "Standard_E8ads_v5",
                "tier": "Standard",
                "capacity": 2
            },
            "apiVersion": "2019-09-07",
            "location": "[parameters('location')]",
            "tags": {
                "Created By": "GitHub quickstart template"
            }
        }, {
            "name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'))]",
            "type": "Microsoft.Kusto/clusters/databases",
            "apiVersion": "2019-09-07",
            "location": "[parameters('location')]",
            "dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
            "properties": {
                "softDeletePeriodInDays": 365,
                "hotCachePeriodInDays": 31
            }
        }, {
            "type": "Microsoft.Kusto/Clusters/principalAssignments",
            "apiVersion": "2019-11-09",
            "name": "[concat(parameters('kustoClusterName'), '/', parameters('clusterPrincipalAssignmentName'))]",
            "dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
            "properties": {
                "principalId": "[parameters('principalIdForCluster')]",
                "role": "[parameters('roleForClusterPrincipal')]",
                "tenantId": "[parameters('tenantIdForClusterPrincipal')]",
                "principalType": "[parameters('principalTypeForCluster')]"
            }
        }, {
            "type": "Microsoft.Kusto/Clusters/Databases/principalAssignments",
            "apiVersion": "2019-11-09",
            "name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'), '/', parameters('databasePrincipalAssignmentName'))]",
            "dependsOn": ["[resourceId('Microsoft.Kusto/clusters/databases', parameters('kustoClusterName'), parameters('kustoDatabaseName'))]"],
            "properties": {
                "principalId": "[parameters('principalIdForDatabase')]",
                "role": "[parameters('roleForDatabasePrincipal')]",
                "tenantId": "[parameters('tenantIdForDatabasePrincipal')]",
                "principalType": "[parameters('principalTypeForDatabase')]"
            }
        }
    ]
}

코드 예제

다음 코드 예제는 Azure Data Explorer로 데이터를 수집하는 단계별 프로세스를 제공합니다.

먼저 리소스 그룹을 만듭니다. 또한 스토리지 계정 및 컨테이너, 이벤트 허브, Azure Data Explorer 클러스터 및 데이터베이스와 같은 Azure 리소스를 만들고 보안 주체를 추가합니다. 그런 다음, Azure Event Grid 구독을 만들고 Azure Data Explorer 데이터베이스에 테이블 및 열 매핑을 만듭니다. 마지막으로 데이터 연결을 만들어 새 스토리지 계정에서 데이터를 수집하도록 Azure Data Explorer를 구성합니다.

// Service principal credentials and the target subscription. Replace the placeholders with real values.
string tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; // Directory (tenant) ID
string clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; // Application ID
string clientSecret = "PlaceholderClientSecret"; // Client Secret
string subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
ClientSecretCredential credentials = new(tenantId, clientId, clientSecret);
ArmClient resourceManagementClient = new(credentials, subscriptionId);
// Prefix shared by the names of every resource created in this example.
string deploymentName = "e2eexample";

Console.WriteLine("Step 1: Create a new resource group in your Azure subscription to manage all the resources for using Azure Data Explorer.");
// Look up the subscription, then create (or update) the resource group inside it.
SubscriptionResource subscription = (await resourceManagementClient.GetSubscriptions().GetAsync(subscriptionId)).Value;
string resourceGroupName = $"{deploymentName}resourcegroup";
AzureLocation location = AzureLocation.WestEurope;
ArmOperation<ResourceGroupResource> resourceGroupOperation = await subscription
    .GetResourceGroups()
    .CreateOrUpdateAsync(WaitUntil.Completed, resourceGroupName, new ResourceGroupData(location));
ResourceGroupResource resourceGroup = resourceGroupOperation.Value;
// Step 2 deploys the ARM template from the previous section into the resource group created in step 1.
Console.WriteLine("Step 2: Create a Blob Storage, a container in the Storage account, an event hub, an Azure Data Explorer cluster, database, and add principals by using an Azure Resource Manager template.");
var deployments = resourceGroup.GetArmDeployments();
var azureResourceTemplatePath = @"xxxxxxxxx\template.json"; //Path to the Azure Resource Manager template JSON from the previous section
// Resource names are all derived from deploymentName; later steps reuse them.
var eventHubName = deploymentName + "eventhub";
var eventHubNamespaceName = eventHubName + "ns";
var storageAccountName = deploymentName + "storage";
var storageContainerName = deploymentName + "storagecontainer";
var eventGridSubscriptionName = deploymentName + "eventgrid";
var kustoClusterName = deploymentName + "kustocluster";
var kustoDatabaseName = deploymentName + "kustodatabase";
var kustoTableName = "Events";
var kustoColumnMappingName = "Events_CSV_Mapping";
var kustoDataConnectionName = deploymentName + "kustoeventgridconnection";
// Template parameters in the shape { "<parameterName>": { "value": "<parameterValue>" } }.
var deploymentParameters = new Dictionary<string, Dictionary<string, string>>
{
    ["eventHubNamespaceName"] = new(capacity: 1) { { "value", eventHubNamespaceName } },
    ["eventHubName"] = new(capacity: 1) { { "value", eventHubName } },
    ["storageAccountName"] = new(capacity: 1) { { "value", storageAccountName } },
    ["containerName"] = new(capacity: 1) { { "value", storageContainerName } },
    ["kustoClusterName"] = new(capacity: 1) { { "value", kustoClusterName } },
    ["kustoDatabaseName"] = new(capacity: 1) { { "value", kustoDatabaseName } },
    ["principalIdForCluster"] = new(capacity: 1) { { "value", "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" } }, //Application ID
    ["roleForClusterPrincipal"] = new(capacity: 1) { { "value", "AllDatabasesAdmin" } },
    ["tenantIdForClusterPrincipal"] = new(capacity: 1) { { "value", tenantId } },
    ["principalTypeForCluster"] = new(capacity: 1) { { "value", "App" } },
    ["principalIdForDatabase"] = new(capacity: 1) { { "value", "xxxxxxxx@xxxxxxxx.com" } }, //User Email
    ["roleForDatabasePrincipal"] = new(capacity: 1) { { "value", "Admin" } },
    ["tenantIdForDatabasePrincipal"] = new(capacity: 1) { { "value", tenantId } },
    ["principalTypeForDatabase"] = new(capacity: 1) { { "value", "User" } }
};
var armDeploymentContent = new ArmDeploymentContent(
    new ArmDeploymentProperties(ArmDeploymentMode.Incremental)
    {
        Template = BinaryData.FromString(File.ReadAllText(azureResourceTemplatePath, Encoding.UTF8)),
        // Serialize the dictionary itself so the payload is a JSON object. Handing an already
        // serialized string to FromObjectAsJson encodes it a second time and yields a JSON string literal.
        Parameters = BinaryData.FromObjectAsJson(deploymentParameters)
    }
);
await deployments.CreateOrUpdateAsync(WaitUntil.Completed, deploymentName, armDeploymentContent);
Console.WriteLine("Step 3: Create an Event Grid subscription to publish blob events created in a specific container to an event hub.");
// Full ARM resource IDs of the storage account (event source) and the event hub (event destination).
var storageResourceId = new ResourceIdentifier(
    $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Storage/storageAccounts/{storageAccountName}");
var eventHubResourceId = new ResourceIdentifier(
    $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{eventHubNamespaceName}/eventhubs/{eventHubName}");
// Forward only blob-created events whose subject starts with the target container's path.
var eventSubscriptionData = new EventGridSubscriptionData
{
    Destination = new EventHubEventSubscriptionDestination { ResourceId = eventHubResourceId },
    Filter = new EventSubscriptionFilter
    {
        SubjectBeginsWith = $"/blobServices/default/containers/{storageContainerName}",
        IncludedEventTypes = { BlobStorageEventType.MicrosoftStorageBlobCreated.ToString() }
    }
};
// The subscription is scoped to the storage account resource.
await resourceManagementClient
    .GetEventSubscriptions(storageResourceId)
    .CreateOrUpdateAsync(WaitUntil.Completed, eventGridSubscriptionName, eventSubscriptionData);
Console.WriteLine("Step 4: Create a table (with three columns: EventTime, EventId, and EventSummary) and column mapping in your Azure Data Explorer database.");
// Connect to the new cluster with the same application (service principal) credentials.
var kustoUri = $"https://{kustoClusterName}.{location}.kusto.windows.net";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
    InitialCatalog = kustoDatabaseName,
    FederatedSecurity = true,
    ApplicationClientId = clientId,
    ApplicationKey = clientSecret,
    Authority = tenantId
};

// Command that creates the target table with its three columns.
var createTableCommand = CslCommandGenerator.GenerateTableCreateCommand(
    kustoTableName,
    new[]
    {
        Tuple.Create("EventTime", "System.DateTime"),
        Tuple.Create("EventId", "System.Int32"),
        Tuple.Create("EventSummary", "System.String"),
    }
);

// Command that creates the CSV ingestion mapping: each column is bound to a CSV field by its ordinal.
var createMappingCommand = CslCommandGenerator.GenerateTableMappingCreateCommand(
    IngestionMappingKind.Csv,
    kustoTableName,
    kustoColumnMappingName,
    new ColumnMapping[]
    {
        new() { ColumnName = "EventTime", ColumnType = "dateTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
        new() { ColumnName = "EventId", ColumnType = "int", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
        new() { ColumnName = "EventSummary", ColumnType = "string", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
    }
);

// Run both commands through one admin client; the table must exist before the mapping is created on it.
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    kustoClient.ExecuteControlCommand(createTableCommand);
    kustoClient.ExecuteControlCommand(createMappingCommand);
}
Console.WriteLine("Step 5: Add an Event Grid data connection. Azure Data Explorer will automatically ingest the data when new blobs are created.");
// Resolve the cluster and database that the ARM deployment created.
var cluster = (await resourceGroup.GetKustoClusterAsync(kustoClusterName)).Value;
var database = (await cluster.GetKustoDatabaseAsync(kustoDatabaseName)).Value;
var dataConnections = database.GetKustoDataConnections();
var eventGridDataConnectionData = new KustoEventGridDataConnection
{
    StorageAccountResourceId = storageResourceId,
    // The event hub that receives the blob events belongs in EventHubResourceId.
    // EventGridResourceId is a different property and must not carry the event hub's ID.
    EventHubResourceId = eventHubResourceId,
    ConsumerGroup = "$Default",
    Location = location,
    // Ingest into the table and CSV mapping created in step 4.
    TableName = kustoTableName,
    MappingRuleName = kustoColumnMappingName,
    DataFormat = KustoEventGridDataFormat.Csv
};
await dataConnections.CreateOrUpdateAsync(WaitUntil.Completed, kustoDataConnectionName, eventGridDataConnectionData);
설정 필드 설명
tenantId 테넌트 ID입니다. 디렉터리 ID라고도 합니다.
subscriptionId 리소스를 만드는 데 사용하는 구독 ID입니다.
clientId 테넌트의 리소스에 액세스할 수 있는 애플리케이션의 클라이언트 ID입니다.
clientSecret 테넌트의 리소스에 액세스할 수 있는 애플리케이션의 클라이언트 암호입니다.

코드 예제 테스트

  1. 스토리지 계정에 파일을 업로드합니다.

    // Connection string of the storage account created by the deployment; replace the placeholders with real values.
    var storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxxxxxxx;AccountKey=xxxxxxxxxxxxxx;EndpointSuffix=core.windows.net";
    var container = new BlobContainerClient(storageConnectionString, storageContainerName);
    // Two CSV rows whose field order matches the Events table: EventTime, EventId, EventSummary.
    var blobContent = "2007-01-01 00:00:00.0000000,2592,Several trees down\n2007-01-01 00:00:00.0000000,4171,Winter Storm";
    // Uploading into the monitored container raises the blob-created event that triggers ingestion.
    await container.UploadBlobAsync("test.csv", BinaryData.FromString(blobContent));
    
    설정 필드 설명
    storageConnectionString 프로그래밍 방식으로 만든 스토리지 계정의 연결 문자열입니다.
  2. Azure Data Explorer에서 테스트 쿼리를 실행합니다.

    // Build the cluster URI from the same `location` value used when the table was created;
    // the snippet previously referenced `locationSmallCase`, which is not defined anywhere.
    var kustoUri = $"https://{kustoClusterName}.{location}.kusto.windows.net";
    var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
    {
        InitialCatalog = kustoDatabaseName,
        FederatedSecurity = true,
        ApplicationClientId = clientId,
        ApplicationKey = clientSecret,
        Authority = tenantId
    };
    using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
    {
        // Read back up to ten ingested rows.
        var query = $"{kustoTableName} | take 10";

        using var reader = kustoClient.ExecuteQuery(query) as DataTableReader2;
        // Print the three columns of every returned row; the loop is skipped if the cast yields null.
        while (reader != null && reader.Read())
        {
            Console.WriteLine($"{reader[0]}, {reader[1]}, {reader[2]}");
        }
    }