Kusto.Ingest ingestion code examples

This collection of short code snippets demonstrates several techniques for ingesting data into a Kusto table.

Note:

These samples look as if the ingest client is destroyed immediately after the ingestion. Don't take this literally. Ingest clients are reentrant and thread-safe, and shouldn't be created in large numbers. The recommended cardinality of ingest client instances is one per hosting process, per target Kusto cluster.
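
One way to follow the "one client per process, per cluster" guidance is to cache clients by cluster URI and reuse them. The following is a minimal sketch under stated assumptions: `IngestClientCache` and its members are hypothetical names, not part of Kusto.Ingest, and the client type is left generic so the sketch does not depend on any specific library interface.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not part of Kusto.Ingest): keeps at most one client
// instance per cluster URI for the lifetime of the hosting process.
public sealed class IngestClientCache<TClient> : IDisposable where TClient : IDisposable
{
    // Lazy<T> ensures the factory runs at most once per key, even under contention.
    private readonly ConcurrentDictionary<string, Lazy<TClient>> _clients =
        new ConcurrentDictionary<string, Lazy<TClient>>(StringComparer.OrdinalIgnoreCase);

    private readonly Func<string, TClient> _factory;

    public IngestClientCache(Func<string, TClient> factory)
    {
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
    }

    // Returns the cached client for the cluster, creating it on first use.
    public TClient GetOrCreate(string clusterUri)
    {
        return _clients
            .GetOrAdd(clusterUri, uri => new Lazy<TClient>(() => _factory(uri)))
            .Value;
    }

    // Disposes only the clients that were actually created.
    public void Dispose()
    {
        foreach (var entry in _clients.Values)
        {
            if (entry.IsValueCreated)
            {
                entry.Value.Dispose();
            }
        }
        _clients.Clear();
    }
}
```

In this sketch, the factory delegate would wrap whichever `KustoIngestFactory` call a sample uses, and the cache would be disposed once, when the process shuts down.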

Asynchronous ingestion from a single Azure blob

Use KustoQueuedIngestClient, with an optional retry policy, for asynchronous ingestion from a single Azure blob.

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create an ingest client
// Note, that creating a separate instance per ingestion operation is an anti-pattern.
// IngestClient classes are thread-safe and intended for reuse
using var client = KustoIngestFactory.CreateQueuedIngestClient(
    kustoConnectionStringBuilderDM,
    // Create your custom retry policy, which will affect how the ingest client handles retrying on transient failures
    new QueueOptions { MaxRetries = 0 }
);
var blobUriWithSasKey = "<blobUriWithSasKey>";
// Ingest from blobs according to the required properties
var kustoIngestionProperties = new KustoIngestionProperties("<databaseName>", "<tableName>");
var sourceOptions = new StorageSourceOptions { DeleteSourceOnSuccess = true };
await client.IngestFromStorageAsync(blobUriWithSasKey, kustoIngestionProperties, sourceOptions);

Ingestion from a local file

Use KustoDirectIngestClient to ingest from a local file.

Note:

This method is recommended only for low-volume, low-frequency ingestion.

var kustoUri = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderEngine = new KustoConnectionStringBuilder(kustoUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilderEngine);
// Ingest from a local file according to the required properties
var kustoIngestionProperties = new KustoIngestionProperties("<databaseName>", "<tableName>");
await client.IngestFromStorageAsync("<filePath>", kustoIngestionProperties);

Ingestion from local files with ingestion validation

Use KustoQueuedIngestClient to ingest local files, and then validate the ingestion.

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM);
// Ingest from local files according to the required properties
var kustoIngestionProperties = new KustoIngestionProperties("<databaseName>", "<tableName>");
await client.IngestFromStorageAsync("ValidTestFile.csv", kustoIngestionProperties);
await client.IngestFromStorageAsync("InvalidTestFile.csv", kustoIngestionProperties);
// Wait for the aggregation to complete without blocking the thread
await Task.Delay(TimeSpan.FromMinutes(8));
// Retrieve and validate failures
var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "Failures expected");
// Retrieve, delete and validate failures
ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "Failures expected");

Ingestion from local files with status reported to a queue

Use KustoQueuedIngestClient to ingest local files, and then report the status to a queue.

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM);
// Ingest from a file according to the required properties
var kustoIngestionProperties = new KustoQueuedIngestionProperties("<databaseName>", "<tableName>")
{
    // Setting the report level to FailuresAndSuccesses causes both successful and failed ingestions to be reported
    // (rather than the default "FailuresOnly" level, which the earlier local-file
    // ingestion validation example relies on)
    ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
    // Select the report method. 'Queue' is the default method;
    // it is set explicitly here for the sake of the example.
    ReportMethod = IngestionReportMethod.Queue
};
await client.IngestFromStorageAsync("ValidTestFile.csv", kustoIngestionProperties);
await client.IngestFromStorageAsync("InvalidTestFile.csv", kustoIngestionProperties);
// Wait for the aggregation to complete without blocking the thread
await Task.Delay(TimeSpan.FromMinutes(8));
// Retrieve and validate failures
var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Retrieve, delete and validate failures
ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Verify the success has also been reported to the queue
var ingestionSuccesses = await client.GetAndDiscardTopIngestionSuccessesAsync();
Ensure.ConditionIsMet(ingestionSuccesses.Any(), "The successful ingestion should have been reported to the successful ingestions queue");

Ingestion from local files with status reported to a table

Use KustoQueuedIngestClient to ingest local files and report the status to a table.

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
    .WithAadApplicationKeyAuthentication(
        applicationClientId: appId,
        applicationKey: appKey,
        authority: appTenant
    );
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM);
// Ingest from a file according to the required properties
var kustoIngestionProperties = new KustoQueuedIngestionProperties("<databaseName>", "<tableName>")
{
    // Setting the report level to FailuresAndSuccesses will cause both successful and failed ingestions to be reported
    // (Rather than the default "FailuresOnly" level)
    ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
    // Select the report method: report the status to a table
    ReportMethod = IngestionReportMethod.Table
};
var filePath = "<filePath>";
var fileIdentifier = Guid.NewGuid();
var sourceOptions = new StorageSourceOptions { SourceId = fileIdentifier };
// Execute the ingest operation and save the result.
var clientResult = await client.IngestFromStorageAsync(filePath, kustoIngestionProperties, sourceOptions);
// Use the fileIdentifier you supplied to get the status of your ingestion
// (clientResult is already awaited, so it is the ingestion result itself)
var ingestionStatus = clientResult.GetIngestionStatusBySourceId(fileIdentifier);
while (ingestionStatus.Status == Status.Pending)
{
    // Wait a minute without blocking the thread
    await Task.Delay(TimeSpan.FromMinutes(1));
    // Try again
    ingestionStatus = clientResult.GetIngestionStatusBySourceId(fileIdentifier);
}
// Verify the results of the ingestion
Ensure.ConditionIsMet(ingestionStatus.Status == Status.Succeeded, "The file should have been ingested successfully");