Come inserire dati con l'API REST

La libreria Kusto.Ingest è preferibile per l'inserimento dei dati nel cluster. Tuttavia, è comunque possibile ottenere quasi la stessa funzionalità, senza dipendere dal pacchetto Kusto.Ingest. Questo articolo illustra come usare l'inserimento in coda nel cluster per le pipeline di livello di produzione.

Nota

Il codice seguente è scritto in C# e usa Azure Storage SDK, Microsoft Authentication Library (MSAL) e il pacchetto NewtonSoft.JSON per semplificare il codice di esempio. Se necessario, il codice corrispondente può essere sostituito con le chiamate API REST di Archiviazione di Azure appropriate, non-.NET pacchetto MSAL e qualsiasi pacchetto di gestione JSON disponibile.

Questo articolo riguarda la modalità consigliata di inserimento. Per la libreria Kusto.Ingest, l'entità corrispondente è l'interfaccia IKustoQueuedIngestClient . In questo caso, il codice client interagisce con il cluster pubblicando messaggi di notifica di inserimento in una coda di Azure. I riferimenti ai messaggi vengono ottenuti dal servizio Kusto Gestione dati (noto anche come inserimento). L'interazione con il servizio deve essere autenticata con Microsoft Entra ID.

Il codice seguente illustra come il servizio Kusto Gestione dati gestisce l'inserimento dei dati in coda senza usare la libreria Kusto.Ingest. Questo esempio può essere utile se .NET completo è inaccessibile o non disponibile a causa dell'ambiente o di altre restrizioni.

Il codice include i passaggi per creare un client di Archiviazione di Azure e caricare i dati in un BLOB. Ogni passaggio viene descritto in modo più dettagliato, dopo il codice di esempio.

  1. Ottenere un token di autenticazione per l'accesso al servizio di inserimento
  2. Eseguire una query sul servizio di inserimento per ottenere:
  3. Caricare dati in un BLOB in uno dei contenitori BLOB ottenuti da Kusto in (2)
  4. Comporre un messaggio di inserimento che identifica il database e la tabella di destinazione e che punta al BLOB da (3)
  5. Inserire il messaggio di inserimento composto in (4) in una coda di inserimento ottenuta in (2)
  6. Recuperare eventuali errori rilevati dal servizio durante l'inserimento
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

Uso dell'inserimento in coda per le pipeline di livello di produzione

Ottenere l'evidenza di autenticazione da Microsoft Entra ID

In questo caso viene usato Microsoft Authentication Library (MSAL) per ottenere un token di Microsoft Entra per accedere al servizio Kusto Gestione dati e richiedere le code di input. MSAL è disponibile su più piattaforme.

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.microsoftonline.com/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

Recuperare le risorse di inserimento

Costruire manualmente una richiesta HTTP POST al servizio Gestione dati, richiedendo la restituzione delle risorse di inserimento. Queste risorse includono le code su cui è in ascolto il servizio Dm Dm e i contenitori BLOB per il caricamento dei dati. Il servizio Gestione dati elabora tutti i messaggi contenenti richieste di inserimento che arrivano su una di queste code.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

Ottenere un token di identità Kusto

I messaggi di inserimento vengono inviati al cluster tramite un canale non diretto (coda di Azure), rendendo impossibile eseguire la convalida dell'autorizzazione in banda per l'accesso al servizio di inserimento. La soluzione consiste nel collegare un token di identità a ogni messaggio di inserimento. Il token abilita la convalida dell'autorizzazione in banda. Questo token firmato può quindi essere convalidato dal servizio di inserimento quando riceve il messaggio di inserimento.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

Caricare dati nel contenitore BLOB di Azure

Questo passaggio riguarda il caricamento di un file locale in un BLOB di Azure che verrà passato per l'inserimento. Questo codice usa Azure Storage SDK. Se la dipendenza non è possibile, può essere ottenuta con l'API REST del servizio BLOB di Azure.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

Comporre il messaggio di inserimento

