Ingérer des données à l’aide du Kit de développement logiciel (SDK) .NET Kusto

Il existe deux bibliothèques clientes pour .NET : une bibliothèque d’ingestion et une bibliothèque de données. Pour plus d’informations sur le SDK .NET, consultez À propos du SDK .NET. Ces bibliothèques vous permettent d’ingérer (charger) des données dans un cluster et d’interroger les données de votre code. Dans cet article, vous allez d’abord créer une table et un mappage de données dans un cluster de test. Ensuite, vous allez mettre en file d’attente l’ingestion sur le cluster et valider les résultats.

Prérequis

Installer la bibliothèque d’ingestion

Install-Package Microsoft.Azure.Kusto.Ingest

Ajouter une authentification et construire une chaîne de connexion

Authentification

Pour authentifier une application, le Kit de développement logiciel (SDK) utilise votre ID de locataire Microsoft Entra. Pour trouver votre ID de locataire, utilisez l’URL suivante en remplaçant YourDomain par votre domaine.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Par exemple, si votre domaine est contoso.com, l’URL est : https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Cliquez sur cette URL pour voir les résultats. La première ligne est la suivante.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Ici, l’ID de locataire est 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

Cet exemple utilise une authentification utilisateur Microsoft Entra interactive pour accéder au cluster. Vous pouvez également utiliser Microsoft Entra’authentification d’application avec un certificat ou un secret d’application. Veillez à définir les valeurs appropriées pour tenantId et clusterUri avant d’exécuter ce code.

Le Kit de développement logiciel (SDK) offre un moyen pratique de configurer la méthode d’authentification dans le cadre de la chaîne de connexion. Pour obtenir une documentation complète sur les chaînes de connexion, consultez Chaînes de connexion.

Notes

La version actuelle du SDK ne prend pas en charge l’authentification utilisateur interactive sur .NET Core. Si nécessaire, utilisez Microsoft Entra nom d’utilisateur/mot de passe ou l’authentification de l’application à la place.

Construire la chaîne de connexion

Vous pouvez maintenant construire le chaîne de connexion. Vous allez créer la table de destination et le mappage dans une étape ultérieure.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Définir les informations du fichier source

Définissez le chemin du fichier source. Cet exemple utilise un exemple de fichier hébergé sur Stockage Blob Azure. L’exemple de jeu de données StormEvents contient des données météorologiques provenant des Centres nationaux d’information sur l’environnement.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Créer une table sur votre cluster de test

Créez une table nommée StormEvents qui correspond au schéma des données dans le fichier StormEvents.csv.

Conseil

Les extraits de code suivants créent une instance d’un client pour presque tous les appels. Cela permet de rendre chaque extrait de code exécutable individuellement. En production, les instances de client sont réentrantes et doivent être conservées aussi longtemps que nécessaire. Une seule instance de client par URI suffit, même quand vous utilisez plusieurs bases de données (la base de données peut être spécifiée au niveau de la commande).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Définir le mappage d’ingestion

Mappez les données CSV entrantes aux noms de colonnes utilisés lors de la création de la table. Provisionnez un objet de mappage de colonne CSV sur cette table.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Définir la stratégie de traitement par lots pour votre table

Le traitement par lot des données entrantes optimise la taille de partition de données qui est contrôlée par la stratégie de traitement par lot de l’ingestion. Modifiez la stratégie avec la commande de gestion de la stratégie d’ingestion par lot. Utilisez cette stratégie pour réduire la latence des données dont l’arrivée est lente.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Nous vous recommandons de définir une valeur Raw Data Size pour les données ingérées et de diminuer de façon incrémentielle la taille vers 250 Mo, tout en vérifiant si les performances s’améliorent.

Vous pouvez utiliser la propriété Flush Immediately pour ignorer le traitement par lot, même si cela n’est pas recommandé pour l’ingestion à grande échelle, car cela peut entraîner des performances médiocres.

Mettre en file d’attente un message pour l’ingestion

Mettez en file d’attente un message pour extraire les données du stockage d’objets blob et ingérer les données. Une connexion est établie avec le cluster d’ingestion, et un autre client est créé pour utiliser ce point de terminaison.

Conseil

Les extraits de code suivants créent une instance d’un client pour presque tous les appels. Cela permet de rendre chaque extrait de code exécutable individuellement. En production, les instances de client sont réentrantes et doivent être conservées aussi longtemps que nécessaire. Une seule instance de client par URI suffit, même quand vous utilisez plusieurs bases de données (la base de données peut être spécifiée au niveau de la commande).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Vérifier que les données ont été ingérées dans la table

Attendez cinq à dix minutes que l’ingestion mise en file d’attente planifie l’ingestion et charge les données dans votre cluster. Exécutez ensuite le code suivant pour obtenir le nombre d’enregistrements de la table StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Exécuter des requêtes de dépannage

Connectez-vous à https://dataexplorer.azure.com et à votre cluster. Exécutez la commande suivante dans votre base de données pour voir si des échecs d’ingestion se sont produits ces quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Exécutez la commande suivante pour voir l’état de toutes les opérations d’ingestion des quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Nettoyer les ressources

Si vous envisagez de suivre nos autres articles, conservez les ressources que vous avez créées. Dans le cas contraire, exécutez la commande suivante dans votre base de données pour nettoyer la table StormEvents.

.drop table StormEvents