Comment ingérer des données avec l'API REST
La bibliothèque Kusto.Ingest est recommandée pour ingérer des données dans votre cluster. Vous pouvez cependant obtenir presque les mêmes fonctionnalités, sans être dépendant du package Kusto.Ingest. Cet article vous montre comment utiliser l’ingestion en file d’attente dans votre cluster pour les pipelines de qualité production.
Notes
Le code ci-dessous est écrit en C# et utilise le Kit de développement logiciel (SDK) Stockage Azure, la bibliothèque d’authentification Microsoft (MSAL) et le package NewtonSoft.JSON pour simplifier l’exemple de code. Si nécessaire, le code correspondant peut être remplacé par des appels d’API REST stockage Azure appropriés, non-.NET package MSAL et tout package de gestion JSON disponible.
Cet article traite du mode d’ingestion recommandé. Pour la bibliothèque Kusto.Ingest, son entité correspondante est l’interface IKustoQueuedIngestClient . Ici, le code client interagit avec votre cluster en publiant des messages de notification d’ingestion dans une file d’attente Azure. Les références aux messages sont obtenues à partir du service kusto Gestion des données (également appelé ingestion). L’interaction avec le service doit être authentifiée avec Microsoft Entra’ID.
Le code suivant montre comment le service Kusto Gestion des données gère l’ingestion des données mises en file d’attente sans utiliser la bibliothèque Kusto.Ingest. Cet exemple peut être utile si .NET complet est inaccessible ou indisponible en raison de l’environnement ou d’autres restrictions.
Le code inclut les étapes permettant de créer un client stockage Azure et de charger les données dans un objet blob. Chaque étape est décrite plus en détail, après l’exemple de code.
- Obtenir un jeton d’authentification pour accéder au service d’ingestion
- Interrogez le service d’ingestion pour obtenir :
- Charger des données dans un objet blob sur l’un des conteneurs d’objets blob obtenus à partir de Kusto dans (2)
- Composer un message d’ingestion qui identifie la base de données et la table cible et qui pointe vers l’objet blob à partir de (3)
- Publiez le message d’ingestion que nous avons composé dans (4) dans une file d’attente d’ingestion obtenue dans (2)
- Récupérer toute erreur détectée par le service pendant l’ingestion
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
public IList<string> IngestionQueues { get; set; } = new List<string>();
public IList<string> TempStorageContainers { get; set; } = new List<string>();
public string FailureNotificationsQueue { get; set; } = string.Empty;
public string SuccessNotificationsQueue { get; set; } = string.Empty;
}
public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
// Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
// 1. Authenticate the interactive user (or application) to access Kusto ingestion service
var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
// 2a. Retrieve ingestion resources
var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
// 2b. Retrieve Kusto identity token
var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
// 3. Upload file to one of the blob containers we got from Azure Data Explorer.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the containers in order to prevent throttling
var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
var blobUriWithSas = UploadFileToBlobContainer(
file, ingestionResources.TempStorageContainers.First(), blobName,
out var blobSizeBytes
);
// 4. Compose ingestion command
var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
// 5. Post ingestion command to one of the previously obtained ingestion queues.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the queues in order to prevent throttling
PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);
Thread.Sleep(20000);
// 6a. Read success notifications
var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
foreach (var sm in successes)
{
Console.WriteLine($"Ingestion completed: {sm}");
}
// 6b. Read failure notifications
var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
foreach (var em in errors)
{
Console.WriteLine($"Ingestion error: {em}");
}
}
Utilisation de l’ingestion en file d’attente pour les pipelines de qualité production
Obtenir une preuve d’authentification à partir de l’ID Microsoft Entra
Ici, nous utilisons la bibliothèque d’authentification Microsoft (MSAL) pour obtenir un jeton Microsoft Entra pour accéder au service kusto Gestion des données et demander ses files d’attente d’entrée. MSAL est disponible sur plusieurs plateformes.
// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
// Create an authentication client for Azure AD:
var authClient = PublicClientApplicationBuilder.Create("<appId>")
.WithAuthority("https://login.microsoftonline.com/<appTenant>")
.WithRedirectUri("<appRedirectUri>")
.Build();
// Acquire user token for the interactive user for Azure Data Explorer:
var result = authClient.AcquireTokenInteractive(
new[] { $"{resource}/.default" } // Define scopes
).ExecuteAsync().Result;
return result.AccessToken;
}
Récupérer des ressources d’ingestion
Construisez manuellement une requête HTTP POST pour le service Gestion des données, en demandant le retour des ressources d’ingestion. Ces ressources incluent les files d’attente que le service DM écoute et les conteneurs d’objets blob pour le chargement des données. Le service Gestion des données traite tous les messages contenant des demandes d’ingestion qui arrivent dans l’une de ces files d’attente.
// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get ingestion resources\" }";
var ingestionResources = new IngestionResourcesSnapshot();
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
// Input queues
var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
foreach (var token in tokens)
{
ingestionResources.IngestionQueues.Add((string)token[1]);
}
// Temp storage containers
tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
foreach (var token in tokens)
{
ingestionResources.TempStorageContainers.Add((string)token[1]);
}
// Failure notifications queue
var singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.FailureNotificationsQueue = (string)singleToken;
// Success notifications queue
singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.SuccessNotificationsQueue = (string)singleToken;
return ingestionResources;
}
// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
var request = WebRequest.Create(uriString);
request.Method = "POST";
request.ContentType = "application/json";
request.ContentLength = body.Length;
request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
using var bodyStream = request.GetRequestStream();
using (var sw = new StreamWriter(bodyStream))
{
sw.Write(body);
sw.Flush();
}
bodyStream.Close();
return request.GetResponse();
}
Obtenir un jeton d’identité Kusto
Les messages d’ingestion sont transmis à votre cluster via un canal non direct (file d’attente Azure), ce qui rend impossible la validation d’autorisation inbande pour accéder au service d’ingestion. La solution consiste à attacher un jeton d’identité à chaque message d’ingestion. Le jeton active la validation d’autorisation dans la bande. Ce jeton signé peut ensuite être validé par le service d’ingestion lorsqu’il reçoit le message d’ingestion.
// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get kusto identity token\" }";
var jsonPath = "Tables[0].Rows[*].[0]";
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
return (string)identityToken;
}
Charger des données dans le conteneur d’objets blob Azure
Cette étape consiste à charger un fichier local dans un objet blob Azure qui sera remis pour ingestion. Ce code utilise le Kit de développement logiciel (SDK) Stockage Azure. Si la dépendance n’est pas possible, elle peut être obtenue avec l’API REST du service Blob Azure.
// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
var blobUri = new Uri(blobContainerUri);
var blobContainer = new BlobContainerClient(blobUri);
var blob = blobContainer.GetBlobClient(blobName);
using (var stream = File.OpenRead(filePath))
{
blob.UploadAsync(BinaryData.FromStream(stream));
blobSize = blob.GetProperties().Value.ContentLength;
}
return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}
Composer le message d’ingestion
Le package NewtonSoft.JSON compose à nouveau une demande d’ingestion valide pour identifier la base de données et la table cibles, et qui pointe vers l’objet blob. Le message sera publié dans la file d’attente Azure sur laquelle le service Kusto Gestion des données approprié est à l’écoute.
Voici quelques points à prendre en compte.
- Cette requête est le strict minimum pour le message d’ingestion.
Notes
Le jeton d’identité est obligatoire et doit faire partie de l’objet JSON AdditionalProperties .
- Chaque fois que nécessaire, les propriétés CsvMapping ou JsonMapping doivent également être fournies
- Pour plus d’informations, consultez l’article sur la précréation du mappage d’ingestion.
- La structure interne du message d’ingestion de section fournit une explication de la structure du message d’ingestion
internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
var message = new JObject
{
{ "Id", Guid.NewGuid().ToString() },
{ "BlobPath", dataUri },
{ "RawDataSize", blobSizeBytes },
{ "DatabaseName", db },
{ "TableName", table },
{ "RetainBlobOnSuccess", true }, // Do not delete the blob on success
{ "FlushImmediately", true }, // Do not aggregate
{ "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
{ "ReportMethod", 0 }, // Failures are reported to an Azure Queue
{
"AdditionalProperties", new JObject(
new JProperty("authorizationContext", identityToken),
new JProperty("mappingReference", mappingRef),
// Data is in JSON format
new JProperty("format", "multijson")
)
}
};
return message.ToString();
}
Publier le message d’ingestion dans la file d’attente d’ingestion
Enfin, publiez le message que vous avez construit dans la file d’attente d’ingestion sélectionnée que vous avez obtenue précédemment.
Notes
Les versions clientes de stockage .Net sous v12, par défaut, encodent le message en base64 Pour plus d’informations, consultez la documentation de stockage. Si vous utilisez des versions clientes de stockage .Net au-dessus de v12, vous devez encoder correctement le contenu du message.
internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
queue.SendMessage(message);
}
Rechercher les messages d’erreur de la file d’attente Azure
Après l’ingestion, nous case activée pour les messages d’échec de la file d’attente appropriée dans laquelle le Gestion des données écrit. Pour plus d’informations sur la structure des messages d’échec, consultez Structure des messages d’échec d’ingestion.
internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
var messages = messagesFromQueue.Select(m => m.MessageText);
return messages;
}
Messages d’ingestion - Formats de document JSON
Structure interne du message d’ingestion
Le message que le service Kusto Gestion des données s’attend à lire à partir de la file d’attente Azure d’entrée est un document JSON au format suivant.
{
"Id" : "<ID>",
"BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
"RawDataSize" : "<RawDataSizeInBytes>",
"DatabaseName": "<DatabaseName>",
"TableName" : "<TableName>",
"RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
"FlushImmediately": "<true|false>",
"ReportLevel" : <0-Failures, 1-None, 2-All>,
"ReportMethod" : <0-Queue, 1-Table>,
"AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Propriété | Description |
---|---|
Id | Identificateur de message (GUID) |
BlobPath | Chemin d’accès (URI) à l’objet blob, y compris la clé SAS accordant des autorisations de lecture/écriture/suppression. Des autorisations sont requises pour que le service d’ingestion puisse supprimer l’objet blob une fois qu’il a terminé l’ingestion des données. |
RawDataSize | Taille des données non compressées en octets. Fournir cette valeur permet au service d’ingestion d’optimiser l’ingestion en agrégeant potentiellement plusieurs objets blob. Cette propriété est facultative, mais si elle n’est pas donnée, le service accède à l’objet blob uniquement pour récupérer la taille. |
nom_base_de_données | Nom de la base de données cible |
TableName | Nom de la table cible |
RetainBlobOnSuccess | Si la valeur true est définie sur , l’objet blob ne sera pas supprimé une fois l’ingestion terminée. La valeur par défaut est false |
Vider immédiatement | Si la valeur true est définie sur , toute agrégation est ignorée. La valeur par défaut est false |
ReportLevel | Niveau de rapport réussite/erreur : 0-Échecs, 1-Aucun, 2-All |
ReportMethod | Mécanisme de création de rapports : 0-Queue, 1-Table |
AdditionalProperties | D’autres propriétés telles que format , tags et creationTime . Pour plus d’informations, consultez Propriétés d’ingestion des données. |
Structure des messages d’échec d’ingestion
Le message que le Gestion des données s’attend à lire à partir de la file d’attente Azure d’entrée est un document JSON au format suivant.
Propriété | Description |
---|---|
OperationId | Identificateur d’opération (GUID) qui peut être utilisé pour suivre l’opération côté service |
Base de données | Nom de la base de données cible |
Table de charge de travail | Nom de la table cible |
FailedOn | Horodatage de l’échec |
IngestionSourceId | GUID identifiant le bloc de données qui n’a pas pu être ingéré |
IngestionSourcePath | Chemin d’accès (URI) au bloc de données qui n’a pas pu être ingéré |
Détails | Message d’échec |
ErrorCode | Code d'erreur. Pour tous les codes d’erreur, consultez Codes d’erreur d’ingestion. |
FailureStatus | Indique si l’échec est permanent ou temporaire |
RootActivityId | Identificateur de corrélation (GUID) qui peut être utilisé pour suivre l’opération côté service |
OriginatesFromUpdatePolicy | Indique si l’échec a été provoqué par une stratégie de mise à jour transactionnelle erronée |
ShouldRetry | Indique si l’ingestion peut réussir en cas de nouvelle tentative en l’état |
Commentaires
https://aka.ms/ContentUserFeedback.
Bientôt disponible : Tout au long de 2024, nous allons supprimer progressivement GitHub Issues comme mécanisme de commentaires pour le contenu et le remplacer par un nouveau système de commentaires. Pour plus d’informations, consultezEnvoyer et afficher des commentaires pour