Comment ingérer des données avec l'API REST

La bibliothèque Kusto.Ingest est recommandée pour ingérer des données dans votre cluster. Vous pouvez cependant obtenir presque les mêmes fonctionnalités, sans être dépendant du package Kusto.Ingest. Cet article vous montre comment utiliser l’ingestion en file d’attente dans votre cluster pour les pipelines de qualité production.

Notes

Le code ci-dessous est écrit en C# et utilise le Kit de développement logiciel (SDK) Stockage Azure, la bibliothèque d’authentification Microsoft (MSAL) et le package NewtonSoft.JSON pour simplifier l’exemple de code. Si nécessaire, le code correspondant peut être remplacé par des appels d’API REST stockage Azure appropriés, non-.NET package MSAL et tout package de gestion JSON disponible.

Cet article traite du mode d’ingestion recommandé. Pour la bibliothèque Kusto.Ingest, son entité correspondante est l’interface IKustoQueuedIngestClient . Ici, le code client interagit avec votre cluster en publiant des messages de notification d’ingestion dans une file d’attente Azure. Les références aux messages sont obtenues à partir du service kusto Gestion des données (également appelé ingestion). L’interaction avec le service doit être authentifiée avec Microsoft Entra’ID.

Le code suivant montre comment le service Kusto Gestion des données gère l’ingestion des données mises en file d’attente sans utiliser la bibliothèque Kusto.Ingest. Cet exemple peut être utile si .NET complet est inaccessible ou indisponible en raison de l’environnement ou d’autres restrictions.

Le code inclut les étapes permettant de créer un client stockage Azure et de charger les données dans un objet blob. Chaque étape est décrite plus en détail, après l’exemple de code.

  1. Obtenir un jeton d’authentification pour accéder au service d’ingestion
  2. Interrogez le service d’ingestion pour obtenir :
  3. Charger des données dans un objet blob sur l’un des conteneurs d’objets blob obtenus à partir de Kusto dans (2)
  4. Composer un message d’ingestion qui identifie la base de données et la table cible et qui pointe vers l’objet blob à partir de (3)
  5. Publiez le message d’ingestion que nous avons composé dans (4) dans une file d’attente d’ingestion obtenue dans (2)
  6. Récupérer toute erreur détectée par le service pendant l’ingestion
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

Utilisation de l’ingestion en file d’attente pour les pipelines de qualité production

Obtenir une preuve d’authentification à partir de l’ID Microsoft Entra

Ici, nous utilisons la bibliothèque d’authentification Microsoft (MSAL) pour obtenir un jeton Microsoft Entra pour accéder au service kusto Gestion des données et demander ses files d’attente d’entrée. MSAL est disponible sur plusieurs plateformes.

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.microsoftonline.com/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

Récupérer des ressources d’ingestion

Construisez manuellement une requête HTTP POST pour le service Gestion des données, en demandant le retour des ressources d’ingestion. Ces ressources incluent les files d’attente que le service DM écoute et les conteneurs d’objets blob pour le chargement des données. Le service Gestion des données traite tous les messages contenant des demandes d’ingestion qui arrivent dans l’une de ces files d’attente.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

Obtenir un jeton d’identité Kusto

Les messages d’ingestion sont transmis à votre cluster via un canal non direct (file d’attente Azure), ce qui rend impossible la validation d’autorisation inbande pour accéder au service d’ingestion. La solution consiste à attacher un jeton d’identité à chaque message d’ingestion. Le jeton active la validation d’autorisation dans la bande. Ce jeton signé peut ensuite être validé par le service d’ingestion lorsqu’il reçoit le message d’ingestion.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

Charger des données dans le conteneur d’objets blob Azure

Cette étape consiste à charger un fichier local dans un objet blob Azure qui sera remis pour ingestion. Ce code utilise le Kit de développement logiciel (SDK) Stockage Azure. Si la dépendance n’est pas possible, elle peut être obtenue avec l’API REST du service Blob Azure.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

Composer le message d’ingestion

