Ingerir dados utilizando o Azure Data Explorer .NET SDKIngest data using the Azure Data Explorer .NET SDK

O Azure Data Explorer é um serviço de exploração de dados rápido e altamente dimensionável para dados telemétricos e de registo.Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Fornece duas bibliotecas de clientes para .NET: uma biblioteca de ingerir e uma biblioteca de dados.It provides two client libraries for .NET: an ingest library and a data library. Para obter mais informações sobre .NET SDK, consulte cerca de .NET SDK.For more information on .NET SDK, see about .NET SDK. Estas bibliotecas permitem ingerir (carregar) dados para um cluster e consultar dados a partir do código.These libraries enable you to ingest (load) data into a cluster and query data from your code. Neste artigo, cria-se primeiro uma tabela e mapeamento de dados num cluster de teste.In this article, you first create a table and data mapping in a test cluster. Em seguida, faça fila para a ingestão no cluster e valide os resultados.You then queue an ingestion to the cluster and validate the results.

Pré-requisitosPrerequisites

Instalar a biblioteca de ingerirInstall the ingest library

Install-Package Microsoft.Azure.Kusto.Ingest

Adicione a autenticação e construa a cadeia de ligaçãoAdd authentication and construct connection string

AutenticaçãoAuthentication

Para autenticar uma aplicação, a Azure Data Explorer SDK utiliza o seu ID do inquilino AAD.To authenticate an application, Azure Data Explorer SDK uses your AAD tenant ID. Para localizar o ID de inquilino, utilize o seguinte URL, substituindo o domínio pelo SeuDomínio.To find your tenant ID, use the following URL, substituting your domain for YourDomain.

https://login.windows.net/<YourDomain>/.well-known/openid-configuration/

Por exemplo, se o seu domínio for contoso.com, o URL é: https://login.windows.net/contoso.com/.well-known/openid-configuration/.For example, if your domain is contoso.com, the URL is: https://login.windows.net/contoso.com/.well-known/openid-configuration/. Clique neste URL para ver os resultados; a primeira linha é igual à seguinte.Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Neste caso, o ID de inquilino é 6babcaad-604b-40ac-a9d7-9fd97c0b779f.The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

Este exemplo utiliza uma autenticação interativa do utilizador AAD para aceder ao cluster.This example uses an interactive AAD user authentication to access the cluster. Também pode utilizar a autenticação de aplicações AAD com certificado ou pedido secreto.You can also use AAD application authentication with certificate or application secret. Certifique-se de que define os valores corretos para tenantId e antes de executar este clusterUri código.Make sure to set the correct values for tenantId and clusterUri before running this code.

O Azure Data Explorer SDK fornece uma forma conveniente de configurar o método de autenticação como parte da cadeia de ligação.Azure Data Explorer SDK provides a convenient way to set up the authentication method as part of the connection string. Para obter documentação completa sobre as cordas de ligação do Azure Data Explorer, consulte as cordas de ligação.For complete documentation on Azure Data Explorer connection strings, see connection strings.

Nota

A versão atual do SDK não suporta a autenticação interativa uer em .NET Core.The current version of the SDK doesn't support interactive uer authentication on .NET Core. Se necessário, utilize o nome de utilizador/palavra-passe da AAD ou a autenticação da aplicação.If required, use AAD username/password or application authentication instead.

Construa a cadeia de ligaçãoConstruct the connection string

Agora pode construir a cadeia de ligação Azure Data Explorer.Now you can construct the Azure Data Explorer connection string. Você vai criar a mesa de destino e mapeamento em um passo posterior.You will create the destination table and mapping in a later step.

var tenantId = "<TenantId>";
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";

var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Definir as informações do ficheiro de origemSet source file information

Desacorda o caminho para o ficheiro de origem.Set the path for the source file. Este exemplo utiliza um ficheiro de exemplo alojado no Armazenamento de Blobs do Azure.This example uses a sample file hosted on Azure Blob Storage. O conjunto de dados de exemplo StormEvents contém dados relacionados com Meteorologia dos Centros Nacionais de Informações Ambientais.The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

var blobPath = "https://kustosamplefiles.blob.core.windows.net/samplefiles/StormEvents.csv?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";

Criar uma tabela no cluster de testeCreate a table on your test cluster

Crie uma tabela com o nome StormEvents que corresponda ao esquema dos dados do StormEvents.csv ficheiro.Create a table named StormEvents that matches the schema of the data in the StormEvents.csv file.

Dica

