Como ingerir dados com a API REST
A biblioteca Kusto.Ingest é preferencial para ingerir dados no cluster. No entanto, ainda pode obter quase a mesma funcionalidade, sem depender do pacote Kusto.Ingest. Este artigo mostra-lhe como utilizar a Ingestão em Fila no cluster para pipelines de nível de produção.
Nota
O código abaixo é escrito em C#e utiliza o SDK de Armazenamento do Azure, a Biblioteca de Autenticação da Microsoft (MSAL) e o pacote NewtonSoft.JSON para simplificar o código de exemplo. Se necessário, o código correspondente pode ser substituído por chamadas adequadas à API REST do Armazenamento do Azure , non-.NET pacote MSAL e qualquer pacote de processamento JSON disponível.
Este artigo aborda o modo recomendado de ingestão. Para a biblioteca Kusto.Ingest, a entidade correspondente é a interface IKustoQueuedIngestClient . Aqui, o código do cliente interage com o cluster ao publicar mensagens de notificação de ingestão numa fila do Azure. As referências às mensagens são obtidas a partir do serviço kusto Gestão de Dados (também conhecido como Ingestão). A interação com o serviço tem de ser autenticada com Microsoft Entra ID.
O código seguinte mostra como o serviço kusto Gestão de Dados processa a ingestão de dados em fila sem utilizar a biblioteca Kusto.Ingest. Este exemplo pode ser útil se o .NET completo estiver inacessível ou indisponível devido ao ambiente ou a outras restrições.
O código inclui os passos para criar um cliente do Armazenamento do Azure e carregar os dados para um blob. Cada passo é descrito em maior detalhe, após o código de exemplo.
- Obter um token de autenticação para aceder ao serviço de ingestão
- Consulte o serviço de ingestão para obter:
- Carregar dados para um blob num dos contentores de blobs obtidos a partir do Kusto no (2)
- Compor uma mensagem de ingestão que identifica a base de dados e a tabela de destino e que aponta para o blob de (3)
- Publique a mensagem de ingestão que compôs em (4) numa fila de ingestão obtida em (2)
- Obter qualquer erro encontrado pelo serviço durante a ingestão
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
public IList<string> IngestionQueues { get; set; } = new List<string>();
public IList<string> TempStorageContainers { get; set; } = new List<string>();
public string FailureNotificationsQueue { get; set; } = string.Empty;
public string SuccessNotificationsQueue { get; set; } = string.Empty;
}
public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
// Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
// 1. Authenticate the interactive user (or application) to access Kusto ingestion service
var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
// 2a. Retrieve ingestion resources
var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
// 2b. Retrieve Kusto identity token
var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
// 3. Upload file to one of the blob containers we got from Azure Data Explorer.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the containers in order to prevent throttling
var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
var blobUriWithSas = UploadFileToBlobContainer(
file, ingestionResources.TempStorageContainers.First(), blobName,
out var blobSizeBytes
);
// 4. Compose ingestion command
var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
// 5. Post ingestion command to one of the previously obtained ingestion queues.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the queues in order to prevent throttling
PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);
Thread.Sleep(20000);
// 6a. Read success notifications
var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
foreach (var sm in successes)
{
Console.WriteLine($"Ingestion completed: {sm}");
}
// 6b. Read failure notifications
var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
foreach (var em in errors)
{
Console.WriteLine($"Ingestion error: {em}");
}
}
Utilizar a ingestão em fila para pipelines de nível de produção
Obter provas de autenticação a partir de Microsoft Entra ID
Aqui, utilizamos a Biblioteca de Autenticação da Microsoft (MSAL) para obter um token de Microsoft Entra para aceder ao serviço kusto Gestão de Dados e pedir as respetivas filas de entrada. O MSAL está disponível em várias plataformas.
// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
// Create an authentication client for Azure AD:
var authClient = PublicClientApplicationBuilder.Create("<appId>")
.WithAuthority("https://login.microsoftonline.com/<appTenant>")
.WithRedirectUri("<appRedirectUri>")
.Build();
// Acquire user token for the interactive user for Azure Data Explorer:
var result = authClient.AcquireTokenInteractive(
new[] { $"{resource}/.default" } // Define scopes
).ExecuteAsync().Result;
return result.AccessToken;
}
Obter recursos de ingestão
Crie manualmente um pedido HTTP POST para o serviço Gestão de Dados, solicitando a devolução dos recursos de ingestão. Estes recursos incluem filas nas quais o serviço DM está a escutar e contentores de blobs para carregamento de dados. O serviço Gestão de Dados processará quaisquer mensagens que contenham pedidos de ingestão que cheguem a uma dessas filas.
// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get ingestion resources\" }";
var ingestionResources = new IngestionResourcesSnapshot();
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
// Input queues
var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
foreach (var token in tokens)
{
ingestionResources.IngestionQueues.Add((string)token[1]);
}
// Temp storage containers
tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
foreach (var token in tokens)
{
ingestionResources.TempStorageContainers.Add((string)token[1]);
}
// Failure notifications queue
var singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.FailureNotificationsQueue = (string)singleToken;
// Success notifications queue
singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.SuccessNotificationsQueue = (string)singleToken;
return ingestionResources;
}
// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
var request = WebRequest.Create(uriString);
request.Method = "POST";
request.ContentType = "application/json";
request.ContentLength = body.Length;
request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
using var bodyStream = request.GetRequestStream();
using (var sw = new StreamWriter(bodyStream))
{
sw.Write(body);
sw.Flush();
}
bodyStream.Close();
return request.GetResponse();
}
Obter um token de identidade kusto
As mensagens de ingestão são entregues ao cluster através de um canal não direto (fila do Azure), impossibilitando a validação de autorização dentro da banda para aceder ao serviço de ingestão. A solução consiste em anexar um token de identidade a cada mensagem de ingestão. O token ativa a validação de autorização dentro da banda. Este token assinado pode ser validado pelo serviço de ingestão quando receber a mensagem de ingestão.
// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get kusto identity token\" }";
var jsonPath = "Tables[0].Rows[*].[0]";
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
return (string)identityToken;
}
Carregar dados para o contentor de Blobs do Azure
Este passo consiste em carregar um ficheiro local para um Blob do Azure que será entregue para ingestão. Este código utiliza o SDK de Armazenamento do Azure. Se a dependência não for possível, pode ser obtida com a API REST do Serviço de Blobs do Azure.
// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
var blobUri = new Uri(blobContainerUri);
var blobContainer = new BlobContainerClient(blobUri);
var blob = blobContainer.GetBlobClient(blobName);
using (var stream = File.OpenRead(filePath))
{
blob.UploadAsync(BinaryData.FromStream(stream));
blobSize = blob.GetProperties().Value.ContentLength;
}
return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}
Compor a mensagem de ingestão
O pacote NewtonSoft.JSON irá compor novamente um pedido de ingestão válido para identificar a base de dados de destino e a tabela, o que aponta para o blob. A mensagem será publicada na Fila do Azure na qual o serviço de Gestão de Dados Kusto relevante está a escutar.
Seguem-se alguns pontos a considerar.
- Este pedido é o mínimo para a mensagem de ingestão.
Nota
O token de identidade é obrigatório e tem de fazer parte do objeto JSON AdditionalProperties .
- Sempre que necessário, as propriedades CsvMapping ou JsonMapping também têm de ser fornecidas
- Para obter mais informações, veja o artigo sobre a pré-criação do mapeamento de ingestão.
- A estrutura interna da mensagem de ingestão de secções fornece uma explicação da estrutura da mensagem de ingestão
internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
var message = new JObject
{
{ "Id", Guid.NewGuid().ToString() },
{ "BlobPath", dataUri },
{ "RawDataSize", blobSizeBytes },
{ "DatabaseName", db },
{ "TableName", table },
{ "RetainBlobOnSuccess", true }, // Do not delete the blob on success
{ "FlushImmediately", true }, // Do not aggregate
{ "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
{ "ReportMethod", 0 }, // Failures are reported to an Azure Queue
{
"AdditionalProperties", new JObject(
new JProperty("authorizationContext", identityToken),
new JProperty("mappingReference", mappingRef),
// Data is in JSON format
new JProperty("format", "multijson")
)
}
};
return message.ToString();
}
Publicar a mensagem de ingestão na fila de ingestão
Por fim, publique a mensagem que construiu na fila de ingestão selecionada que obteve anteriormente.
Nota
Por predefinição, as versões do cliente de armazenamento .Net abaixo de v12 codificam a mensagem para base64 Para obter mais informações, consulte documentos de armazenamento. Se estiver a utilizar versões de cliente de armazenamento .Net acima da v12, tem de codificar corretamente o conteúdo da mensagem.
internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
queue.SendMessage(message);
}
Procurar mensagens de erro da fila do Azure
Após a ingestão, verificamos se existem mensagens de falha da fila relevante na qual o Gestão de Dados escreve. Para obter mais informações sobre a estrutura da mensagem de falha, veja Estrutura de mensagens de falha de ingestão.
internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
var messages = messagesFromQueue.Select(m => m.MessageText);
return messages;
}
Mensagens de ingestão – formatos de documento JSON
Estrutura interna da mensagem de ingestão
A mensagem que o serviço kusto Gestão de Dados espera ler a partir da Fila do Azure de entrada é um documento JSON no seguinte formato.
{
"Id" : "<ID>",
"BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
"RawDataSize" : "<RawDataSizeInBytes>",
"DatabaseName": "<DatabaseName>",
"TableName" : "<TableName>",
"RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
"FlushImmediately": "<true|false>",
"ReportLevel" : <0-Failures, 1-None, 2-All>,
"ReportMethod" : <0-Queue, 1-Table>,
"AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Propriedade | Descrição |
---|---|
Id | Identificador de mensagens (GUID) |
BlobPath | Caminho (URI) para o blob, incluindo a chave SAS que concede permissões para lê-lo/escrever/eliminá-lo. São necessárias permissões para que o serviço de ingestão possa eliminar o blob depois de concluir a ingestão dos dados. |
RawDataSize | Tamanho dos dados não comprimidos em bytes. Fornecer este valor permite ao serviço de ingestão otimizar a ingestão ao agregar potencialmente vários blobs. Esta propriedade é opcional, mas se não for dada, o serviço acederá ao blob apenas para obter o tamanho. |
DatabaseName | Nome da base de dados de destino |
TableName | Nome da tabela de destino |
RetainBlobOnSuccess | Se estiver definido como true , o blob não será eliminado quando a ingestão for concluída com êxito. A predefinição é false |
FlushImmediately | Se estiver definido como true , qualquer agregação será ignorada. A predefinição é false |
ReportLevel | Nível de relatório de Êxito/Erro: 0-Falhas, 1-Nenhum, 2-All |
ReportMethod | Mecanismo de relatórios: 0-Fila, 1-Tabela |
Propriedades Adicionais | Outras propriedades, como format , tags e creationTime . Para obter mais informações, veja Propriedades de ingestão de dados. |
Estrutura da mensagem de falha de ingestão
A mensagem que o Gestão de Dados espera ler a partir da Fila do Azure de entrada é um documento JSON no seguinte formato.
Propriedade | Descrição |
---|---|
OperationId | Identificador de operação (GUID) que pode ser utilizado para controlar a operação no lado do serviço |
Base de Dados | Nome da base de dados de destino |
Tabela | Nome da tabela de destino |
FailedOn | Carimbo de data/hora da falha |
IngestionSourceId | GUID que identifica o segmento de dados que não ingeriu |
IngestionSourcePath | Caminho (URI) para o segmento de dados que não ingeriu |
Detalhes | Mensagem de falha |
CódigoDoErro | O código de erro. Para todos os códigos de erro, veja Códigos de erro de ingestão. |
FailureStatus | Indica se a falha é permanente ou transitória |
RootActivityId | O identificador de correlação (GUID) que pode ser utilizado para controlar a operação no lado do serviço |
OriginesFromUpdatePolicy | Indica se a falha foi causada por uma política de atualização transacional errónea |
ShouldRetry | Indica se a ingestão pode ser bem-sucedida se for repetida como está |
Comentários
https://aka.ms/ContentUserFeedback.
Brevemente: Ao longo de 2024, vamos descontinuar progressivamente o GitHub Issues como mecanismo de feedback para conteúdos e substituí-lo por um novo sistema de feedback. Para obter mais informações, veja:Submeter e ver comentários