Il pacchetto NewtonSoft.JSON componi nuovamente una richiesta di inserimento valida per identificare il database e la tabella di destinazione e che punta al BLOB. Il messaggio verrà inviato alla coda di Azure su cui è in ascolto il servizio Kusto Gestione dati pertinente.

Ecco alcuni punti da considerare.

  • Questa richiesta è il minimo minimo per il messaggio di inserimento.

Nota

Il token di identità è obbligatorio e deve far parte dell'oggetto JSON AdditionalProperties .

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

Pubblicare il messaggio di inserimento nella coda di inserimento

Infine, pubblicare il messaggio creato nella coda di inserimento selezionata ottenuta in precedenza.

Nota

Per impostazione predefinita, le versioni del client di archiviazione .Net precedenti alla versione 12 codificano il messaggio in base64. Per altre informazioni, vedere la documentazione sull'archiviazione. Se si usano versioni client di archiviazione .NET precedenti alla versione 12, è necessario codificare correttamente il contenuto del messaggio.

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

Verificare la presenza di messaggi di errore dalla coda di Azure

Dopo l'inserimento, viene verificata la presenza di messaggi di errore dalla coda pertinente in cui scrive il Gestione dati. Per altre informazioni sulla struttura dei messaggi di errore, vedere Struttura dei messaggi di errore di inserimento.

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

Messaggi di inserimento - Formati di documento JSON

Struttura interna del messaggio di inserimento

Il messaggio che il servizio Kusto Gestione dati prevede di leggere dalla coda di Azure di input è un documento JSON nel formato seguente.

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Proprietà Descrizione
ID Identificatore del messaggio (GUID)
BlobPath Percorso (URI) al BLOB, inclusa la chiave di firma di accesso condiviso che concede le autorizzazioni per la lettura/scrittura/eliminazione. Le autorizzazioni sono necessarie in modo che il servizio di inserimento possa eliminare il BLOB dopo aver completato l'inserimento dei dati.
RawDataSize Dimensioni dei dati non compressi in byte. Questo valore consente al servizio di inserimento di ottimizzare l'inserimento aggregando potenzialmente più BLOB. Questa proprietà è facoltativa, ma se non viene specificata, il servizio accederà al BLOB solo per recuperare le dimensioni.
DatabaseName Nome del database di destinazione
TableName Nome tabella di destinazione
RetainBlobOnSuccess Se impostato su true, il BLOB non verrà eliminato al termine dell'inserimento. Il valore predefinito è false
FlushImmediately Se impostato su true, qualsiasi aggregazione verrà ignorata. Il valore predefinito è false
ReportLevel Livello di segnalazione errori/esito positivo: 0-failures, 1-None, 2-All
ReportMethod Meccanismo di creazione di report: 0-Queue, 1-Table
AdditionalProperties Altre proprietà, ad formatesempio , tagse creationTime. Per altre informazioni, vedere Proprietà di inserimento dati.

Struttura dei messaggi di errore di inserimento

Il messaggio che il Gestione dati prevede di leggere dalla coda di Azure di input è un documento JSON nel formato seguente.

Proprietà Descrizione
OperationId Identificatore dell'operazione (GUID) che può essere usato per tenere traccia dell'operazione sul lato servizio
Database Nome del database di destinazione
Tabella Nome tabella di destinazione
FailedOn Timestamp degli errori
InserimentoSourceId GUID che identifica il blocco di dati che non è riuscito ad inserire
InserimentoSourcePath Percorso (URI) al blocco di dati che non è riuscito ad inserire
Dettagli Messaggio di errore
ErrorCode Codice di errore. Per tutti i codici di errore, vedere Codici di errore di inserimento.
FailureStatus Indica se l'errore è permanente o temporaneo
RootActivityId Identificatore di correlazione (GUID) che può essere usato per tenere traccia dell'operazione sul lato servizio
OriginsFromUpdatePolicy Indica se l'errore è stato causato da un criterio di aggiornamento transazionale errato
ShouldRetry Indica se l'inserimento potrebbe avere esito positivo se è stato eseguito di nuovo il tentativo