Inserimento senza kusto. inserimento libreriaIngestion without Kusto.Ingest Library

Per l'inserimento dei dati in Azure Esplora dati è preferibile la libreria kusto. inserimenti.The Kusto.Ingest library is preferred for ingesting data to Azure Data Explorer. Tuttavia, è comunque possibile ottenere quasi la stessa funzionalità senza dipendere dal pacchetto kusto. inserimenti.However, you can still achieve almost the same functionality, without being dependent on the Kusto.Ingest package. Questo articolo illustra come usare l'inserimento in coda in Esplora dati di Azure per le pipeline di livello produzione.This article shows you how, by using Queued Ingestion to Azure Data Explorer for production-grade pipelines.

Nota

Il codice riportato di seguito è scritto in C# e USA Azure Storage SDK, la libreria di autenticazione ADAL e il NewtonSoft.JSnel pacchetto, per semplificare il codice di esempio.The code below is written in C#, and makes use of the Azure Storage SDK, the ADAL Authentication library, and the NewtonSoft.JSON package, to simplify the sample code. Se necessario, il codice corrispondente può essere sostituito con le chiamate all' API REST di archiviazione di Azure appropriate, il pacchetto non-.NET adaled eventuali pacchetti di gestione JSON disponibili.If needed, the corresponding code can be replaced with appropriate Azure Storage REST API calls, non-.NET ADAL package, and any available JSON handling package.

Questo articolo riguarda la modalità di inserimento consigliata.This article deals with the recommended mode of ingestion. Per la libreria kusto. ingerite, l'entità corrispondente è l'interfaccia IKustoQueuedIngestClient .For the Kusto.Ingest library, its corresponding entity is the IKustoQueuedIngestClient interface. In questo caso, il codice client interagisce con il servizio Esplora dati di Azure inviando i messaggi di notifica di inserimento a una coda di Azure.Here, the client code interacts with the Azure Data Explorer service by posting ingestion notification messages to an Azure queue. I riferimenti ai messaggi vengono ottenuti dal servizio kusto Gestione dati (noto anche come inserimento).References to the messages are obtained from the Kusto Data Management (also known as the Ingestion) service. L'interazione con il servizio deve essere autenticata con Azure Active Directory (Azure AD).Interaction with the service must be authenticated with Azure Active Directory (Azure AD).

Nel codice seguente viene illustrato il modo in cui il servizio Gestione dati kusto gestisce l'inserimento dei dati in coda senza utilizzare la libreria kusto. Ingest.The following code shows how the Kusto Data Management service handles queued data ingestion without using the Kusto.Ingest library. Questo esempio può essere utile se la versione completa di .NET è inaccessibile o non disponibile a causa dell'ambiente o di altre restrizioni.This example may be useful if full .NET is inaccessible or unavailable because of the environment, or other restrictions.

Il codice include i passaggi per creare un client di archiviazione di Azure e caricare i dati in un BLOB.The code includes the steps to create an Azure Storage client and upload the data to a blob. Ogni passaggio viene descritto in modo più dettagliato, dopo il codice di esempio.Each step is described in greater detail, after the sample code.

  1. Ottenere un token di autenticazione per l'accesso al servizio di inserimento Esplora dati di AzureObtain an authentication token for accessing the Azure Data Explorer ingestion service
  2. Eseguire una query sul servizio di inserimento Esplora dati di Azure per ottenere:Query the Azure Data Explorer ingestion service to obtain:
  3. Caricare i dati in un BLOB in uno dei contenitori BLOB ottenuti da kusto in (2)Upload data to a blob on one of the blob containers obtained from Kusto in (2)
  4. Comporre un messaggio di inserimento che identifichi il database e la tabella di destinazione e che punti al BLOB da (3)Compose an ingestion message that identifies the target database and table and that points to the blob from (3)
  5. Inserire il messaggio di inserimento che è stato composto in (4) in una coda di inserimento ottenuta da Azure Esplora dati in (2)**Post the ingestion message we composed in (4) to an ingestion queue obtained from Azure Data Explorer in (2)**
  6. Recupera tutti gli errori rilevati dal servizio durante l'inserimentoRetrieve any error found by the service during ingestion
// A container class for ingestion resources we are going to obtain from Azure Data Explorer
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    string DmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";

    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    string bearerToken = AuthenticateInteractiveUser(DmServiceBaseUri);

    // 2a. Retrieve ingestion resources
    IngestionResourcesSnapshot ingestionResources = RetrieveIngestionResources(DmServiceBaseUri, bearerToken);

    // 2b. Retrieve Kusto identity token
    string identityToken = RetrieveKustoIdentityToken(DmServiceBaseUri, bearerToken);

    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    long blobSizeBytes = 0;
    string blobName = $"TestData{DateTime.UtcNow.ToString("yyyy-MM-dd_HH-mm-ss.FFF")}";
    string blobUriWithSas = UploadFileToBlobContainer(file, ingestionResources.TempStorageContainers.First(),
                                                            "temp001", blobName, out blobSizeBytes);

    // 4. Compose ingestion command
    string ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);

    // 5. Post ingestion command to one of the ingestion queues we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

