End-to-end blob ingestion into Azure Data Explorer using C#
Azure Data Explorer is a fast and scalable data exploration service for log and telemetry data. This article gives you an end-to-end example of how to ingest data from Azure Blob storage into Azure Data Explorer.
You'll learn how to programmatically create a resource group, a storage account and container, an event hub, and an Azure Data Explorer cluster and database. You'll also learn how to programmatically configure Azure Data Explorer to ingest data from the new storage account.
Prerequisites
An Azure subscription. If you don't have one, create a free Azure account.
Install C# NuGet packages
- Install Microsoft.Azure.Management.Kusto.
- Install Microsoft.Azure.Kusto.Data.
- Install Microsoft.Azure.Management.ResourceManager.
- Install Microsoft.Azure.Management.EventGrid.
- Install Microsoft.Azure.Storage.Blob.
- Install Microsoft.Rest.ClientRuntime.Azure.Authentication for authentication.
Authentication
To run the following example, you need an Azure Active Directory (Azure AD) application and service principal that can access resources. To create a free Azure AD application and add a role assignment at the subscription level, see Create an Azure AD application. You also need the directory (tenant) ID, application ID, and client secret.
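Rather than hardcoding the tenant ID, application ID, and client secret in source code, you might load them from environment variables. The following is a minimal sketch, assuming C# 9 or later. The variable names `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET` are an assumption (they are the names that Azure.Identity's `EnvironmentCredential` reads, but nothing in this article requires them), and `ServicePrincipalSettings` is a hypothetical type.

```csharp
using System;

// Hypothetical holder for the three service principal values this article needs.
public sealed record ServicePrincipalSettings(string TenantId, string ClientId, string ClientSecret)
{
    // Reads each value from an environment variable and fails fast if any is missing.
    // The variable names are an assumption; change them to match your environment.
    public static ServicePrincipalSettings FromEnvironment() =>
        new ServicePrincipalSettings(
            Require("AZURE_TENANT_ID"),
            Require("AZURE_CLIENT_ID"),
            Require("AZURE_CLIENT_SECRET"));

    private static string Require(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"Environment variable '{name}' is not set.");
        }
        return value;
    }
}
```

You would then pass `settings.TenantId`, `settings.ClientId`, and `settings.ClientSecret` to `ApplicationTokenProvider.LoginSilentAsync` in place of the literal strings.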
Azure Resource Manager template
In this article, you use an Azure Resource Manager template to create a storage account and container, an event hub, and an Azure Data Explorer cluster and database, and to add principal assignments to the cluster and database. The resource group that the template deploys into is created by the code example. Save the following content in a file named template.json; you'll use this file to run the code example.
{
"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"eventHubNamespaceName": {
"type": "string",
"metadata": {
"description": "Specifies the event hub namespace name."
}
},
"eventHubName": {
"type": "string",
"metadata": {
"description": "Specifies the event hub name."
}
},
"storageAccountType": {
"type": "string",
"defaultValue": "Standard_LRS",
"allowedValues": ["Standard_LRS", "Standard_GRS", "Standard_ZRS", "Premium_LRS"],
"metadata": {
"description": "Storage Account type"
}
},
"storageAccountName": {
"type": "string",
"defaultValue": "[concat('storage', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the storage account to create"
}
},
"containerName": {
"type": "string",
"defaultValue": "[concat('storagecontainer', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the container in storage account to create"
}
},
"eventHubSku": {
"type": "string",
"allowedValues": ["Basic", "Standard"],
"defaultValue": "Standard",
"metadata": {
"description": "Specifies the messaging tier for the Event Hubs namespace."
}
},
"kustoClusterName": {
"type": "string",
"defaultValue": "[concat('kusto', uniqueString(resourceGroup().id))]",
"metadata": {
"description": "Name of the cluster to create"
}
},
"kustoDatabaseName": {
"type": "string",
"defaultValue": "kustodb",
"metadata": {
"description": "Name of the database to create"
}
},
"clusterPrincipalAssignmentName": {
"type": "string",
"defaultValue": "clusterPrincipalAssignment1",
"metadata": {
"description": "Specifies the name of the principal assignment"
}
},
"principalIdForCluster": {
"type": "string",
"metadata": {
"description": "Specifies the principal ID. It can be a user email, an application (client) ID, or a security group name"
}
},
"roleForClusterPrincipal": {
"type": "string",
"defaultValue": "AllDatabasesViewer",
"metadata": {
"description": "Specifies the cluster principal role. It can be 'AllDatabasesAdmin', 'AllDatabasesMonitor', or 'AllDatabasesViewer'"
}
},
"tenantIdForClusterPrincipal": {
"type": "string",
"metadata": {
"description": "Specifies the tenantId of the cluster principal"
}
},
"principalTypeForCluster": {
"type": "string",
"defaultValue": "App",
"metadata": {
"description": "Specifies the principal type. It can be 'User', 'App', or 'Group'"
}
},
"databasePrincipalAssignmentName": {
"type": "string",
"defaultValue": "databasePrincipalAssignment1",
"metadata": {
"description": "Specifies the name of the principal assignment"
}
},
"principalIdForDatabase": {
"type": "string",
"metadata": {
"description": "Specifies the principal ID. It can be a user email, an application (client) ID, or a security group name"
}
},
"roleForDatabasePrincipal": {
"type": "string",
"defaultValue": "Admin",
"metadata": {
"description": "Specifies the database principal role. It can be 'Admin', 'Ingestor', 'Monitor', 'User', 'UnrestrictedViewers', 'Viewer'"
}
},
"tenantIdForDatabasePrincipal": {
"type": "string",
"metadata": {
"description": "Specifies the tenantId of the database principal"
}
},
"principalTypeForDatabase": {
"type": "string",
"defaultValue": "App",
"metadata": {
"description": "Specifies the principal type. It can be 'User', 'App', or 'Group'"
}
},
"location": {
"type": "string",
"defaultValue": "[resourceGroup().location]",
"metadata": {
"description": "Location for all resources."
}
}
},
"variables": {
},
"resources": [{
"apiVersion": "2017-04-01",
"type": "Microsoft.EventHub/namespaces",
"name": "[parameters('eventHubNamespaceName')]",
"location": "[parameters('location')]",
"sku": {
"name": "[parameters('eventHubSku')]",
"tier": "[parameters('eventHubSku')]",
"capacity": 1
},
"properties": {
"isAutoInflateEnabled": false,
"maximumThroughputUnits": 0
}
}, {
"apiVersion": "2017-04-01",
"type": "Microsoft.EventHub/namespaces/eventhubs",
"name": "[concat(parameters('eventHubNamespaceName'), '/', parameters('eventHubName'))]",
"location": "[parameters('location')]",
"dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', parameters('eventHubNamespaceName'))]"],
"properties": {
"messageRetentionInDays": 7,
"partitionCount": 1
}
}, {
"type": "Microsoft.Storage/storageAccounts",
"name": "[parameters('storageAccountName')]",
"location": "[parameters('location')]",
"apiVersion": "2018-07-01",
"sku": {
"name": "[parameters('storageAccountType')]"
},
"kind": "StorageV2",
"resources": [
{
"name": "[concat('default/', parameters('containerName'))]",
"type": "blobServices/containers",
"apiVersion": "2018-07-01",
"dependsOn": [
"[parameters('storageAccountName')]"
],
"properties": {
"publicAccess": "None"
}
}
],
"properties": {}
}, {
"name": "[parameters('kustoClusterName')]",
"type": "Microsoft.Kusto/clusters",
"sku": {
"name": "Standard_D13_v2",
"tier": "Standard",
"capacity": 2
},
"apiVersion": "2019-09-07",
"location": "[parameters('location')]",
"tags": {
"Created By": "GitHub quickstart template"
}
}, {
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'))]",
"type": "Microsoft.Kusto/clusters/databases",
"apiVersion": "2019-09-07",
"location": "[parameters('location')]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
"properties": {
"softDeletePeriodInDays": 365,
"hotCachePeriodInDays": 31
}
}, {
"type": "Microsoft.Kusto/Clusters/principalAssignments",
"apiVersion": "2019-11-09",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('clusterPrincipalAssignmentName'))]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
"properties": {
"principalId": "[parameters('principalIdForCluster')]",
"role": "[parameters('roleForClusterPrincipal')]",
"tenantId": "[parameters('tenantIdForClusterPrincipal')]",
"principalType": "[parameters('principalTypeForCluster')]"
}
}, {
"type": "Microsoft.Kusto/Clusters/Databases/principalAssignments",
"apiVersion": "2019-11-09",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'), '/', parameters('databasePrincipalAssignmentName'))]",
"dependsOn": ["[resourceId('Microsoft.Kusto/clusters/databases', parameters('kustoClusterName'), parameters('kustoDatabaseName'))]"],
"properties": {
"principalId": "[parameters('principalIdForDatabase')]",
"role": "[parameters('roleForDatabasePrincipal')]",
"tenantId": "[parameters('tenantIdForDatabasePrincipal')]",
"principalType": "[parameters('principalTypeForDatabase')]"
}
}
]
}
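Parameters in template.json that have no `defaultValue` must be supplied by the caller. For this template, those are `eventHubNamespaceName`, `eventHubName`, `principalIdForCluster`, `tenantIdForClusterPrincipal`, `principalIdForDatabase`, and `tenantIdForDatabasePrincipal`, which is why the code example passes them explicitly. The following sketch lists a template's required parameters by parsing it. It assumes .NET Core 3.0 or later for `System.Text.Json`, and `TemplateInspector` is a hypothetical helper name.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

public static class TemplateInspector
{
    // Returns the names of parameters declared without a "defaultValue",
    // that is, the ones a deployment must supply explicitly.
    public static IReadOnlyList<string> RequiredParameters(string templateJson)
    {
        using JsonDocument doc = JsonDocument.Parse(templateJson);
        if (!doc.RootElement.TryGetProperty("parameters", out JsonElement parameters))
        {
            return new List<string>();
        }

        // ToList runs the query before the JsonDocument is disposed.
        return parameters.EnumerateObject()
            .Where(p => !p.Value.TryGetProperty("defaultValue", out _))
            .Select(p => p.Name)
            .ToList();
    }
}
```

Calling `TemplateInspector.RequiredParameters(File.ReadAllText(azureResourceTemplatePath))` before deploying is one way to catch a missing parameter early, instead of waiting for Resource Manager to reject the deployment.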
Code example
The following code example gives you a step-by-step process that results in data ingestion into Azure Data Explorer.
You first create a resource group. You then use the Resource Manager template to create a storage account and container, an event hub, and an Azure Data Explorer cluster and database, and to add principals. Next, you create an Azure Event Grid subscription on the storage account, and a table and column mapping in the Azure Data Explorer database. Finally, you create the data connection that configures Azure Data Explorer to ingest data from the new storage account.
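// Uses top-level statements with await (C# 9 or later); otherwise, place this code in an async Main method.
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Microsoft.Azure.Management.EventGrid;
using Microsoft.Azure.Management.EventGrid.Models;
using Microsoft.Azure.Management.Kusto;
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.Azure.Management.ResourceManager;
using Microsoft.Azure.Management.ResourceManager.Models;
using Microsoft.Rest.Azure.Authentication;
var tenantId = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"; // Directory (tenant) ID
var clientId = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"; // Application ID
var clientSecret = "xxxxxxxxxxxxxx"; // Client secret
var subscriptionId = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx";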
string location = "West Europe";
string locationSmallCase = "westeurope";
string azureResourceTemplatePath = @"xxxxxxxxx\template.json";//Path to the Azure Resource Manager template JSON from the previous section
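// All resource names below derive from deploymentName. Storage account and cluster names become part of public DNS names, so change this value if a name is already in use.
string deploymentName = "e2eexample";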
string resourceGroupName = deploymentName + "resourcegroup";
string eventHubName = deploymentName + "eventhub";
string eventHubNamespaceName = eventHubName + "ns";
string storageAccountName = deploymentName + "storage";
string storageContainerName = deploymentName + "storagecontainer";
string eventGridSubscriptionName = deploymentName + "eventgrid";
string kustoClusterName = deploymentName + "kustocluster";
string kustoDatabaseName = deploymentName + "kustodatabase";
string kustoTableName = "Events";
string kustoColumnMappingName = "Events_CSV_Mapping";
string kustoDataConnectionName = deploymentName + "kustoeventgridconnection";
// Principals
string principalIdForCluster = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"; // Application ID
string roleForClusterPrincipal = "AllDatabasesAdmin";
string tenantIdForClusterPrincipal = tenantId;
string principalTypeForCluster = "App";
string principalIdForDatabase = "xxxxxxxx@xxxxxxxx.com";//User Email
string roleForDatabasePrincipal = "Admin";
string tenantIdForDatabasePrincipal = tenantId;
string principalTypeForDatabase = "User";
var serviceCreds = await ApplicationTokenProvider.LoginSilentAsync(tenantId, clientId, clientSecret);
var resourceManagementClient = new ResourceManagementClient(serviceCreds);
Console.WriteLine("Step 1: Create a new resource group in your Azure subscription to manage all the resources for using Azure Data Explorer.");
resourceManagementClient.SubscriptionId = subscriptionId;
await resourceManagementClient.ResourceGroups.CreateOrUpdateAsync(resourceGroupName,
new ResourceGroup() { Location = locationSmallCase });
Console.WriteLine(
"Step 2: Create a storage account and container, an event hub, and an Azure Data Explorer cluster and database, and add principals, by using an Azure Resource Manager template.");
var parameters = new Dictionary<string, Dictionary<string, object>>();
parameters["eventHubNamespaceName"] = new Dictionary<string, object>(capacity: 1) {{"value", eventHubNamespaceName}};
parameters["eventHubName"] = new Dictionary<string, object>(capacity: 1) {{"value", eventHubName }};
parameters["storageAccountName"] = new Dictionary<string, object>(capacity: 1) {{"value", storageAccountName }};
parameters["containerName"] = new Dictionary<string, object>(capacity: 1) {{"value", storageContainerName }};
parameters["kustoClusterName"] = new Dictionary<string, object>(capacity: 1) {{"value", kustoClusterName }};
parameters["kustoDatabaseName"] = new Dictionary<string, object>(capacity: 1) {{"value", kustoDatabaseName }};
parameters["principalIdForCluster"] = new Dictionary<string, object>(capacity: 1) {{"value", principalIdForCluster }};
parameters["roleForClusterPrincipal"] = new Dictionary<string, object>(capacity: 1) {{"value", roleForClusterPrincipal }};
parameters["tenantIdForClusterPrincipal"] = new Dictionary<string, object>(capacity: 1) {{"value", tenantIdForClusterPrincipal }};
parameters["principalTypeForCluster"] = new Dictionary<string, object>(capacity: 1) {{"value", principalTypeForCluster }};
parameters["principalIdForDatabase"] = new Dictionary<string, object>(capacity: 1) {{"value", principalIdForDatabase }};
parameters["roleForDatabasePrincipal"] = new Dictionary<string, object>(capacity: 1) {{"value", roleForDatabasePrincipal }};
parameters["tenantIdForDatabasePrincipal"] = new Dictionary<string, object>(capacity: 1) {{"value", tenantIdForDatabasePrincipal }};
parameters["principalTypeForDatabase"] = new Dictionary<string, object>(capacity: 1) {{"value", principalTypeForDatabase }};
string template = File.ReadAllText(azureResourceTemplatePath, Encoding.UTF8);
await resourceManagementClient.Deployments.CreateOrUpdateAsync(resourceGroupName, deploymentName,
new Deployment(new DeploymentProperties(DeploymentMode.Incremental, template: template,
parameters: parameters)));
Console.WriteLine(
"Step 3: Create an Event Grid subscription to publish blob events created in a specific container to an event hub.");
var eventGridClient = new EventGridManagementClient(serviceCreds)
{
SubscriptionId = subscriptionId
};
string storageResourceId = $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Storage/storageAccounts/{storageAccountName}";
string eventHubResourceId = $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{eventHubNamespaceName}/eventhubs/{eventHubName}";
await eventGridClient.EventSubscriptions.CreateOrUpdateAsync(storageResourceId, eventGridSubscriptionName,
new EventSubscription()
{
Destination = new EventHubEventSubscriptionDestination(eventHubResourceId),
Filter = new EventSubscriptionFilter()
{
SubjectBeginsWith = $"/blobServices/default/containers/{storageContainerName}",
IncludedEventTypes = new List<string>(){"Microsoft.Storage.BlobCreated"}
}
});
Console.WriteLine("Step 4: Create a table (with three columns: EventTime, EventId, and EventSummary) and column mapping in your Azure Data Explorer database.");
var kustoUri = $"https://{kustoClusterName}.{locationSmallCase}.kusto.windows.net";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
InitialCatalog = kustoDatabaseName,
FederatedSecurity = true,
ApplicationClientId = clientId,
ApplicationKey = clientSecret,
Authority = tenantId
};
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
kustoTableName,
new[]
{
Tuple.Create("EventTime", "System.DateTime"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("EventSummary", "System.String"),
});
kustoClient.ExecuteControlCommand(command);
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
Kusto.Data.Ingestion.IngestionMappingKind.Csv,
kustoTableName,
kustoColumnMappingName,
new ColumnMapping[] {
new ColumnMapping() { ColumnName = "EventTime", ColumnType = "dateTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EventId", ColumnType = "int", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EventSummary", ColumnType = "string", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
});
kustoClient.ExecuteControlCommand(command);
}
Console.WriteLine("Step 5: Add an Event Grid data connection. Azure Data Explorer will automatically ingest the data when new blobs are created.");
var kustoManagementClient = new KustoManagementClient(serviceCreds)
{
SubscriptionId = subscriptionId
};
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, kustoClusterName,
kustoDatabaseName, dataConnectionName: kustoDataConnectionName, new EventGridDataConnection(storageResourceId, eventHubResourceId, consumerGroup: "$Default", location: location, tableName:kustoTableName, mappingRuleName: kustoColumnMappingName, dataFormat: "csv"));
| Setting | Field description |
|---|---|
| tenantId | Your tenant ID. It's also known as a directory ID. |
| subscriptionId | The subscription ID that you use for resource creation. |
| clientId | The client ID of the application that can access resources in your tenant. |
| clientSecret | The client secret of the application that can access resources in your tenant. |
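Step 2 wraps every template parameter value in its own `{ "value": ... }` dictionary, which is the shape Resource Manager expects for deployment parameters. If you'd rather not repeat that line for each parameter, a small helper can build the same `Dictionary<string, Dictionary<string, object>>` from a flat map. This is a sketch; `ArmParameters.Wrap` is a hypothetical name, and its result is meant to be passed as the `parameters` argument of `DeploymentProperties`, exactly like the hand-built dictionary.

```csharp
using System.Collections.Generic;

public static class ArmParameters
{
    // Converts { name -> value } into { name -> { "value" -> value } },
    // the same structure Step 2 builds by hand for each template parameter.
    public static Dictionary<string, Dictionary<string, object>> Wrap(IDictionary<string, object> values)
    {
        var result = new Dictionary<string, Dictionary<string, object>>(values.Count);
        foreach (KeyValuePair<string, object> pair in values)
        {
            result[pair.Key] = new Dictionary<string, object>(capacity: 1) { { "value", pair.Value } };
        }
        return result;
    }
}
```

For example, `ArmParameters.Wrap(new Dictionary<string, object> { ["kustoDatabaseName"] = kustoDatabaseName })` produces the same entry as the `parameters["kustoDatabaseName"]` line in Step 2.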
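Step 3 builds two Azure resource IDs with string interpolation and filters Event Grid events by subject, so that only blobs created in the new container reach the event hub. The sketch below gathers those string formats in one place; `AzureIds` and its method names are hypothetical, and the formats are copied from the code example. `MatchesContainer` only approximates Event Grid's `SubjectBeginsWith` check, which is case-insensitive by default.

```csharp
using System;

public static class AzureIds
{
    // Resource ID of a storage account, as used for storageResourceId in Step 3.
    public static string StorageAccount(string subscriptionId, string resourceGroup, string account) =>
        $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroup}/providers/Microsoft.Storage/storageAccounts/{account}";

    // Resource ID of an event hub inside a namespace, as used for eventHubResourceId in Step 3.
    public static string EventHub(string subscriptionId, string resourceGroup, string ns, string hub) =>
        $"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroup}/providers/Microsoft.EventHub/namespaces/{ns}/eventhubs/{hub}";

    // Subject prefix that limits the subscription to blobs in one container.
    public static string ContainerSubjectPrefix(string container) =>
        $"/blobServices/default/containers/{container}";

    // Approximates a case-insensitive "subject begins with" filter for a blob event subject.
    public static bool MatchesContainer(string subject, string container) =>
        subject.StartsWith(ContainerSubjectPrefix(container), StringComparison.OrdinalIgnoreCase);
}
```

Because the prefix has no trailing `/blobs/`, it also matches containers whose names merely start with the same text (for example, `c10` when filtering on `c1`). Appending `/blobs/` to the prefix would be stricter; that is a suggestion, not what the code example does.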
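Step 4 uses `CslCommandGenerator` to produce the table and CSV mapping control commands, and builds the cluster URI from the cluster name and region. Written by hand, the commands are roughly equivalent to the strings built below, which you could also run in the Azure Data Explorer web UI. This is a sketch for understanding only: the exact text that `CslCommandGenerator` emits may differ in quoting and formatting, `KustoCommands` is a hypothetical helper, and `System.Text.Json` requires .NET Core 3.0 or later.

```csharp
using System.Globalization;
using System.Linq;
using System.Text.Json;

public static class KustoCommands
{
    // Cluster URI in the same form as kustoUri in Step 4.
    public static string ClusterUri(string cluster, string region) =>
        $"https://{cluster}.{region}.kusto.windows.net";

    // For example: .create table Events (EventTime: datetime, EventId: int, EventSummary: string)
    public static string CreateTable(string table, (string Name, string KqlType)[] columns) =>
        $".create table {table} (" +
        string.Join(", ", columns.Select(c => $"{c.Name}: {c.KqlType}")) + ")";

    // CSV mapping: field i of each CSV row goes to columns[i] (the "Ordinal" property).
    public static string CreateCsvMapping(string table, string mappingName, string[] columns)
    {
        var entries = columns.Select((name, i) => new
        {
            column = name,
            Properties = new { Ordinal = i.ToString(CultureInfo.InvariantCulture) }
        });
        string json = JsonSerializer.Serialize(entries);
        return $".create table {table} ingestion csv mapping '{mappingName}' '{json}'";
    }
}
```

The ordinals must line up with the field order in the uploaded CSV; that is why the test file puts the timestamp first, the ID second, and the summary third.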
Test the code example
Upload a file into the storage account.
// Requires these namespaces; add the directives at the top of your file:
// using Microsoft.Azure.Storage;
// using Microsoft.Azure.Storage.Blob;
string storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxxxxxxx;AccountKey=xxxxxxxxxxxxxx;EndpointSuffix=core.windows.net";
var cloudStorageAccount = CloudStorageAccount.Parse(storageConnectionString);
CloudBlobClient blobClient = cloudStorageAccount.CreateCloudBlobClient();
CloudBlobContainer container = blobClient.GetContainerReference(storageContainerName);
CloudBlockBlob blockBlob = container.GetBlockBlobReference("test.csv");
// Two CSV rows whose fields match the mapping ordinals: EventTime, EventId, EventSummary.
var blobContent = "2007-01-01 00:00:00.0000000,2592,Several trees down\n2007-01-01 00:00:00.0000000,4171,Winter Storm";
await blockBlob.UploadTextAsync(blobContent);
| Setting | Field description |
|---|---|
| storageConnectionString | The connection string of the programmatically created storage account. |
Run a test query in Azure Data Explorer.
// Uses kustoClusterName, locationSmallCase, kustoDatabaseName, kustoTableName, clientId, clientSecret, and tenantId from the code example.
// If this runs in the same scope as Step 4, reuse the existing kustoUri and kustoConnectionStringBuilder instead of declaring them again.
var kustoUri = $"https://{kustoClusterName}.{locationSmallCase}.kusto.windows.net";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
    InitialCatalog = kustoDatabaseName,
    FederatedSecurity = true,
    ApplicationClientId = clientId,
    ApplicationKey = clientSecret,
    Authority = tenantId
};
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
{
    var query = $"{kustoTableName} | take 10";
    // ExecuteQuery returns an IDataReader over the primary result set.
    using (var reader = kustoClient.ExecuteQuery(query))
    {
        // Print each row: EventTime, EventId, EventSummary.
        while (reader.Read())
        {
            Console.WriteLine($"{reader[0]}, {reader[1]}, {reader[2]}");
        }
    }
}
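Event Grid ingestion is queued and batched, so the test query can return no rows if you run it right after the upload; with the default batching policy, it can take several minutes for new data to appear. A simple approach is to retry a row count until it is non-zero or a timeout expires. The following sketch hides the query behind a delegate so it can run without a cluster; `IngestionWait.UntilRowsAsync` is a hypothetical helper, not part of the Kusto client library.

```csharp
using System;
using System.Threading.Tasks;

public static class IngestionWait
{
    // Calls countRows until it reports at least one row or the timeout elapses.
    // Returns the last observed count (0 means nothing arrived in time).
    public static async Task<long> UntilRowsAsync(Func<Task<long>> countRows, TimeSpan timeout, TimeSpan pollInterval)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            long count = await countRows();
            if (count > 0 || DateTime.UtcNow >= deadline)
            {
                return count;
            }
            await Task.Delay(pollInterval);
        }
    }
}
```

In practice, the delegate could run a query such as `Events | count` with the query provider's `ExecuteQuery` method and read the single value from the returned `IDataReader`.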
Clean up resources
To delete the resource group and clean up resources, use the following command:
await resourceManagementClient.ResourceGroups.DeleteAsync(resourceGroupName);
Next steps
- To learn about other ways to create a cluster and database, see Create an Azure Data Explorer cluster and database.
- To learn more about ingestion methods, see Azure Data Explorer data ingestion.
- To learn about the web application, see Quickstart: Query data in the Azure Data Explorer web UI.
- To learn how to write queries, see Write queries with Kusto Query Language.