Exemplos de código de ingestão kusto.ingestão

Esta coleção de snippets de código curtos demonstra várias técnicas de ingestão de dados em uma tabela Kusto.

Observação

Esses exemplos parecem que o cliente de ingestão é destruído imediatamente após a ingestão. Não leve isso literalmente. Os clientes de ingestão são reentrantes e thread-safe e não devem ser criados em grande número. A cardinalidade recomendada de instâncias de cliente de ingestão é uma por processo de hospedagem, por cluster Kusto de destino.

Ingestão assíncrona de um único blob do Azure

Use KustoQueuedIngestClient, com RetryPolicy opcional, para ingestão assíncrona de um único blob do Azure.

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create an ingest client
// Note, that creating a separate instance per ingestion operation is an anti-pattern.
// IngestClient classes are thread-safe and intended for reuse
using var client = KustoIngestFactory.CreateQueuedIngestClient(
    kustoConnectionStringBuilderDM,
    // Create your custom retry policy, which will affect how the ingest client handles retrying on transient failures
    new QueueOptions { MaxRetries = 0 }
);
var blobUriWithSasKey = "<blobUriWithSasKey>";
// Ingest from blobs according to the required properties
var kustoIngestionProperties = new KustoIngestionProperties("<databaseName>", "<tableName>");
var sourceOptions = new StorageSourceOptions { DeleteSourceOnSuccess = true };
await client.IngestFromStorageAsync(blobUriWithSasKey, kustoIngestionProperties, sourceOptions);

Ingestão do arquivo local

Use KustoDirectIngestClient para ingerir de um arquivo local.

Observação

Recomendamos esse método para volume limitado e ingestão de baixa frequência.

var kustoUri = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderEngine = new KustoConnectionStringBuilder(kustoUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilderEngine);
//Ingest from blobs according to the required properties
var kustoIngestionProperties = new KustoIngestionProperties("<databaseName>", "<tableName>");
await client.IngestFromStorageAsync("<filePath>", kustoIngestionProperties);

Ingestão de arquivos locais e validação da ingestão

Use KustoQueuedIngestClient para ingerir de arquivos locais e validar a ingestão.

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM);
// Ingest from blobs according to the required properties
var kustoIngestionProperties = new KustoIngestionProperties("<databaseName>", "<tableName>");
await client.IngestFromStorageAsync("ValidTestFile.csv", kustoIngestionProperties);
await client.IngestFromStorageAsync("InvalidTestFile.csv", kustoIngestionProperties);
// Waiting for the aggregation
Thread.Sleep(TimeSpan.FromMinutes(8));
// Retrieve and validate failures
var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "Failures expected");
// Retrieve, delete and validate failures
ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "Failures expected");

Ingerir de arquivos locais e relatar status para uma fila

Use KustoQueuedIngestClient para ingerir de arquivos locais e, em seguida, relatar o status para uma fila.

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM);
// Ingest from a file according to the required properties
var kustoIngestionProperties = new KustoQueuedIngestionProperties("<databaseName>", "<tableName>")
{
    // Setting the report level to FailuresAndSuccesses will cause both successful and failed ingestions to be reported
    // (Rather than the default "FailuresOnly" level - which is demonstrated in the
    // 'Ingest From Local File(s) using KustoQueuedIngestClient and Ingestion Validation' section)
    ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
    // Choose the report method of choice. 'Queue' is the default method.
    // For the sake of the example, we will choose it anyway. 
    ReportMethod = IngestionReportMethod.Queue
};
await client.IngestFromStorageAsync("ValidTestFile.csv", kustoIngestionProperties);
await client.IngestFromStorageAsync("InvalidTestFile.csv", kustoIngestionProperties);
// Waiting for the aggregation
Thread.Sleep(TimeSpan.FromMinutes(8));
// Retrieve and validate failures
var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Retrieve, delete and validate failures
ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Verify the success has also been reported to the queue
var ingestionSuccesses = await client.GetAndDiscardTopIngestionSuccessesAsync();
Ensure.ConditionIsMet(ingestionSuccesses.Any(), "The successful ingestion should have been reported to the successful ingestions queue");

Ingerir de arquivos locais e status de relatório para uma tabela

Use KustoQueuedIngestClient para ingerir de arquivos locais e relatar status a uma tabela.

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM);
// Ingest from a file according to the required properties
var kustoIngestionProperties = new KustoQueuedIngestionProperties("<databaseName>", "<tableName>")
{
    // Setting the report level to FailuresAndSuccesses will cause both successful and failed ingestions to be reported
    // (Rather than the default "FailuresOnly" level)
    ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
    // Choose the report method of choice
    ReportMethod = IngestionReportMethod.Table
};
var filePath = "<filePath>";
var fileIdentifier = Guid.NewGuid();
var sourceOptions = new StorageSourceOptions { SourceId = fileIdentifier };
// Execute the ingest operation and save the result.
var clientResult = await client.IngestFromStorageAsync(filePath, kustoIngestionProperties, sourceOptions);
// Use the fileIdentifier you supplied to get the status of your ingestion 
var ingestionStatus = clientResult.Result.GetIngestionStatusBySourceId(fileIdentifier);
while (ingestionStatus.Status == Status.Pending)
{
    // Wait a minute...
    Thread.Sleep(TimeSpan.FromMinutes(1));
    // Try again
    ingestionStatus = clientResult.Result.GetIngestionStatusBySourceId(fileIdentifier);
}
// Verify the results of the ingestion
Ensure.ConditionIsMet(ingestionStatus.Status == Status.Succeeded, "The file should have been ingested successfully");