Uso dell'inserimento in coda in Esplora dati di Azure per pipeline di livello di produzioneUsing Queued Ingestion to Azure Data Explorer for production-grade pipelines

Ottenere l'evidenza di autenticazione da Azure ADObtain authentication evidence from Azure AD

Qui viene usato ADAL per ottenere un token di Azure AD per accedere al servizio kusto Gestione dati e richiedere le relative code di input.Here we use ADAL to obtain an Azure AD token to access the Kusto Data Management service and ask for its input queues. Se necessario, ADAL è disponibile su piattaforme non Windows .ADAL is available on non-Windows platforms if needed.

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create Auth Context for MSFT Azure AD:
    AuthenticationContext authContext = new AuthenticationContext("https://login.microsoftonline.com/{Azure AD Tenant ID or name}");

    // Acquire user token for the interactive user for Azure Data Explorer:
    AuthenticationResult result =
        authContext.AcquireTokenAsync(resource, "<your client app ID>", new Uri(@"<your client app URI>"),
                                        new PlatformParameters(PromptBehavior.Auto), UserIdentifier.AnyUser, "prompt=select_account").Result;
    return result.AccessToken;
}

Recuperare le risorse di inserimento Esplora dati di AzureRetrieve Azure Data Explorer ingestion resources

Creare manualmente una richiesta HTTP POST al servizio Gestione dati, richiedendo la restituzione delle risorse di inserimento.Manually construct an HTTP POST request to the Data Management service, requesting the return of the ingestion resources. Queste risorse includono le code su cui è in ascolto il servizio DM e i contenitori BLOB per il caricamento dei dati.These resources include queues that the DM service is listening on, and blob containers for data uploading. Il servizio Gestione dati elaborerà tutti i messaggi che contengono richieste di inserimento che arrivano a una di queste code.The Data Management service will process any messages containing ingestion requests that arrive on one of those queues.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified Azure Data Explorer Ingestion service using supplied Access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    string ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    string requestBody = $"{{ \"csl\": \".get ingestion resources\" }}";

    IngestionResourcesSnapshot ingestionResources = new IngestionResourcesSnapshot();

    using (WebResponse response = SendPostRequest(ingestClusterUri, accessToken, requestBody))
    using (StreamReader sr = new StreamReader(response.GetResponseStream()))
    using (JsonTextReader jtr = new JsonTextReader(sr))
    {
        JObject responseJson = JObject.Load(jtr);
        IEnumerable<JToken> tokens;

        // Input queues
        tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
        foreach (var token in tokens)
        {
            ingestionResources.IngestionQueues.Add((string) token[1]);
        }

        // Temp storage containers
        tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
        foreach (var token in tokens)
        {
            ingestionResources.TempStorageContainers.Add((string)token[1]);
        }

        // Failure notifications queue
        var singleToken =
            responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
        ingestionResources.FailureNotificationsQueue = (string)singleToken;

        // Success notifications queue
        singleToken =
            responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
        ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    }

    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    WebRequest request = WebRequest.Create(uriString);

    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");

    Stream bodyStream = request.GetRequestStream();
    using (StreamWriter sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }

    bodyStream.Close();
    return request.GetResponse();
}

Ottenere un token di identità kustoObtain a Kusto identity token

I messaggi di inserimento vengono passati ad Azure Esplora dati tramite un canale non diretto (coda di Azure), rendendo impossibile la convalida delle autorizzazioni in banda per accedere al servizio di inserimento Esplora dati di Azure.Ingest messages are handed off to Azure Data Explorer via a non-direct channel (Azure queue), making it impossible to do in-band authorization validation for accessing the Azure Data Explorer ingestion service. La soluzione consiste nel aggiungere un token di identità a ogni messaggio di inserimento.The solution is to attach an identity token to every ingest message. Il token Abilita la convalida delle autorizzazioni in banda.The token enables in-band authorization validation. Questo token firmato può quindi essere convalidato dal servizio Esplora dati di Azure quando riceve il messaggio di inserimento.This signed token can then be validated by the Azure Data Explorer service when it receives the ingestion message.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    string ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    string requestBody = $"{{ \"csl\": \".get kusto identity token\" }}";
    string jsonPath = "Tables[0].Rows[*].[0]";

    using (WebResponse response = SendPostRequest(ingestClusterUri, accessToken, requestBody))
    using (StreamReader sr = new StreamReader(response.GetResponseStream()))
    using (JsonTextReader jtr = new JsonTextReader(sr))
    {
        JObject responseJson = JObject.Load(jtr);
        JToken identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();

        return ((string)identityToken);
    }
}

