Kusto.Ingest ingestion code examples
This collection of short code snippets demonstrates various techniques of ingesting data into a Kusto table.
Note
These examples look as if the ingest client is destroyed immediately following the ingestion. Do not take this literally. Ingest clients are reentrant and thread-safe, and should not be created in large numbers. The recommended cardinality of ingest client instances is one per hosting process, per target Kusto cluster.
Async ingestion from a single Azure blob
Use KustoQueuedIngestClient, with optional RetryPolicy, for async ingestion from a single Azure blob.
// Create a Kusto connection string (ingestion endpoint) with AAD application-key authentication
var kustoConnectionStringBuilderDM =
    new KustoConnectionStringBuilder(@"https://ingest-{clusterNameAndRegion}.kusto.windows.net").WithAadApplicationKeyAuthentication(
        applicationClientId: "{Application Client ID}",
        applicationKey: "{Application Key (secret)}",
        authority: "{AAD TenantID or name}");

// Create a queued ingest client.
// Note that creating a separate instance per ingestion operation is an anti-pattern:
// ingest client classes are thread-safe and intended for reuse.
// The using statement guarantees the client is disposed even if the ingestion throws.
using (IKustoQueuedIngestClient client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM))
{
    // Describe the ingestion target and how the source blob should be handled
    var kustoIngestionProperties = new KustoIngestionProperties(databaseName: "myDB", tableName: "myTable");
    var sourceOptions = new StorageSourceOptions() { DeleteSourceOnSuccess = true };

    // Optionally supply your own IRetryPolicy implementation to control how the
    // ingest client retries on transient failures (NoRetry is used here as an example).
    IRetryPolicy retryPolicy = new NoRetry();

    // Set the retry policy on the client; it applies to every ingest call from here on.
    // The client is typed as IKustoQueuedIngestClient, so no cast is needed to reach QueueOptions.
    client.QueueOptions.QueueRequestOptions.RetryPolicy = retryPolicy;

    // Queue the blob for ingestion according to the required properties
    await client.IngestFromStorageAsync(
        uri: @"BLOB-URI-WITH-SAS-KEY",
        ingestionProperties: kustoIngestionProperties,
        sourceOptions: sourceOptions);
}
Ingest from local file
Use KustoDirectIngestClient to ingest from a local file.
Note
We recommend this method for limited volume and low frequency ingestion.
// Create a Kusto connection string (engine endpoint) with AAD application-key authentication
var kustoConnectionStringBuilderEngine =
    new KustoConnectionStringBuilder(@"https://{clusterNameAndRegion}.kusto.windows.net").WithAadApplicationKeyAuthentication(
        applicationClientId: "{Application Client ID}",
        applicationKey: "{Application Key (secret)}",
        authority: "{AAD TenantID or name}");

// Create a disposable direct ingest client that will execute the ingestion
using (IKustoIngestClient client = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilderEngine))
{
    // Ingest from a local file according to the required properties
    var kustoIngestionProperties = new KustoIngestionProperties(databaseName: "myDB", tableName: "myTable");

    // Await the ingestion instead of blocking the calling thread on the returned task
    await client.IngestFromStorageAsync(@"< Path to local file >", ingestionProperties: kustoIngestionProperties);
}
Ingest from local files and validate ingestion
Use KustoQueuedIngestClient to ingest from local files and then validate the ingestion.
// Create a Kusto connection string (ingestion endpoint) with AAD application-key authentication
var kustoConnectionStringBuilderDM =
    new KustoConnectionStringBuilder(@"https://ingest-{clusterNameAndRegion}.kusto.windows.net").WithAadApplicationKeyAuthentication(
        applicationClientId: "{Application Client ID}",
        applicationKey: "{Application Key (secret)}",
        authority: "{AAD TenantID or name}");

// Create a disposable queued ingest client that will execute the ingestion.
// The using statement disposes the client even when one of the checks below throws.
using (IKustoQueuedIngestClient client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM))
{
    // Ingest from files according to the required properties.
    // Each call is awaited so that errors raised while queueing are observed rather than lost.
    var kustoIngestionProperties = new KustoIngestionProperties(databaseName: "myDB", tableName: "myTable");
    await client.IngestFromStorageAsync(@"ValidTestFile.csv", kustoIngestionProperties);
    await client.IngestFromStorageAsync(@"InvalidTestFile.csv", kustoIngestionProperties);

    // Wait for the aggregation without blocking a thread
    // (Task is declared in the System.Threading.Tasks namespace)
    await Task.Delay(TimeSpan.FromMinutes(8));

    // Retrieve and validate failures (peeking leaves them in the failures queue)
    var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
    Ensure.IsTrue(ingestionFailures.Any(), "Failures expected");

    // Retrieve, delete and validate failures
    ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
    Ensure.IsTrue(ingestionFailures.Any(), "Failures expected");
}
Ingest from local files and report status to a queue
Use KustoQueuedIngestClient to ingest from local files and then report the status to a queue.
// Create a Kusto connection string (ingestion endpoint) with AAD application-key authentication
var kustoConnectionStringBuilderDM =
    new KustoConnectionStringBuilder(@"https://ingest-{clusterNameAndRegion}.kusto.windows.net").WithAadApplicationKeyAuthentication(
        applicationClientId: "{Application Client ID}",
        applicationKey: "{Application Key (secret)}",
        authority: "{AAD TenantID or name}");

