Ingesta de datos mediante el SDK de .NET de Kusto

Hay dos bibliotecas cliente para .NET: una biblioteca de ingesta y una biblioteca de datos. Para más información sobre el SDK para .NET, consulte la documentación sobre el SDK para .NET. Estas bibliotecas permiten ingerir (cargar) datos en un clúster y consultar datos desde el código. En este artículo, primero creará una tabla y la asignación de datos en un clúster de prueba. A continuación, pondrá en cola la ingesta en el clúster y validará los resultados.

Requisitos previos

  • Una cuenta de Microsoft o una identidad de usuario Microsoft Entra. No se necesita una suscripción a Azure.
  • Un clúster y una base de datos. Cree un clúster y una base de datos.

Instalación de la biblioteca de ingesta

Install-Package Microsoft.Azure.Kusto.Ingest

Adición de autenticación y creación de la cadena de conexión

Authentication

Para autenticar una aplicación, el SDK usa el identificador de inquilino de Microsoft Entra. Para buscar el identificador de inquilino, use la dirección URL siguiente, sustituyendo su dominio por SuDominio.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Por ejemplo, si el nombre de dominio es contoso.com, la dirección URL es: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Haga clic en esta dirección URL para ver los resultados. la primera línea es como sigue.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

En este caso es el id. de inquilino es 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

En este ejemplo se usa una autenticación interactiva Microsoft Entra usuario para acceder al clúster. También puede usar Microsoft Entra autenticación de aplicaciones con certificado o secreto de aplicación. Asegúrese de establecer los valores de tenantId y clusterUri antes de ejecutar este código.

El SDK proporciona una manera cómoda de configurar el método de autenticación como parte del cadena de conexión. Para obtener documentación completa sobre las cadenas de conexión, consulte Cadenas de conexión.

Nota

La versión actual del SDK no admite la autenticación interactiva de usuarios en .NET Core. Si es necesario, use Microsoft Entra autenticación de nombre de usuario o contraseña o aplicación en su lugar.

Creación de la cadena de conexión

Ahora puede construir el cadena de conexión. Creará la tabla de destino y la asignación en un paso posterior.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Definición de la información del archivo de origen

Establezca la ruta de acceso del archivo de origen. Este ejemplo utiliza un archivo de ejemplo hospedado en Azure Blob Storage. El conjunto de datos de ejemplo StormEvents contiene datos relacionados con el tiempo de los Centros Nacionales de Información Ambiental.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Creación de una tabla en el clúster de prueba

Cree una tabla que denominada StormEvents que coincida con el esquema de los datos del archivo StormEvents.csv.

Sugerencia

Los fragmentos de código siguientes crean una instancia de un cliente para casi todas las llamadas. Esto se hace para que cada fragmento de código se pueda ejecutar de forma individual. En producción, las instancias de cliente son de reentrada y deben mantenerse todo el tiempo que sea necesario. Una única instancia de cliente por URI es suficiente, aunque se trabaje con varias bases de datos (la base de datos se puede especificar en el nivel de comando).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definición de la asignación de ingesta

Asigna los datos de CSV entrantes a los nombres de columna utilizados al crear la tabla. Aprovisione un objeto de asignación de columnas de CSV en esa tabla.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definición de la directiva de procesamiento por lotes para la tabla

El procesamiento por lotes de datos entrantes optimiza el tamaño de la partición de datos, que está controlado por la directiva de procesamiento por lotes de ingesta. Modifique la directiva con el comando de administración de directivas de procesamiento por lotes de ingesta. Use esta directiva para reducir la latencia de los datos que llegan lentamente.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Se recomienda definir un valor Raw Data Size para los datos ingeridos y reducir de manera incremental el tamaño a 250 MB, a la vez que se comprueba si mejora el rendimiento.

Puede usar la propiedad Flush Immediately para omitir el procesamiento por lotes, aunque no se recomienda para la ingesta a gran escala, ya que puede provocar un rendimiento deficiente.

Colocación de un mensaje en cola para la ingesta

Coloca en cola un mensaje para extraer datos desde Blob Storage e ingerirlos. Se establece una conexión al clúster de ingesta y se crea otro cliente para trabajar con ese punto de conexión.

Sugerencia

Los fragmentos de código siguientes crean una instancia de un cliente para casi todas las llamadas. Esto se hace para que cada fragmento de código se pueda ejecutar de forma individual. En producción, las instancias de cliente son de reentrada y deben mantenerse todo el tiempo que sea necesario. Una única instancia de cliente por URI es suficiente, aunque se trabaje con varias bases de datos (la base de datos se puede especificar en el nivel de comando).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Validación de la ingesta de los datos en la tabla

Espere entre cinco y diez minutos para que la ingesta en cola programe la ingesta y cargue los datos en el clúster. A continuación, ejecute el siguiente código para obtener el recuento de registros de la tabla StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Ejecución de consultas de solución de problemas

Inicie sesión en https://dataexplorer.azure.com y conéctese al clúster Ejecute el siguiente comando en la base de datos para ver si se ha producido algún error de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarlo.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Ejecute el siguiente comando para ver el estado de todas las operaciones de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarlo.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpieza de recursos

Si tiene previsto seguir nuestros otros artículos, conserve los recursos que creó. De lo contrario, ejecute el siguiente comando en la base de datos para limpiar la tabla StormEvents.

.drop table StormEvents