Inserire dati con Azure Esplora dati .NET SDKIngest data using the Azure Data Explorer .NET SDK

Esplora dati di Azure è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria.Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Fornisce due librerie client per .NET: una libreria di inserimento e una raccolta dati.It provides two client libraries for .NET: an ingest library and a data library. Per ulteriori informazioni su .NET SDK, vedere informazioni su .NET SDK.For more information on .NET SDK, see about .NET SDK. Queste librerie consentono di inserire (caricare) i dati in un cluster ed eseguire una query di dati dal codice.These libraries enable you to ingest (load) data into a cluster and query data from your code. In questo articolo viene innanzitutto creata una tabella e il mapping dei dati in un cluster di test.In this article, you first create a table and data mapping in a test cluster. Quindi viene accodato un inserimento nel cluster e vengono convalidati i risultati.You then queue an ingestion to the cluster and validate the results.

PrerequisitiPrerequisites

Installare la libreria di inserimentoInstall the ingest library

Install-Package Microsoft.Azure.Kusto.Ingest

Aggiungere l'autenticazione e costruire la stringa di connessioneAdd authentication and construct connection string

AuthenticationAuthentication

Per autenticare un'applicazione, Azure Esplora dati SDK usa l'ID tenant di AAD.To authenticate an application, Azure Data Explorer SDK uses your AAD tenant ID. Per trovare l'ID del tenant, usare l'URL seguente, sostituendo il dominio a YourDomain.To find your tenant ID, use the following URL, substituting your domain for YourDomain.

https://login.windows.net/<YourDomain>/.well-known/openid-configuration/

Ad esempio, se il dominio è contoso.com, l'URL è: https://login.windows.net/contoso.com/.well-known/openid-configuration/.For example, if your domain is contoso.com, the URL is: https://login.windows.net/contoso.com/.well-known/openid-configuration/. Fare clic su questo URL per visualizzare i risultati; la prima riga è come indicato di seguito.Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Il tenant ID in questo caso è 6babcaad-604b-40ac-a9d7-9fd97c0b779f.The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

Questo esempio usa un'autenticazione utente AAD interattiva per accedere al cluster.This example uses an interactive AAD user authentication to access the cluster. È anche possibile usare l'autenticazione dell'applicazione AAD con un certificato o un segreto dell'applicazione.You can also use AAD application authentication with certificate or application secret. Assicurarsi di impostare i valori corretti per tenantId e clusterUri prima di eseguire il codice.Make sure to set the correct values for tenantId and clusterUri before running this code.

Azure Esplora dati SDK offre un modo pratico per configurare il metodo di autenticazione come parte della stringa di connessione.Azure Data Explorer SDK provides a convenient way to set up the authentication method as part of the connection string. Per la documentazione completa sulle stringhe di connessione Esplora dati di Azure, vedere stringhe di connessione.For complete documentation on Azure Data Explorer connection strings, see connection strings.

Nota

La versione corrente dell'SDK non supporta l'autenticazione UER interattiva in .NET Core.The current version of the SDK doesn't support interactive uer authentication on .NET Core. Se necessario, usare invece il nome utente/password AAD o l'autenticazione dell'applicazione.If required, use AAD username/password or application authentication instead.

Costruire la stringa di connessioneConstruct the connection string

A questo punto è possibile costruire la stringa di connessione di Azure Esplora dati.Now you can construct the Azure Data Explorer connection string. La tabella e il mapping di destinazione vengono creati in un passaggio successivo.You will create the destination table and mapping in a later step.

var tenantId = "<TenantId>";
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";

var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Impostare le informazioni sul file di origineSet source file information

Impostare il percorso del file di origine.Set the path for the source file. Questo esempio usa un file di esempio ospitato nell'archiviazione BLOB di Azure.This example uses a sample file hosted on Azure Blob Storage. Il set di dati di esempio StormEvents contiene dati relativi al meteo dei centri nazionali per informazioni ambientali.The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

var blobPath = "https://kustosamplefiles.blob.core.windows.net/samplefiles/StormEvents.csv?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";

Creare una tabella nel cluster di provaCreate a table on your test cluster

Creare una tabella denominata StormEvents che corrisponda allo schema dei dati nel file StormEvents.csv.Create a table named StormEvents that matches the schema of the data in the StormEvents.csv file.

Suggerimento

I frammenti di codice seguenti consentono di creare un'istanza di un client per quasi tutte le chiamate.The following code snippets create an instance of a client for almost every call. Questa operazione viene eseguita per rendere ogni frammento eseguibile singolarmente.This is done to make each snippet individually runnable. In produzione, le istanze client sono rientranti e devono essere mantenute fino a quando necessario.In production, the client instances are reentrant, and should be kept as long as needed. È sufficiente una singola istanza client per URI, anche quando si utilizzano più database (è possibile specificare il database a livello di comando).A single client instance per URI is sufficient, even when working with multiple databases (database can be specified on a command level).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definire il mapping di inserimentoDefine ingestion mapping