// Create a disposable queued ingest client that will execute the ingestion.
// The using statement disposes the client even when one of the checks below throws.
using (IKustoQueuedIngestClient client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM))
{
    // Ingest from a file according to the required properties
    var kustoIngestionProperties = new KustoQueuedIngestionProperties(databaseName: "myDB", tableName: "myTable")
    {
        // Setting the report level to FailuresAndSuccesses will cause both successful and failed ingestions to be reported
        // (rather than the default "FailuresOnly" level, which is demonstrated in the
        // 'Ingest from local files and validate ingestion' section)
        ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
        // Choose the report method of choice. 'Queue' is the default method.
        // For the sake of the example, we will choose it anyway.
        ReportMethod = IngestionReportMethod.Queue
    };

    // Each call is awaited so that errors raised while queueing are observed rather than lost
    await client.IngestFromStorageAsync("ValidTestFile.csv", kustoIngestionProperties);
    await client.IngestFromStorageAsync("InvalidTestFile.csv", kustoIngestionProperties);

    // Wait for the aggregation without blocking a thread
    // (Task is declared in the System.Threading.Tasks namespace)
    await Task.Delay(TimeSpan.FromMinutes(8));

    // Retrieve and validate failures (peeking leaves them in the failures queue)
    var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
    Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");

    // Retrieve, delete and validate failures
    ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
    Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");

    // Verify the success has also been reported to the queue
    var ingestionSuccesses = await client.GetAndDiscardTopIngestionSuccessesAsync();
    Ensure.ConditionIsMet(ingestionSuccesses.Any(),
        "The successful ingestion should have been reported to the successful ingestions queue");
}
Ingest from local files and report status to a table
Use KustoQueuedIngestClient to ingest from local files and report status to a table.
// Create a Kusto connection string (ingestion endpoint) with AAD application-key authentication
var kustoConnectionStringBuilderDM =
    new KustoConnectionStringBuilder(@"https://ingest-{clusterNameAndRegion}.kusto.windows.net").WithAadApplicationKeyAuthentication(
        applicationClientId: "{Application Client ID}",
        applicationKey: "{Application Key (secret)}",
        authority: "{AAD TenantID or name}");

// Create a disposable queued ingest client that will execute the ingestion.
// The using statement disposes the client even when the ingestion or the final check throws.
using (IKustoQueuedIngestClient client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM))
{
    // Ingest from a file according to the required properties
    var kustoIngestionProperties = new KustoQueuedIngestionProperties(databaseName: "myDB", tableName: "myTable")
    {
        // Setting the report level to FailuresAndSuccesses will cause both successful and failed ingestions to be reported
        // (rather than the default "FailuresOnly" level)
        ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
        // Choose the report method of choice
        ReportMethod = IngestionReportMethod.Table
    };

    // Tag the source with an identifier of your own so its status can be looked up afterwards
    var filePath = @"< Path to file >";
    var fileIdentifier = Guid.NewGuid();
    var fileDescription = new FileDescription() { FilePath = filePath, SourceId = fileIdentifier };
    var sourceOptions = new StorageSourceOptions() { SourceId = fileDescription.SourceId.Value };

    // Execute the ingest operation and save the result
    var clientResult = await client.IngestFromStorageAsync(
        fileDescription.FilePath,
        ingestionProperties: kustoIngestionProperties,
        sourceOptions: sourceOptions);

    // Use the fileIdentifier you supplied to get the status of your ingestion
    var ingestionStatus = clientResult.GetIngestionStatusBySourceId(fileIdentifier);
    while (ingestionStatus.Status == Status.Pending)
    {
        // Wait a minute without blocking a thread
        // (Task is declared in the System.Threading.Tasks namespace)
        await Task.Delay(TimeSpan.FromMinutes(1));
        // Try again
        ingestionStatus = clientResult.GetIngestionStatusBySourceId(fileIdentifier);
    }

    // Verify the results of the ingestion
    Ensure.ConditionIsMet(ingestionStatus.Status == Status.Succeeded,
        "The file should have been ingested successfully");
}
Next steps
Feedback
Submit and view feedback for