Os seguintes códigos de corte criam uma instância de um cliente para quase todas as chamadas.The following code snippets create an instance of a client for almost every call. Isto é feito para tornar cada corte individualmente runnable.This is done to make each snippet individually runnable. Na produção, as instâncias de cliente são reentrantes, e devem ser mantidas o tempo que for necessário.In production, the client instances are reentrant, and should be kept as long as needed. Uma única instância de cliente por URI é suficiente, mesmo quando se trabalha com várias bases de dados (a base de dados pode ser especificada num nível de comando).A single client instance per URI is sufficient, even when working with multiple databases (database can be specified on a command level).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definir o mapeamento de ingestãoDefine ingestion mapping

Mapear os dados CSV de entrada para os nomes das colunas utilizados na criação da tabela.Map the incoming CSV data to the column names used when creating the table. Fornece um objeto de mapeamento de coluna CSV naquela mesa.Provision a CSV column mapping object on that table.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Defina a política de loteamento para a sua mesaDefine batching policy for your table

A ingestão Azure Data Explorer executa o lote dos dados de entrada para otimizar para o tamanho do fragmento de dados.Azure Data Explorer ingestion performs batching of the incoming data to optimize for data shard size. Este processo é controlado pela política de loteamento de ingestão e pode ser modificado pelo comando de controlo da política de loteamento de ingestão.This process is controlled by the ingestion batching policy and can be modified by the ingestion batching policy control command. Utilize esta política para reduzir a latência dos dados que chegam lentamente.Use this policy to reduce latency of slowly arriving data.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        table,
        new IngestionBatchingPolicy(maximumBatchingTimeSpan: TimeSpan.FromSeconds(10.0), maximumNumberOfItems: 100, maximumRawDataSizeMB: 1024));

    kustoClient.ExecuteControlCommand(command);
}

Colocar uma mensagem em fila para ingestãoQueue a message for ingestion

Coloque uma mensagem em fila para extrair dados do armazenamento de blobs e ingerir esses dados para o Azure Data Explorer.Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer. É estabelecida uma ligação ao ponto final de ingestão de dados do cluster Azure Data Explorer, e outro cliente é criado para trabalhar com esse ponto final.A connection is established to the data ingestion endpoint of the Azure Data Explorer cluster, and another client is created to work with that endpoint.

Dica

Os seguintes códigos de corte criam uma instância de um cliente para quase todas as chamadas.The following code snippets create an instance of a client for almost every call. Isto é feito para tornar cada corte individualmente runnable.This is done to make each snippet individually runnable. Na produção, as instâncias de cliente são reentrantes, e devem ser mantidas o tempo que for necessário.In production, the client instances are reentrant, and should be kept as long as needed. Uma única instância de cliente por URI é suficiente, mesmo quando se trabalha com várias bases de dados (a base de dados pode ser especificada num nível de comando).A single client instance per URI is sufficient, even when working with multiple databases (database can be specified on a command level).

var ingestUri = "https://ingest-<ClusterName>.<Region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);

using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder))
{
    var properties =
        new KustoQueuedIngestionProperties(database, table)
        {
            Format = DataSourceFormat.csv,
            IngestionMapping = new IngestionMapping()
            { 
                IngestionMappingReference = tableMapping,
                IngestionMappingKind = IngestionMappingKind.Csv
            },
            IgnoreFirstRecord = true
        };

    ingestClient.IngestFromStorageAsync(blobPath, ingestionProperties: properties).GetAwaiter().GetResult();
}

Os dados validados foram ingeridos na tabelaValidate data was ingested into the table

Aguarde cinco a dez minutos para a ingestão em fila para agendar a ingestão e carregar os dados no Azure Data Explorer.Wait five to ten minutes for the queued ingestion to schedule the ingestion and load the data into Azure Data Explorer. Em seguida, execute o seguinte código para obter a contagem de registos na tabela StormEvents.Then run the following code to get the count of records in the StormEvents table.

using (var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
{
    var query = $"{table} | count";

    var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
    Console.WriteLine(results.Single());
}

Executar consultas de resolução de problemasRun troubleshooting queries

Inscreva-se https://dataexplorer.azure.com e ligue-se ao seu cluster.Sign in to https://dataexplorer.azure.com and connect to your cluster. Execute o seguinte comando na base de dados para ver se ocorreram quaisquer falhas de ingestão nas últimas quatro horas.Run the following command in your database to see if there were any ingestion failures in the last four hours. Substitua o nome da base de dados antes de executar.Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Execute o seguinte comando para ver o estado de todas as operações de ingestão nas últimas quatro horas.Run the following command to view the status of all ingestion operations in the last four hours. Substitua o nome da base de dados antes de executar.Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpar os recursosClean up resources

Se planeia seguir os nossos outros artigos, mantenha os recursos que criou.If you plan to follow our other articles, keep the resources you created. Caso contrário, execute o seguinte comando na base de dados para limpar a tabela StormEvents.If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents

Passos seguintesNext steps