Le package NewtonSoft.JSON compose à nouveau une demande d’ingestion valide pour identifier la base de données et la table cibles, et qui pointe vers l’objet blob. Le message sera publié dans la file d’attente Azure sur laquelle le service Kusto Gestion des données approprié est à l’écoute.

Voici quelques points à prendre en compte.

  • Cette requête est le strict minimum pour le message d’ingestion.

Notes

Le jeton d’identité est obligatoire et doit faire partie de l’objet JSON AdditionalProperties .

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

Publier le message d’ingestion dans la file d’attente d’ingestion

Enfin, publiez le message que vous avez construit dans la file d’attente d’ingestion sélectionnée que vous avez obtenue précédemment.

Notes

Les versions clientes de stockage .Net sous v12, par défaut, encodent le message en base64 Pour plus d’informations, consultez la documentation de stockage. Si vous utilisez des versions clientes de stockage .Net au-dessus de v12, vous devez encoder correctement le contenu du message.

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

Rechercher les messages d’erreur de la file d’attente Azure

Après l’ingestion, nous case activée pour les messages d’échec de la file d’attente appropriée dans laquelle le Gestion des données écrit. Pour plus d’informations sur la structure des messages d’échec, consultez Structure des messages d’échec d’ingestion.

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

Messages d’ingestion - Formats de document JSON

Structure interne du message d’ingestion

Le message que le service Kusto Gestion des données s’attend à lire à partir de la file d’attente Azure d’entrée est un document JSON au format suivant.

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Propriété Description
Id Identificateur de message (GUID)
BlobPath Chemin d’accès (URI) à l’objet blob, y compris la clé SAS accordant des autorisations de lecture/écriture/suppression. Des autorisations sont requises pour que le service d’ingestion puisse supprimer l’objet blob une fois qu’il a terminé l’ingestion des données.
RawDataSize Taille des données non compressées en octets. Fournir cette valeur permet au service d’ingestion d’optimiser l’ingestion en agrégeant potentiellement plusieurs objets blob. Cette propriété est facultative, mais si elle n’est pas donnée, le service accède à l’objet blob uniquement pour récupérer la taille.
nom_base_de_données Nom de la base de données cible
TableName Nom de la table cible
RetainBlobOnSuccess Si la valeur trueest définie sur , l’objet blob ne sera pas supprimé une fois l’ingestion terminée. La valeur par défaut est false
Vider immédiatement Si la valeur trueest définie sur , toute agrégation est ignorée. La valeur par défaut est false
ReportLevel Niveau de rapport réussite/erreur : 0-Échecs, 1-Aucun, 2-All
ReportMethod Mécanisme de création de rapports : 0-Queue, 1-Table
AdditionalProperties D’autres propriétés telles que format, tagset creationTime. Pour plus d’informations, consultez Propriétés d’ingestion des données.

Structure des messages d’échec d’ingestion

Le message que le Gestion des données s’attend à lire à partir de la file d’attente Azure d’entrée est un document JSON au format suivant.

Propriété Description
OperationId Identificateur d’opération (GUID) qui peut être utilisé pour suivre l’opération côté service
Base de données Nom de la base de données cible
Table de charge de travail Nom de la table cible
FailedOn Horodatage de l’échec
IngestionSourceId GUID identifiant le bloc de données qui n’a pas pu être ingéré
IngestionSourcePath Chemin d’accès (URI) au bloc de données qui n’a pas pu être ingéré
Détails Message d’échec
ErrorCode Code d'erreur. Pour tous les codes d’erreur, consultez Codes d’erreur d’ingestion.
FailureStatus Indique si l’échec est permanent ou temporaire
RootActivityId Identificateur de corrélation (GUID) qui peut être utilisé pour suivre l’opération côté service
OriginatesFromUpdatePolicy Indique si l’échec a été provoqué par une stratégie de mise à jour transactionnelle erronée
ShouldRetry Indique si l’ingestion peut réussir en cas de nouvelle tentative en l’état