Inserire dati con Kusto .NET SDK

Sono disponibili due librerie client per .NET: una libreria di inserimenti e una libreria dati. Per altre informazioni su .NET SDK, vedere .NET SDK. Queste librerie consentono di inserire (caricare) i dati in un cluster ed eseguire una query di dati dal codice. In questo articolo viene prima creato un mapping di tabella e dati in un cluster di test. Quindi viene accodato un inserimento nel cluster e vengono convalidati i risultati.

Prerequisiti

  • Un account Microsoft o un'identità utente di Microsoft Entra. Non è necessaria una sottoscrizione di Azure.
  • Un cluster e un database. Creare un cluster e un database.

Installare la libreria di inserimento

Install-Package Microsoft.Azure.Kusto.Ingest

Aggiungere l'autenticazione e costruire stringa di connessione

Authentication

Per autenticare un'applicazione, l'SDK usa l'ID tenant Microsoft Entra. Per trovare l'ID del tenant, usare l'URL seguente, sostituendo il dominio a YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Ad esempio, se il dominio è contoso.com, l'URL è: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Fare clic su questo URL per visualizzare i risultati; la prima riga è come indicato di seguito.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Il tenant ID in questo caso è 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

In questo esempio viene usata un'autenticazione utente interattiva Microsoft Entra per accedere al cluster. È anche possibile usare Microsoft Entra'autenticazione dell'applicazione con il certificato o il segreto dell'applicazione. Assicurarsi di impostare i valori corretti per tenantId e clusterUri prima di eseguire questo codice.

L'SDK offre un modo pratico per configurare il metodo di autenticazione come parte della stringa di connessione. Per la documentazione completa sulle stringhe di connessione, vedere Stringhe di connessione.

Nota

La versione corrente dell'SDK non supporta l'autenticazione utente interattiva in .NET Core. Se necessario, usare Microsoft Entra nome utente/password o autenticazione dell'applicazione.

Costruire la stringa di connessione

È ora possibile costruire il stringa di connessione. La tabella di destinazione e il mapping verranno creati in un passaggio successivo.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Impostare le informazioni sul file di origine

Impostare il percorso del file di origine. Questo esempio usa un file di esempio ospitato nell'archiviazione BLOB di Azure. Il set di dati di esempio StormEvents contiene dati relativi al meteo provenienti dai Centri nazionali per le informazioni ambientali.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Creare una tabella nel cluster di prova

Creare una tabella denominata StormEvents che corrisponda allo schema dei dati nel file StormEvents.csv.

Suggerimento

I frammenti di codice seguenti creano un'istanza di un client per quasi ogni chiamata. Questa operazione viene eseguita per rendere eseguibile singolarmente ogni frammento. Nell'ambiente di produzione, le istanze client sono reentrant e devono essere mantenute finché necessario. Una singola istanza client per URI è sufficiente, anche quando si usano più database (il database può essere specificato a livello di comando).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definire il mapping di inserimento

Eseguire il mapping dei dati CSV in ingresso sui nomi di colonna usati durante la creazione della tabella. Effettuare il provisioning di un oggetto mapping di colonne CSV in tale tabella.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definire i criteri di invio in batch per la tabella

L'invio in batch dei dati in ingresso ottimizza le dimensioni della partizione dei dati, controllate dai criteri di inserimento in batch. Modificare il criterio con il comando di gestione dei criteri di inserimento in batch. Usare questo criterio per ridurre la latenza dei dati in arrivo lentamente.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

È consigliabile definire un Raw Data Size valore per i dati inseriti e ridurre in modo incrementale le dimensioni verso 250 MB, controllando se le prestazioni migliorano.

È possibile usare la proprietà per ignorare l'invio Flush Immediately in batch, anche se questa operazione non è consigliata per l'inserimento su larga scala perché può causare prestazioni scarse.

Accodare un messaggio per l'inserimento

Accodare un messaggio per eseguire il pull dei dati dall'archivio BLOB e inserire i dati. Viene stabilita una connessione al cluster di inserimento e viene creato un altro client per lavorare con tale endpoint.

Suggerimento

I frammenti di codice seguenti creano un'istanza di un client per quasi ogni chiamata. Questa operazione viene eseguita per rendere eseguibile singolarmente ogni frammento. Nell'ambiente di produzione, le istanze client sono reentrant e devono essere mantenute finché necessario. Una singola istanza client per URI è sufficiente, anche quando si usano più database (il database può essere specificato a livello di comando).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Verificare che i dati siano stati inseriti nella tabella

Attendere cinque-dieci minuti per l'inserimento in coda per pianificare l'inserimento e caricare i dati nel cluster. Eseguire quindi il codice seguente per ottenere il numero di record nella tabella StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Eseguire query sulla risoluzione dei problemi

Accedere al https://dataexplorer.azure.com e connettersi al cluster. Eseguire il comando seguente nel database per verificare la presenza di eventuali errori di inserimento nelle ultime quattro ore. Sostituire il nome del database prima dell'esecuzione.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Eseguire il comando seguente per visualizzare lo stato di tutte le operazioni di inserimento nelle ultime quattro ore. Sostituire il nome del database prima dell'esecuzione.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Pulire le risorse

Se si prevede di seguire gli altri articoli, mantenere le risorse create. In caso contrario, eseguire il comando seguente nel database per pulire la tabella StormEvents.

.drop table StormEvents