Como ingerir dados com a API REST

A biblioteca Kusto.Ingest é preferencial para ingerir dados no cluster. No entanto, ainda pode obter quase a mesma funcionalidade, sem depender do pacote Kusto.Ingest. Este artigo mostra-lhe como utilizar a Ingestão em Fila no cluster para pipelines de nível de produção.

Nota

O código abaixo é escrito em C#e utiliza o SDK de Armazenamento do Azure, a Biblioteca de Autenticação da Microsoft (MSAL) e o pacote NewtonSoft.JSON para simplificar o código de exemplo. Se necessário, o código correspondente pode ser substituído por chamadas adequadas à API REST do Armazenamento do Azure , non-.NET pacote MSAL e qualquer pacote de processamento JSON disponível.

Este artigo aborda o modo recomendado de ingestão. Para a biblioteca Kusto.Ingest, a entidade correspondente é a interface IKustoQueuedIngestClient . Aqui, o código do cliente interage com o cluster ao publicar mensagens de notificação de ingestão numa fila do Azure. As referências às mensagens são obtidas a partir do serviço kusto Gestão de Dados (também conhecido como Ingestão). A interação com o serviço tem de ser autenticada com Microsoft Entra ID.

O código seguinte mostra como o serviço kusto Gestão de Dados processa a ingestão de dados em fila sem utilizar a biblioteca Kusto.Ingest. Este exemplo pode ser útil se o .NET completo estiver inacessível ou indisponível devido ao ambiente ou a outras restrições.

O código inclui os passos para criar um cliente do Armazenamento do Azure e carregar os dados para um blob. Cada passo é descrito em maior detalhe, após o código de exemplo.

  1. Obter um token de autenticação para aceder ao serviço de ingestão
  2. Consulte o serviço de ingestão para obter:
  3. Carregar dados para um blob num dos contentores de blobs obtidos a partir do Kusto no (2)
  4. Compor uma mensagem de ingestão que identifica a base de dados e a tabela de destino e que aponta para o blob de (3)
  5. Publique a mensagem de ingestão que compôs em (4) numa fila de ingestão obtida em (2)
  6. Obter qualquer erro encontrado pelo serviço durante a ingestão
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

Utilizar a ingestão em fila para pipelines de nível de produção

Obter provas de autenticação a partir de Microsoft Entra ID

Aqui, utilizamos a Biblioteca de Autenticação da Microsoft (MSAL) para obter um token de Microsoft Entra para aceder ao serviço kusto Gestão de Dados e pedir as respetivas filas de entrada. O MSAL está disponível em várias plataformas.

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.microsoftonline.com/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

Obter recursos de ingestão

Crie manualmente um pedido HTTP POST para o serviço Gestão de Dados, solicitando a devolução dos recursos de ingestão. Estes recursos incluem filas nas quais o serviço DM está a escutar e contentores de blobs para carregamento de dados. O serviço Gestão de Dados processará quaisquer mensagens que contenham pedidos de ingestão que cheguem a uma dessas filas.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

Obter um token de identidade kusto

As mensagens de ingestão são entregues ao cluster através de um canal não direto (fila do Azure), impossibilitando a validação de autorização dentro da banda para aceder ao serviço de ingestão. A solução consiste em anexar um token de identidade a cada mensagem de ingestão. O token ativa a validação de autorização dentro da banda. Este token assinado pode ser validado pelo serviço de ingestão quando receber a mensagem de ingestão.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

Carregar dados para o contentor de Blobs do Azure

Este passo consiste em carregar um ficheiro local para um Blob do Azure que será entregue para ingestão. Este código utiliza o SDK de Armazenamento do Azure. Se a dependência não for possível, pode ser obtida com a API REST do Serviço de Blobs do Azure.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

Compor a mensagem de ingestão

O pacote NewtonSoft.JSON irá compor novamente um pedido de ingestão válido para identificar a base de dados de destino e a tabela, o que aponta para o blob. A mensagem será publicada na Fila do Azure na qual o serviço de Gestão de Dados Kusto relevante está a escutar.

Seguem-se alguns pontos a considerar.

  • Este pedido é o mínimo para a mensagem de ingestão.

Nota

O token de identidade é obrigatório e tem de fazer parte do objeto JSON AdditionalProperties .

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

Publicar a mensagem de ingestão na fila de ingestão

Por fim, publique a mensagem que construiu na fila de ingestão selecionada que obteve anteriormente.

Nota

Por predefinição, as versões do cliente de armazenamento .Net abaixo de v12 codificam a mensagem para base64 Para obter mais informações, consulte documentos de armazenamento. Se estiver a utilizar versões de cliente de armazenamento .Net acima da v12, tem de codificar corretamente o conteúdo da mensagem.

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

Procurar mensagens de erro da fila do Azure

Após a ingestão, verificamos se existem mensagens de falha da fila relevante na qual o Gestão de Dados escreve. Para obter mais informações sobre a estrutura da mensagem de falha, veja Estrutura de mensagens de falha de ingestão.

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

Mensagens de ingestão – formatos de documento JSON

Estrutura interna da mensagem de ingestão

A mensagem que o serviço kusto Gestão de Dados espera ler a partir da Fila do Azure de entrada é um documento JSON no seguinte formato.

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Propriedade Descrição
Id Identificador de mensagens (GUID)
BlobPath Caminho (URI) para o blob, incluindo a chave SAS que concede permissões para lê-lo/escrever/eliminá-lo. São necessárias permissões para que o serviço de ingestão possa eliminar o blob depois de concluir a ingestão dos dados.
RawDataSize Tamanho dos dados não comprimidos em bytes. Fornecer este valor permite ao serviço de ingestão otimizar a ingestão ao agregar potencialmente vários blobs. Esta propriedade é opcional, mas se não for dada, o serviço acederá ao blob apenas para obter o tamanho.
DatabaseName Nome da base de dados de destino
TableName Nome da tabela de destino
RetainBlobOnSuccess Se estiver definido como true, o blob não será eliminado quando a ingestão for concluída com êxito. A predefinição é false
FlushImmediately Se estiver definido como true, qualquer agregação será ignorada. A predefinição é false
ReportLevel Nível de relatório de Êxito/Erro: 0-Falhas, 1-Nenhum, 2-All
ReportMethod Mecanismo de relatórios: 0-Fila, 1-Tabela
Propriedades Adicionais Outras propriedades, como format, tagse creationTime. Para obter mais informações, veja Propriedades de ingestão de dados.

Estrutura da mensagem de falha de ingestão

A mensagem que o Gestão de Dados espera ler a partir da Fila do Azure de entrada é um documento JSON no seguinte formato.

Propriedade Descrição
OperationId Identificador de operação (GUID) que pode ser utilizado para controlar a operação no lado do serviço
Base de Dados Nome da base de dados de destino
Tabela Nome da tabela de destino
FailedOn Carimbo de data/hora da falha
IngestionSourceId GUID que identifica o segmento de dados que não ingeriu
IngestionSourcePath Caminho (URI) para o segmento de dados que não ingeriu
Detalhes Mensagem de falha
CódigoDoErro O código de erro. Para todos os códigos de erro, veja Códigos de erro de ingestão.
FailureStatus Indica se a falha é permanente ou transitória
RootActivityId O identificador de correlação (GUID) que pode ser utilizado para controlar a operação no lado do serviço
OriginesFromUpdatePolicy Indica se a falha foi causada por uma política de atualização transacional errónea
ShouldRetry Indica se a ingestão pode ser bem-sucedida se for repetida como está