Caricare dati nel contenitore BLOB di AzureUpload data to the Azure Blob container

Questo passaggio riguarda il caricamento di un file locale in un BLOB di Azure che verrà passato per l'inserimento.This step is about uploading a local file to an Azure Blob that will be handed off for ingestion. Questo codice USA Azure Storage SDK.This code uses the Azure Storage SDK. Se la dipendenza non è possibile, è possibile ottenerla con l' API REST del servizio BLOB di Azure.If dependency isn't possible, it can be achieved with Azure Blob Service REST API.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string containerName, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    CloudBlobContainer blobContainer = new CloudBlobContainer(blobUri);
    CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(blobName);

    using (Stream stream = File.OpenRead(filePath))
    {
        blockBlob.UploadFromStream(stream);
        blobSize = blockBlob.Properties.Length;
    }

    return string.Format("{0}{1}", blockBlob.Uri.AbsoluteUri, blobUri.Query);
}

Comporre il messaggio di inserimento del Esplora dati di AzureCompose the Azure Data Explorer ingestion message

Il NewtonSoft.JSnel pacchetto comporrà nuovamente una richiesta di inserimento valida per identificare il database e la tabella di destinazione e che punta al BLOB.The NewtonSoft.JSON package will again compose a valid ingestion request to identify the target database and table, and that points to the blob. Il messaggio verrà inserito nella coda di Azure su cui è in ascolto il servizio Gestione dati di Kusto pertinente.The message will be posted to the Azure Queue that the relevant Kusto Data Management service is listening on.

Di seguito sono riportati alcuni punti da considerare.Here are some points to consider.

  • Questa richiesta è il minimo indispensabile per il messaggio di inserimento.This request is the bare minimum for the ingestion message.

Nota

Il token di identità è obbligatorio e deve far parte dell'oggetto AdditionalProperties JSON.The identity token is mandatory and must be part of the AdditionalProperties JSON object.

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject();

    message.Add("Id", Guid.NewGuid().ToString());
    message.Add("BlobPath", dataUri);
    message.Add("RawDataSize", blobSizeBytes);
    message.Add("DatabaseName", db);
    message.Add("TableName", table);
    message.Add("RetainBlobOnSuccess", true);   // Do not delete the blob on success
    message.Add("FlushImmediately", true);      // Do not aggregate
    message.Add("ReportLevel", 2);              // Report failures and successes (might incur perf overhead)
    message.Add("ReportMethod", 0);             // Failures are reported to an Azure Queue

    message.Add("AdditionalProperties", new JObject(
                                            new JProperty("authorizationContext", identityToken),
                                            new JProperty("jsonMappingReference", mappingRef),
                                            // Data is in JSON format
                                            new JProperty("format", "json")));
    return message.ToString();
}

Inserire il messaggio di inserimento del Esplora dati di Azure nella coda di inserimento Esplora dati di AzurePost the Azure Data Explorer ingestion message to the Azure Data Explorer ingestion queue

Infine, pubblicare il messaggio che è stato creato nella coda di inserimento selezionata ottenuta dal Esplora dati di Azure.Finally, post the message that you constructed, to the selected ingestion queue that you obtained from Azure Data Explorer.

Nota

Le versioni del client di archiviazione .NET sotto V12, per impostazione predefinita, codificano il messaggio in Base64 per ulteriori informazioni, vedere la documentazione di archiviazione. Se si usano le versioni del client di archiviazione .NET sopra V12, è necessario codificare correttamente il contenuto del messaggio..Net storage client versions below v12, by default, encode the message to base64 For more information, see storage docs. If you are using .Net storage client versions above v12, you must properly encode the message content.

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    CloudQueue queue = new CloudQueue(new Uri(queueUriWithSas));
    CloudQueueMessage queueMessage = new CloudQueueMessage(message);

    queue.AddMessage(queueMessage, null, null, null, null);
}

Verificare la presenza di messaggi di errore dalla coda di AzureCheck for error messages from the Azure queue

Dopo l'inserimento, vengono verificati i messaggi di errore dalla coda pertinente in cui il Gestione dati scrive.After ingestion, we check for failure messages from the relevant queue that the Data Management writes to. Per ulteriori informazioni sulla struttura dei messaggi di errore, vedere la struttura del messaggio di erroredi inserimento.For more information on the failure message structure, see Ingestion failure message structure.

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    List<string> messages = Enumerable.Empty<string>().ToList();
    CloudQueue queue = new CloudQueue(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.GetMessages(count);
    foreach (var m in messagesFromQueue)
    {
        messages.Add(m.AsString);
        queue.DeleteMessage(m);
    }

    return messages;
}

Messaggi di inserimento-formati di documento JSONIngestion messages - JSON document formats