Eseguire il mapping dei dati CSV in ingresso sui nomi di colonna usati durante la creazione della tabella.Map the incoming CSV data to the column names used when creating the table. Eseguire il provisioning di un oggetto mapping colonne CSV in tale tabella.Provision a CSV column mapping object on that table.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definire i criteri di invio in batch per la tabellaDefine batching policy for your table

L'inserimento di Esplora dati di Azure esegue l'invio in batch dei dati in ingresso per ottimizzare le dimensioni della partizione di dati.Azure Data Explorer ingestion performs batching of the incoming data to optimize for data shard size. Questo processo è controllato dai criteri di invio in batch di inserimento e può essere modificato dal comando di controllo dei criteri batchdi inserimento.This process is controlled by the ingestion batching policy and can be modified by the ingestion batching policy control command. Usare questo criterio per ridurre la latenza dei dati in arrivo lentamente.Use this policy to reduce latency of slowly arriving data.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        table,
        new IngestionBatchingPolicy(maximumBatchingTimeSpan: TimeSpan.FromSeconds(10.0), maximumNumberOfItems: 100, maximumRawDataSizeMB: 1024));

    kustoClient.ExecuteControlCommand(command);
}

Accodare un messaggio per l'inserimentoQueue a message for ingestion

Accodare un messaggio per eseguire il pull dei dati dall'archiviazione BLOB e inserire i dati in Esplora dati di Azure.Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer. Viene stabilita una connessione all'endpoint di inserimento dati del cluster di Azure Esplora dati e viene creato un altro client per lavorare con tale endpoint.A connection is established to the data ingestion endpoint of the Azure Data Explorer cluster, and another client is created to work with that endpoint.

Suggerimento

I frammenti di codice seguenti consentono di creare un'istanza di un client per quasi tutte le chiamate.The following code snippets create an instance of a client for almost every call. Questa operazione viene eseguita per rendere ogni frammento eseguibile singolarmente.This is done to make each snippet individually runnable. In produzione, le istanze client sono rientranti e devono essere mantenute fino a quando necessario.In production, the client instances are reentrant, and should be kept as long as needed. È sufficiente una singola istanza client per URI, anche quando si utilizzano più database (è possibile specificare il database a livello di comando).A single client instance per URI is sufficient, even when working with multiple databases (database can be specified on a command level).

var ingestUri = "https://ingest-<ClusterName>.<Region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);

using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder))
{
    var properties =
        new KustoQueuedIngestionProperties(database, table)
        {
            Format = DataSourceFormat.csv,
            IngestionMapping = new IngestionMapping()
            { 
                IngestionMappingReference = tableMapping,
                IngestionMappingKind = IngestionMappingKind.Csv
            },
            IgnoreFirstRecord = true
        };

    ingestClient.IngestFromStorageAsync(blobPath, ingestionProperties: properties).GetAwaiter().GetResult();
}

Verificare che i dati siano stati inseriti nella tabellaValidate data was ingested into the table

Attendere da cinque a dieci minuti per l'inserimento in coda per pianificare l'inserimento e caricare i dati in Esplora dati di Azure.Wait five to ten minutes for the queued ingestion to schedule the ingestion and load the data into Azure Data Explorer. Eseguire quindi il codice seguente per ottenere il numero di record nella tabella StormEvents.Then run the following code to get the count of records in the StormEvents table.

using (var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
{
    var query = $"{table} | count";

    var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
    Console.WriteLine(results.Single());
}

Eseguire query sulla risoluzione dei problemiRun troubleshooting queries

Accedere a https://dataexplorer.azure.com e connettersi al cluster.Sign in to https://dataexplorer.azure.com and connect to your cluster. Eseguire il comando seguente nel database per verificare la presenza di eventuali errori di inserimento nelle ultime quattro ore.Run the following command in your database to see if there were any ingestion failures in the last four hours. Sostituire il nome del database prima dell'esecuzione.Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Eseguire il comando seguente per visualizzare lo stato di tutte le operazioni di inserimento nelle ultime quattro ore.Run the following command to view the status of all ingestion operations in the last four hours. Sostituire il nome del database prima dell'esecuzione.Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Pulire le risorseClean up resources

Se si prevede di seguire gli altri articoli, è necessario salvare le risorse create.If you plan to follow our other articles, keep the resources you created. In caso contrario, eseguire il comando seguente nel database per pulire la tabella StormEvents.If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents

Passaggi successiviNext steps