Come inserire dati con l'API REST
La libreria Kusto.Ingest è preferibile per l'inserimento dei dati nel cluster. Tuttavia, è comunque possibile ottenere quasi la stessa funzionalità, senza dipendere dal pacchetto Kusto.Ingest. Questo articolo illustra come usare l'inserimento in coda nel cluster per le pipeline di livello di produzione.
Nota
Il codice seguente è scritto in C# e usa Azure Storage SDK, Microsoft Authentication Library (MSAL) e il pacchetto NewtonSoft.JSON per semplificare il codice di esempio. Se necessario, il codice corrispondente può essere sostituito con le chiamate API REST di Archiviazione di Azure appropriate, non-.NET pacchetto MSAL e qualsiasi pacchetto di gestione JSON disponibile.
Questo articolo riguarda la modalità consigliata di inserimento. Per la libreria Kusto.Ingest, l'entità corrispondente è l'interfaccia IKustoQueuedIngestClient . In questo caso, il codice client interagisce con il cluster pubblicando messaggi di notifica di inserimento in una coda di Azure. I riferimenti ai messaggi vengono ottenuti dal servizio Kusto Gestione dati (noto anche come inserimento). L'interazione con il servizio deve essere autenticata con Microsoft Entra ID.
Il codice seguente illustra come il servizio Kusto Gestione dati gestisce l'inserimento dei dati in coda senza usare la libreria Kusto.Ingest. Questo esempio può essere utile se .NET completo è inaccessibile o non disponibile a causa dell'ambiente o di altre restrizioni.
Il codice include i passaggi per creare un client di Archiviazione di Azure e caricare i dati in un BLOB. Ogni passaggio viene descritto in modo più dettagliato, dopo il codice di esempio.
- Ottenere un token di autenticazione per l'accesso al servizio di inserimento
- Eseguire una query sul servizio di inserimento per ottenere:
- Caricare dati in un BLOB in uno dei contenitori BLOB ottenuti da Kusto in (2)
- Comporre un messaggio di inserimento che identifica il database e la tabella di destinazione e che punta al BLOB da (3)
- Inserire il messaggio di inserimento composto in (4) in una coda di inserimento ottenuta in (2)
- Recuperare eventuali errori rilevati dal servizio durante l'inserimento
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
public IList<string> IngestionQueues { get; set; } = new List<string>();
public IList<string> TempStorageContainers { get; set; } = new List<string>();
public string FailureNotificationsQueue { get; set; } = string.Empty;
public string SuccessNotificationsQueue { get; set; } = string.Empty;
}
public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
// Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
// 1. Authenticate the interactive user (or application) to access Kusto ingestion service
var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
// 2a. Retrieve ingestion resources
var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
// 2b. Retrieve Kusto identity token
var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
// 3. Upload file to one of the blob containers we got from Azure Data Explorer.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the containers in order to prevent throttling
var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
var blobUriWithSas = UploadFileToBlobContainer(
file, ingestionResources.TempStorageContainers.First(), blobName,
out var blobSizeBytes
);
// 4. Compose ingestion command
var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
// 5. Post ingestion command to one of the previously obtained ingestion queues.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the queues in order to prevent throttling
PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);
Thread.Sleep(20000);
// 6a. Read success notifications
var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
foreach (var sm in successes)
{
Console.WriteLine($"Ingestion completed: {sm}");
}
// 6b. Read failure notifications
var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
foreach (var em in errors)
{
Console.WriteLine($"Ingestion error: {em}");
}
}
Uso dell'inserimento in coda per le pipeline di livello di produzione
Ottenere l'evidenza di autenticazione da Microsoft Entra ID
In questo caso viene usato Microsoft Authentication Library (MSAL) per ottenere un token di Microsoft Entra per accedere al servizio Kusto Gestione dati e richiedere le code di input. MSAL è disponibile su più piattaforme.
// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
// Create an authentication client for Azure AD:
var authClient = PublicClientApplicationBuilder.Create("<appId>")
.WithAuthority("https://login.microsoftonline.com/<appTenant>")
.WithRedirectUri("<appRedirectUri>")
.Build();
// Acquire user token for the interactive user for Azure Data Explorer:
var result = authClient.AcquireTokenInteractive(
new[] { $"{resource}/.default" } // Define scopes
).ExecuteAsync().Result;
return result.AccessToken;
}
Recuperare le risorse di inserimento
Costruire manualmente una richiesta HTTP POST al servizio Gestione dati, richiedendo la restituzione delle risorse di inserimento. Queste risorse includono le code su cui è in ascolto il servizio Dm Dm e i contenitori BLOB per il caricamento dei dati. Il servizio Gestione dati elabora tutti i messaggi contenenti richieste di inserimento che arrivano su una di queste code.
// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get ingestion resources\" }";
var ingestionResources = new IngestionResourcesSnapshot();
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
// Input queues
var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
foreach (var token in tokens)
{
ingestionResources.IngestionQueues.Add((string)token[1]);
}
// Temp storage containers
tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
foreach (var token in tokens)
{
ingestionResources.TempStorageContainers.Add((string)token[1]);
}
// Failure notifications queue
var singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.FailureNotificationsQueue = (string)singleToken;
// Success notifications queue
singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.SuccessNotificationsQueue = (string)singleToken;
return ingestionResources;
}
// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
var request = WebRequest.Create(uriString);
request.Method = "POST";
request.ContentType = "application/json";
request.ContentLength = body.Length;
request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
using var bodyStream = request.GetRequestStream();
using (var sw = new StreamWriter(bodyStream))
{
sw.Write(body);
sw.Flush();
}
bodyStream.Close();
return request.GetResponse();
}
Ottenere un token di identità Kusto
I messaggi di inserimento vengono inviati al cluster tramite un canale non diretto (coda di Azure), rendendo impossibile eseguire la convalida dell'autorizzazione in banda per l'accesso al servizio di inserimento. La soluzione consiste nel collegare un token di identità a ogni messaggio di inserimento. Il token abilita la convalida dell'autorizzazione in banda. Questo token firmato può quindi essere convalidato dal servizio di inserimento quando riceve il messaggio di inserimento.
// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get kusto identity token\" }";
var jsonPath = "Tables[0].Rows[*].[0]";
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
return (string)identityToken;
}
Caricare dati nel contenitore BLOB di Azure
Questo passaggio riguarda il caricamento di un file locale in un BLOB di Azure che verrà passato per l'inserimento. Questo codice usa Azure Storage SDK. Se la dipendenza non è possibile, può essere ottenuta con l'API REST del servizio BLOB di Azure.
// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
var blobUri = new Uri(blobContainerUri);
var blobContainer = new BlobContainerClient(blobUri);
var blob = blobContainer.GetBlobClient(blobName);
using (var stream = File.OpenRead(filePath))
{
blob.UploadAsync(BinaryData.FromStream(stream));
blobSize = blob.GetProperties().Value.ContentLength;
}
return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}
Comporre il messaggio di inserimento
Il pacchetto NewtonSoft.JSON componi nuovamente una richiesta di inserimento valida per identificare il database e la tabella di destinazione e che punta al BLOB. Il messaggio verrà inviato alla coda di Azure su cui è in ascolto il servizio Kusto Gestione dati pertinente.
Ecco alcuni punti da considerare.
- Questa richiesta è il minimo minimo per il messaggio di inserimento.
Nota
Il token di identità è obbligatorio e deve far parte dell'oggetto JSON AdditionalProperties .
- Quando necessario, è necessario specificare anche le proprietà CsvMapping o JsonMapping
- Per altre informazioni, vedere l'articolo relativo alla pre-creazione del mapping di inserimento.
- La struttura interna del messaggio di inserimento sezione fornisce una spiegazione della struttura dei messaggi di inserimento
internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
var message = new JObject
{
{ "Id", Guid.NewGuid().ToString() },
{ "BlobPath", dataUri },
{ "RawDataSize", blobSizeBytes },
{ "DatabaseName", db },
{ "TableName", table },
{ "RetainBlobOnSuccess", true }, // Do not delete the blob on success
{ "FlushImmediately", true }, // Do not aggregate
{ "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
{ "ReportMethod", 0 }, // Failures are reported to an Azure Queue
{
"AdditionalProperties", new JObject(
new JProperty("authorizationContext", identityToken),
new JProperty("mappingReference", mappingRef),
// Data is in JSON format
new JProperty("format", "multijson")
)
}
};
return message.ToString();
}
Pubblicare il messaggio di inserimento nella coda di inserimento
Infine, pubblicare il messaggio creato nella coda di inserimento selezionata ottenuta in precedenza.
Nota
Per impostazione predefinita, le versioni del client di archiviazione .Net precedenti alla versione 12 codificano il messaggio in base64. Per altre informazioni, vedere la documentazione sull'archiviazione. Se si usano versioni client di archiviazione .NET precedenti alla versione 12, è necessario codificare correttamente il contenuto del messaggio.
internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
queue.SendMessage(message);
}
Verificare la presenza di messaggi di errore dalla coda di Azure
Dopo l'inserimento, viene verificata la presenza di messaggi di errore dalla coda pertinente in cui scrive il Gestione dati. Per altre informazioni sulla struttura dei messaggi di errore, vedere Struttura dei messaggi di errore di inserimento.
internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
var messages = messagesFromQueue.Select(m => m.MessageText);
return messages;
}
Messaggi di inserimento - Formati di documento JSON
Struttura interna del messaggio di inserimento
Il messaggio che il servizio Kusto Gestione dati prevede di leggere dalla coda di Azure di input è un documento JSON nel formato seguente.
{
"Id" : "<ID>",
"BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
"RawDataSize" : "<RawDataSizeInBytes>",
"DatabaseName": "<DatabaseName>",
"TableName" : "<TableName>",
"RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
"FlushImmediately": "<true|false>",
"ReportLevel" : <0-Failures, 1-None, 2-All>,
"ReportMethod" : <0-Queue, 1-Table>,
"AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Proprietà | Descrizione |
---|---|
ID | Identificatore del messaggio (GUID) |
BlobPath | Percorso (URI) al BLOB, inclusa la chiave di firma di accesso condiviso che concede le autorizzazioni per la lettura/scrittura/eliminazione. Le autorizzazioni sono necessarie in modo che il servizio di inserimento possa eliminare il BLOB dopo aver completato l'inserimento dei dati. |
RawDataSize | Dimensioni dei dati non compressi in byte. Questo valore consente al servizio di inserimento di ottimizzare l'inserimento aggregando potenzialmente più BLOB. Questa proprietà è facoltativa, ma se non viene specificata, il servizio accederà al BLOB solo per recuperare le dimensioni. |
DatabaseName | Nome del database di destinazione |
TableName | Nome tabella di destinazione |
RetainBlobOnSuccess | Se impostato su true , il BLOB non verrà eliminato al termine dell'inserimento. Il valore predefinito è false |
FlushImmediately | Se impostato su true , qualsiasi aggregazione verrà ignorata. Il valore predefinito è false |
ReportLevel | Livello di segnalazione errori/esito positivo: 0-failures, 1-None, 2-All |
ReportMethod | Meccanismo di creazione di report: 0-Queue, 1-Table |
AdditionalProperties | Altre proprietà, ad format esempio , tags e creationTime . Per altre informazioni, vedere Proprietà di inserimento dati. |
Struttura dei messaggi di errore di inserimento
Il messaggio che il Gestione dati prevede di leggere dalla coda di Azure di input è un documento JSON nel formato seguente.
Proprietà | Descrizione |
---|---|
OperationId | Identificatore dell'operazione (GUID) che può essere usato per tenere traccia dell'operazione sul lato servizio |
Database | Nome del database di destinazione |
Tabella | Nome tabella di destinazione |
FailedOn | Timestamp degli errori |
InserimentoSourceId | GUID che identifica il blocco di dati che non è riuscito ad inserire |
InserimentoSourcePath | Percorso (URI) al blocco di dati che non è riuscito ad inserire |
Dettagli | Messaggio di errore |
ErrorCode | Codice di errore. Per tutti i codici di errore, vedere Codici di errore di inserimento. |
FailureStatus | Indica se l'errore è permanente o temporaneo |
RootActivityId | Identificatore di correlazione (GUID) che può essere usato per tenere traccia dell'operazione sul lato servizio |
OriginsFromUpdatePolicy | Indica se l'errore è stato causato da un criterio di aggiornamento transazionale errato |
ShouldRetry | Indica se l'inserimento potrebbe avere esito positivo se è stato eseguito di nuovo il tentativo |
Commenti e suggerimenti
https://aka.ms/ContentUserFeedback.
Presto disponibile: Nel corso del 2024 verranno gradualmente disattivati i problemi di GitHub come meccanismo di feedback per il contenuto e ciò verrà sostituito con un nuovo sistema di feedback. Per altre informazioni, vedereInvia e visualizza il feedback per