Struttura interna del messaggio di inserimentoIngestion message internal structure

Il messaggio che il servizio kusto Gestione dati prevede di leggere dalla coda di Azure di input è un documento JSON nel formato seguente.The message that the Kusto Data Management service expects to read from the input Azure Queue is a JSON document in the following format.

{
    "Id" : "<Id>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
ProprietàProperty DescrizioneDescription
IDId Identificatore del messaggio (GUID)Message identifier (GUID)
BlobPathBlobPath Percorso (URI) del BLOB, inclusa la chiave di firma di accesso condiviso che concede le autorizzazioni di Esplora dati di Azure per la lettura, la scrittura e l'eliminazione.Path (URI) to the blob, including the SAS key granting Azure Data Explorer permissions to read/write/delete it. Sono necessarie autorizzazioni in modo che Azure Esplora dati possa eliminare il BLOB dopo aver completato l'inserimento dei datiPermissions are required so that Azure Data Explorer can delete the blob once it has completed ingesting the data
RawDataSizeRawDataSize Dimensioni in byte dei dati non compressi.Size of the uncompressed data in bytes. Se si specifica questo valore, Esplora dati di Azure può ottimizzare l'inserimento, aggregando potenzialmente più BLOB.Providing this value enables Azure Data Explorer to optimize ingestion by potentially aggregating multiple blobs. Questa proprietà è facoltativa, ma se non viene specificata, Azure Esplora dati accederà al BLOB solo per recuperare le dimensioniThis property is optional, but if not given, Azure Data Explorer will access the blob just to retrieve the size
DatabaseNameDatabaseName Nome del database di destinazioneTarget database name
TableNameTableName Nome tabella di destinazioneTarget table name
RetainBlobOnSuccessRetainBlobOnSuccess Se impostato su true , il BLOB non verrà eliminato al termine dell'inserimento.If set to true, the blob won't be deleted once ingestion is successfully completed. Il valore predefinito è falseDefault is false
FlushImmediatelyFlushImmediately Se impostato su true , qualsiasi aggregazione verrà ignorata.If set to true, any aggregation will be skipped. Il valore predefinito è falseDefault is false
ReportLevelReportLevel Livello di segnalazione esito positivo/errore: 0-errori, 1-nessuno, 2-tuttiSuccess/Error reporting level: 0-Failures, 1-None, 2-All
ReportMethodReportMethod Meccanismo di creazione di report: 0-coda, 1 tabellaReporting mechanism: 0-Queue, 1-Table
AdditionalPropertiesAdditionalProperties Proprietà aggiuntive, ad esempio format , tags e creationTime .Additional properties such as format, tags, and creationTime. Per ulteriori informazioni, vedere Proprietàdi inserimento dati.For more information, see data ingestion properties.

Struttura del messaggio di errore di inserimentoIngestion failure message structure

Il messaggio che il Gestione dati prevede di leggere dalla coda di Azure di input è un documento JSON nel formato seguente.The message that the Data Management expects to read from the input Azure Queue is a JSON document in the following format.

ProprietàProperty DescrizioneDescription
OperationIdOperationId Identificatore dell'operazione (GUID) che può essere usato per tenere traccia dell'operazione sul lato del servizioOperation identifier (GUID) that can be used to track the operation on the service side
DatabaseDatabase Nome del database di destinazioneTarget database name
TabellaTable Nome tabella di destinazioneTarget table name
FailedOnFailedOn Timestamp erroreFailure timestamp
IngestionSourceIdIngestionSourceId GUID che identifica il blocco di dati che Esplora dati di Azure non è riuscito a inserireGUID identifying the data chunk that Azure Data Explorer failed to ingest
IngestionSourcePathIngestionSourcePath Percorso (URI) del blocco di dati che Azure Esplora dati non è riuscito a inserirePath (URI) to the data chunk that Azure Data Explorer failed to ingest
DettagliDetails Messaggio di erroreFailure message
ErrorCodeErrorCode Codice di errore di Azure Esplora dati (vedere quitutti i codici di errore)Azure Data Explorer error code (see all the error codes here)
FailureStatusFailureStatus Indica se l'errore è permanente o temporaneo.Indicates whether the failure is permanent or transient
RootActivityIdRootActivityId Identificatore di correlazione di Azure Esplora dati (GUID) che può essere usato per tenere traccia dell'operazione sul lato del servizioAzure Data Explorer correlation identifier (GUID) that can be used to track the operation on the service side
OriginatesFromUpdatePolicyOriginatesFromUpdatePolicy Indica se l'errore è stato causato da un criterio di aggiornamento transazionale erratoIndicates whether the failure was caused by an erroneous transactional update policy
ShouldRetryShouldRetry Indica se l'inserimento potrebbe avere esito positivo se viene eseguito un nuovo tentativoIndicates whether the ingestion could succeed if retried as is