Ingestion without Kusto.Ingest Library
The Kusto.Ingest library is preferred for ingesting data to Azure Data Explorer. However, you can still achieve almost the same functionality, without being dependent on the Kusto.Ingest package. This article shows you how, by using Queued Ingestion to Azure Data Explorer for production-grade pipelines.
Note
The code below is written in C#, and makes use of the Azure Storage SDK, the ADAL Authentication library, and the Newtonsoft.Json package, to simplify the sample code. If needed, the corresponding code can be replaced with appropriate Azure Storage REST API calls, a non-.NET ADAL package, and any available JSON handling package.
This article deals with the recommended mode of ingestion. For the Kusto.Ingest library, its corresponding entity is the IKustoQueuedIngestClient interface. Here, the client code interacts with the Azure Data Explorer service by posting ingestion notification messages to an Azure queue. References to the queues are obtained from the Kusto Data Management (also known as the Ingestion) service. Interaction with the service must be authenticated with Azure Active Directory (Azure AD).
The following code shows how the Kusto Data Management service handles queued data ingestion without using the Kusto.Ingest library. This example may be useful if full .NET is inaccessible or unavailable because of the environment, or other restrictions.
The code includes the steps to create an Azure Storage client and upload the data to a blob. Each step is described in greater detail, after the sample code.
1. Obtain an authentication token for accessing the Azure Data Explorer ingestion service
2. Query the Azure Data Explorer ingestion service to obtain:
   - Ingestion resources (queues and blob containers)
   - A Kusto identity token that will be added to every ingestion message
3. Upload data to a blob on one of the blob containers obtained from Kusto in (2)
4. Compose an ingestion message that identifies the target database and table and that points to the blob from (3)
5. Post the ingestion message we composed in (4) to an ingestion queue obtained from Azure Data Explorer in (2)
6. Retrieve any error found by the service during ingestion
// A container class for ingestion resources we are going to obtain from Azure Data Explorer
internal class IngestionResourcesSnapshot
{
public IList<string> IngestionQueues { get; set; } = new List<string>();
public IList<string> TempStorageContainers { get; set; } = new List<string>();
public string FailureNotificationsQueue { get; set; } = string.Empty;
public string SuccessNotificationsQueue { get; set; } = string.Empty;
}
public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
// Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
string DmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
// 1. Authenticate the interactive user (or application) to access Kusto ingestion service
string bearerToken = AuthenticateInteractiveUser(DmServiceBaseUri);
// 2a. Retrieve ingestion resources
IngestionResourcesSnapshot ingestionResources = RetrieveIngestionResources(DmServiceBaseUri, bearerToken);
// 2b. Retrieve Kusto identity token
string identityToken = RetrieveKustoIdentityToken(DmServiceBaseUri, bearerToken);
// 3. Upload file to one of the blob containers we got from Azure Data Explorer.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the containers in order to prevent throttling
long blobSizeBytes = 0;
string blobName = $"TestData{DateTime.UtcNow.ToString("yyyy-MM-dd_HH-mm-ss.FFF")}";
string blobUriWithSas = UploadFileToBlobContainer(file, ingestionResources.TempStorageContainers.First(),
"temp001", blobName, out blobSizeBytes);
// 4. Compose ingestion command
string ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
// 5. Post ingestion command to one of the ingestion queues we got from Azure Data Explorer.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the queues in order to prevent throttling
PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);
Thread.Sleep(20000);
// 6a. Read success notifications
var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
foreach (var sm in successes)
{
Console.WriteLine($"Ingestion completed: {sm}");
}
// 6b. Read failure notifications
var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
foreach (var em in errors)
{
Console.WriteLine($"Ingestion error: {em}");
}
}
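The comments in steps 3 and 5 recommend round-robin selection of containers and queues when working with multiple blobs. The following is a minimal sketch of one way to do that. `RoundRobinSelector` is a hypothetical helper, not part of the original sample or of any Azure library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: cycles through a fixed list of resource URIs
// (temp storage containers or ingestion queues) so that consecutive
// uploads or posts are spread across all of them.
internal sealed class RoundRobinSelector
{
    private readonly IList<string> _items;
    private int _counter = -1;

    public RoundRobinSelector(IList<string> items)
    {
        if (items == null || items.Count == 0)
        {
            throw new ArgumentException("At least one item is required", nameof(items));
        }
        _items = items;
    }

    // Returns the next item in the cycle; safe to call from multiple threads
    public string Next()
    {
        // Interlocked.Increment wraps around on overflow; the cast to uint
        // keeps the computed index non-negative after the wrap
        uint ticket = unchecked((uint)Interlocked.Increment(ref _counter));
        return _items[(int)(ticket % (uint)_items.Count)];
    }
}
```

With this sketch, `IngestSingleFile` could hold one selector built from `ingestionResources.TempStorageContainers` and another built from `ingestionResources.IngestionQueues`, and call `Next()` instead of `First()` for each blob.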
Using Queued Ingestion to Azure Data Explorer for production-grade pipelines
Obtain authentication evidence from Azure AD
Here we use ADAL to obtain an Azure AD token to access the Kusto Data Management service and ask for its input queues. ADAL is available on non-Windows platforms if needed.
// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
// Create Auth Context for MSFT Azure AD:
AuthenticationContext authContext = new AuthenticationContext("https://login.microsoftonline.com/{Azure AD Tenant ID or name}");
// Acquire user token for the interactive user for Azure Data Explorer:
AuthenticationResult result =
authContext.AcquireTokenAsync(resource, "<your client app ID>", new Uri(@"<your client app URI>"),
new PlatformParameters(PromptBehavior.Auto), UserIdentifier.AnyUser, "prompt=select_account").Result;
return result.AccessToken;
}
Retrieve Azure Data Explorer ingestion resources
Manually construct an HTTP POST request to the Data Management service, requesting the return of the ingestion resources. These resources include queues that the DM service is listening on, and blob containers for data uploading. The Data Management service will process any messages containing ingestion requests that arrive on one of those queues.
// Retrieve ingestion resources (queues and blob containers) with SAS from specified Azure Data Explorer Ingestion service using supplied Access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
string ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
string requestBody = $"{{ \"csl\": \".get ingestion resources\" }}";
IngestionResourcesSnapshot ingestionResources = new IngestionResourcesSnapshot();
using (WebResponse response = SendPostRequest(ingestClusterUri, accessToken, requestBody))
using (StreamReader sr = new StreamReader(response.GetResponseStream()))
using (JsonTextReader jtr = new JsonTextReader(sr))
{
JObject responseJson = JObject.Load(jtr);
IEnumerable<JToken> tokens;
// Input queues
tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
foreach (var token in tokens)
{
ingestionResources.IngestionQueues.Add((string) token[1]);
}
// Temp storage containers
tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
foreach (var token in tokens)
{
ingestionResources.TempStorageContainers.Add((string)token[1]);
}
// Failure notifications queue
var singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.FailureNotificationsQueue = (string)singleToken;
// Success notifications queue
singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.SuccessNotificationsQueue = (string)singleToken;
}
return ingestionResources;
}
// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
WebRequest request = WebRequest.Create(uriString);
request.Method = "POST";
request.ContentType = "application/json";
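// Content length is a byte count, which differs from the character count for non-ASCII bodies
request.ContentLength = System.Text.Encoding.UTF8.GetByteCount(body);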
request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
Stream bodyStream = request.GetRequestStream();
using (StreamWriter sw = new StreamWriter(bodyStream))
{
sw.Write(body);
sw.Flush();
}
bodyStream.Close();
return request.GetResponse();
}
Obtain a Kusto identity token
Ingest messages are handed off to Azure Data Explorer via a non-direct channel (an Azure queue), so the service cannot authorize the sender at the time the message is posted. The solution is to attach an identity token to every ingest message. The Azure Data Explorer service validates this signed token when it receives the ingestion message, which enables in-band authorization validation.
// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
string ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
string requestBody = $"{{ \"csl\": \".get kusto identity token\" }}";
string jsonPath = "Tables[0].Rows[*].[0]";
using (WebResponse response = SendPostRequest(ingestClusterUri, accessToken, requestBody))
using (StreamReader sr = new StreamReader(response.GetResponseStream()))
using (JsonTextReader jtr = new JsonTextReader(sr))
{
JObject responseJson = JObject.Load(jtr);
JToken identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
return ((string)identityToken);
}
}
Upload data to the Azure Blob container
This step uploads a local file to an Azure Blob that will be handed off for ingestion. This code uses the Azure Storage SDK. If taking a dependency on the SDK isn't possible, the upload can be done with the Azure Blob Service REST API.
// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string containerName, string blobName, out long blobSize)
{
var blobUri = new Uri(blobContainerUri);
CloudBlobContainer blobContainer = new CloudBlobContainer(blobUri);
CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(blobName);
using (Stream stream = File.OpenRead(filePath))
{
blockBlob.UploadFromStream(stream);
blobSize = blockBlob.Properties.Length;
}
return string.Format("{0}{1}", blockBlob.Uri.AbsoluteUri, blobUri.Query);
}
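For environments where the Azure Storage SDK can't be used, the same upload can be sketched with the Blob Service REST API's Put Blob operation. `BlobRestUploader` and its method names are hypothetical. The sketch assumes the container URI already carries a SAS token with write permission, that the blob name contains no path separators, and that the file is small enough for a single Put Blob call (larger files would need the block-based operations).

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical helper: uploads a file with plain HTTP instead of the Storage SDK
internal static class BlobRestUploader
{
    // Builds "<container URI>/<blob name>?<SAS>" from the container URI with SAS
    internal static Uri BuildBlobUri(string blobContainerUriWithSas, string blobName)
    {
        var containerUri = new Uri(blobContainerUriWithSas);
        string containerPath = containerUri.GetLeftPart(UriPartial.Path).TrimEnd('/');
        return new Uri($"{containerPath}/{Uri.EscapeDataString(blobName)}{containerUri.Query}");
    }

    // Uploads the file in a single Put Blob request and returns the blob URI with SAS
    internal static async Task<string> UploadFileAsync(
        HttpClient client, string filePath, string blobContainerUriWithSas, string blobName)
    {
        Uri blobUri = BuildBlobUri(blobContainerUriWithSas, blobName);
        using (Stream stream = File.OpenRead(filePath))
        using (var request = new HttpRequestMessage(HttpMethod.Put, blobUri))
        {
            request.Content = new StreamContent(stream);
            // Put Blob requires the blob type header; BlockBlob matches the SDK-based sample
            request.Headers.Add("x-ms-blob-type", "BlockBlob");
            using (HttpResponseMessage response = await client.SendAsync(request))
            {
                // Throws HttpRequestException when the service rejects the upload
                response.EnsureSuccessStatusCode();
            }
        }
        return blobUri.AbsoluteUri;
    }
}
```

The raw data size that the ingestion message expects can be read separately, for example with `new FileInfo(filePath).Length`.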
Compose the Azure Data Explorer ingestion message
The Newtonsoft.Json package is used again, this time to compose a valid ingestion request that identifies the target database and table, and that points to the blob. The message will be posted to the Azure Queue that the relevant Kusto Data Management service is listening on.
Here are some points to consider.
- This request is the bare minimum for the ingestion message.
Note
The identity token is mandatory and must be part of the AdditionalProperties JSON object.
- Whenever necessary, CsvMapping or JsonMapping properties must be provided as well.
- For more information, see the article on ingestion mapping pre-creation.
- The Ingestion message internal structure section explains the structure of the ingestion message.
internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
var message = new JObject();
message.Add("Id", Guid.NewGuid().ToString());
message.Add("BlobPath", dataUri);
message.Add("RawDataSize", blobSizeBytes);
message.Add("DatabaseName", db);
message.Add("TableName", table);
message.Add("RetainBlobOnSuccess", true); // Do not delete the blob on success
message.Add("FlushImmediately", true); // Do not aggregate
message.Add("ReportLevel", 2); // Report failures and successes (might incur perf overhead)
message.Add("ReportMethod", 0); // Failures (and successes, if so requested) are reported to an Azure Queue
message.Add("AdditionalProperties", new JObject(
new JProperty("authorizationContext", identityToken),
new JProperty("jsonMappingReference", mappingRef),
// Data is in JSON format
new JProperty("format", "json")));
return message.ToString();
}
Post the Azure Data Explorer ingestion message to the Azure Data Explorer ingestion queue
Finally, post the message that you constructed, to the selected ingestion queue that you obtained from Azure Data Explorer.
Note
.NET storage client versions below v12 encode the message to base64 by default. For more information, see storage docs. If you are using .NET storage client version v12 or above, you must properly encode the message content yourself.
internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
CloudQueue queue = new CloudQueue(new Uri(queueUriWithSas));
CloudQueueMessage queueMessage = new CloudQueueMessage(message);
queue.AddMessage(queueMessage, null, null, null, null);
}
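The note above says that newer storage clients leave message encoding to the caller. A minimal sketch of doing the base64 encoding explicitly follows. `IngestionMessageEncoder` is a hypothetical helper, and the commented usage with the v12 `QueueClient` is an assumption about how it would be wired in, not compiled as part of this sketch.

```csharp
using System;
using System.Text;

// Hypothetical helper: encodes the ingestion message the same way older
// storage clients did by default (UTF-8 bytes, then base64)
internal static class IngestionMessageEncoder
{
    internal static string ToBase64(string message)
    {
        return Convert.ToBase64String(Encoding.UTF8.GetBytes(message));
    }

    // Reverses ToBase64; handy for inspecting an encoded message
    internal static string FromBase64(string encoded)
    {
        return Encoding.UTF8.GetString(Convert.FromBase64String(encoded));
    }
}

// Assumed usage with the Azure.Storage.Queues v12 package:
//   var queue = new QueueClient(new Uri(queueUriWithSas));
//   queue.SendMessage(IngestionMessageEncoder.ToBase64(ingestionMessage));
```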
Check for error messages from the Azure queue
After ingestion, we check for failure messages in the relevant queue that the Data Management service writes to. For more information on the failure message structure, see Ingestion failure message structure.
internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
List<string> messages = new List<string>();
CloudQueue queue = new CloudQueue(new Uri(queueUriWithSas));
var messagesFromQueue = queue.GetMessages(count);
foreach (var m in messagesFromQueue)
{
messages.Add(m.AsString);
queue.DeleteMessage(m);
}
return messages;
}
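The sample waits a fixed 20 seconds before reading the notification queues. As an alternative, the queue can be polled until a message arrives or a timeout elapses. The following is a minimal sketch. `NotificationPoller` is a hypothetical helper, and the timeout and interval values in the usage note are illustrative assumptions, not recommended settings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical helper: repeatedly invokes a queue-reading delegate
internal static class NotificationPoller
{
    // Calls readMessages until it returns at least one message or the timeout
    // elapses; returns the messages from the last call (possibly empty)
    internal static IList<string> PollUntilAny(
        Func<IEnumerable<string>> readMessages, TimeSpan timeout, TimeSpan interval)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            List<string> messages = readMessages().ToList();
            if (messages.Count > 0 || DateTime.UtcNow >= deadline)
            {
                return messages;
            }
            Thread.Sleep(interval);
        }
    }
}
```

For example, `NotificationPoller.PollUntilAny(() => PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32), TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(5))` would check the failure queue every five seconds for up to five minutes. The queue may also hold notifications for other ingestions, so the caller still needs to match messages to its own request.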
Ingestion messages - JSON document formats
Ingestion message internal structure
The message that the Kusto Data Management service expects to read from the input Azure Queue is a JSON document in the following format.
{
"Id" : "<Id>",
"BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
"RawDataSize" : "<RawDataSizeInBytes>",
"DatabaseName": "<DatabaseName>",
"TableName" : "<TableName>",
"RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
"FlushImmediately": "<true|false>",
"ReportLevel" : <0-Failures, 1-None, 2-All>,
"ReportMethod" : <0-Queue, 1-Table>,
"AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
| Property | Description |
|---|---|
| Id | Message identifier (GUID) |
| BlobPath | Path (URI) to the blob, including the SAS key granting Azure Data Explorer permissions to read/write/delete it. Permissions are required so that Azure Data Explorer can delete the blob once it has completed ingesting the data |
| RawDataSize | Size of the uncompressed data in bytes. Providing this value enables Azure Data Explorer to optimize ingestion by potentially aggregating multiple blobs. This property is optional, but if not given, Azure Data Explorer will access the blob just to retrieve the size |
| DatabaseName | Target database name |
| TableName | Target table name |
| RetainBlobOnSuccess | If set to true, the blob won't be deleted once ingestion is successfully completed. Default is false |
| FlushImmediately | If set to true, any aggregation will be skipped. Default is false |
| ReportLevel | Success/Error reporting level: 0-Failures, 1-None, 2-All |
| ReportMethod | Reporting mechanism: 0-Queue, 1-Table |
| AdditionalProperties | Additional properties such as format, tags, and creationTime. For more information, see data ingestion properties. |
Ingestion failure message structure
The message that the Data Management service writes to the failure notifications Azure Queue is a JSON document with the following properties.
| Property | Description |
|---|---|
| OperationId | Operation identifier (GUID) that can be used to track the operation on the service side |
| Database | Target database name |
| Table | Target table name |
| FailedOn | Failure timestamp |
| IngestionSourceId | GUID identifying the data chunk that Azure Data Explorer failed to ingest |
| IngestionSourcePath | Path (URI) to the data chunk that Azure Data Explorer failed to ingest |
| Details | Failure message |
| ErrorCode | Azure Data Explorer error code (see the list of ingestion error codes) |
| FailureStatus | Indicates whether the failure is permanent or transient |
| RootActivityId | Azure Data Explorer correlation identifier (GUID) that can be used to track the operation on the service side |
| OriginatesFromUpdatePolicy | Indicates whether the failure was caused by an erroneous transactional update policy |
| ShouldRetry | Indicates whether the ingestion could succeed if retried as is |
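
A failure message read from the queue can be parsed to decide whether to resubmit the ingestion. The following is a minimal sketch that uses System.Text.Json rather than the Newtonsoft.Json package used elsewhere in this article, so that it has no package dependency on recent .NET versions. `IngestionFailure` and `IngestionFailureParser` are hypothetical names, and the sketch assumes that ShouldRetry arrives either as a JSON boolean or as a "true"/"false" string.

```csharp
using System;
using System.Text.Json;

// Hypothetical container for a subset of the failure message properties
internal sealed class IngestionFailure
{
    public string OperationId { get; set; }
    public string Database { get; set; }
    public string Table { get; set; }
    public string ErrorCode { get; set; }
    public string Details { get; set; }
    public bool ShouldRetry { get; set; }
}

internal static class IngestionFailureParser
{
    // Parses the JSON text of a single failure notification message
    internal static IngestionFailure Parse(string json)
    {
        using (JsonDocument doc = JsonDocument.Parse(json))
        {
            JsonElement root = doc.RootElement;
            return new IngestionFailure
            {
                OperationId = GetString(root, "OperationId"),
                Database = GetString(root, "Database"),
                Table = GetString(root, "Table"),
                ErrorCode = GetString(root, "ErrorCode"),
                Details = GetString(root, "Details"),
                ShouldRetry = GetBool(root, "ShouldRetry")
            };
        }
    }

    // Returns the property as text, or null when it is missing or JSON null
    private static string GetString(JsonElement root, string name)
    {
        if (!root.TryGetProperty(name, out JsonElement value) || value.ValueKind == JsonValueKind.Null)
        {
            return null;
        }
        return value.ToString();
    }

    // Accepts a JSON boolean or a "true"/"false" string; anything else counts as false
    private static bool GetBool(JsonElement root, string name)
    {
        if (!root.TryGetProperty(name, out JsonElement value))
        {
            return false;
        }
        if (value.ValueKind == JsonValueKind.True)
        {
            return true;
        }
        return value.ValueKind == JsonValueKind.String
            && bool.TryParse(value.GetString(), out bool parsed)
            && parsed;
    }
}
```

A caller could then resubmit the original ingestion message only when `ShouldRetry` is true, and log `ErrorCode` and `Details` otherwise.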