Ingerir dados com o SDK .NET do Kusto

Existem duas bibliotecas de cliente para .NET: uma biblioteca de ingestão e uma biblioteca de dados. Para obter mais informações sobre o SDK .NET, veja sobre o SDK .NET. Estas bibliotecas permitem ingerir (carregar) dados para um cluster e consultar dados a partir do código. Neste artigo, vai criar primeiro uma tabela e mapeamento de dados num cluster de teste. Em seguida, coloca em fila uma ingestão para o cluster e valida os resultados.

Pré-requisitos

  • Uma conta Microsoft ou uma identidade de utilizador Microsoft Entra. Não é necessária uma subscrição do Azure.
  • Um cluster e uma base de dados. Criar um cluster e uma base de dados.

Instalar a biblioteca de ingestão

Install-Package Microsoft.Azure.Kusto.Ingest

Adicionar autenticação e construção cadeia de ligação

Autenticação

Para autenticar uma aplicação, o SDK utiliza o seu ID de inquilino Microsoft Entra. Para localizar o ID de inquilino, utilize o seguinte URL, substituindo o domínio pelo SeuDomínio.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Por exemplo, se o seu domínio for contoso.com, o URL é: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Clique neste URL para ver os resultados; a primeira linha é igual à seguinte.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Neste caso, o ID de inquilino é 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

Este exemplo utiliza uma autenticação interativa Microsoft Entra do utilizador para aceder ao cluster. Também pode utilizar Microsoft Entra autenticação da aplicação com o segredo do certificado ou da aplicação. Certifique-se de que define os valores corretos para tenantId e clusterUri antes de executar este código.

O SDK fornece uma forma conveniente de configurar o método de autenticação como parte do cadeia de ligação. Para obter a documentação completa sobre as cadeias de ligação, veja Cadeias de ligação.

Nota

A versão atual do SDK não suporta a autenticação interativa de utilizadores no .NET Core. Se necessário, utilize Microsoft Entra nome de utilizador/palavra-passe ou autenticação de aplicação.

Construir o cadeia de ligação

Agora pode construir o cadeia de ligação. Irá criar a tabela de destino e o mapeamento num passo posterior.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Definir as informações do ficheiro de origem

Defina o caminho para o ficheiro de origem. Este exemplo utiliza um ficheiro de exemplo alojado no Armazenamento de Blobs do Azure. O conjunto de dados de exemplo StormEvents contém dados relacionados com as condições meteorológicas dos Centros Nacionais de Informação Ambiental.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Criar uma tabela no cluster de teste

Crie uma tabela com o nome StormEvents que corresponda ao esquema dos dados no StormEvents.csv ficheiro.

Dica

Os fragmentos de código seguintes criam uma instância de um cliente para quase todas as chamadas. Isto é feito para tornar cada fragmento individualmente runnable. Na produção, as instâncias de cliente são reentrantes e devem ser mantidas o tempo necessário. Uma única instância de cliente por URI é suficiente, mesmo quando trabalha com várias bases de dados (a base de dados pode ser especificada ao nível do comando).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definir o mapeamento de ingestão

Mapeie os dados CSV recebidos para os nomes de coluna utilizados ao criar a tabela. Aprovisione um objeto de mapeamento de colunas CSV nessa tabela.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definir a política de criação de lotes para a sua tabela

O batching de dados recebidos otimiza o tamanho da partição horizontal de dados, que é controlada pela política de criação de lotes de ingestão. Modifique a política com o comando de gestão de políticas de batching de ingestão. Utilize esta política para reduzir a latência de dados que chegam lentamente.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Recomendamos que defina um Raw Data Size valor para dados ingeridos e diminua incrementalmente o tamanho para 250 MB, ao mesmo tempo que verifica se o desempenho melhora.

Pode utilizar a propriedade para ignorar o Flush Immediately batching, embora não seja recomendado para ingestão em grande escala, uma vez que pode causar um mau desempenho.

Colocar uma mensagem em fila para ingestão

Coloque uma mensagem em fila para extrair dados do armazenamento de blobs e ingerir os dados. É estabelecida uma ligação ao cluster de ingestão e é criado outro cliente para trabalhar com esse ponto final.

Dica

Os fragmentos de código seguintes criam uma instância de um cliente para quase todas as chamadas. Isto é feito para tornar cada fragmento individualmente runnable. Na produção, as instâncias de cliente são reentrantes e devem ser mantidas o tempo necessário. Uma única instância de cliente por URI é suficiente, mesmo quando trabalha com várias bases de dados (a base de dados pode ser especificada ao nível do comando).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Validar dados que foram ingeridos na tabela

Aguarde 5 a 10 minutos para que a ingestão em fila agende a ingestão e carregue os dados para o cluster. Em seguida, execute o seguinte código para obter a contagem de registos na tabela StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Executar consultas de resolução de problemas

Inicie sessão no https://dataexplorer.azure.com e ligue ao cluster. Execute o seguinte comando na base de dados para ver se ocorreram quaisquer falhas de ingestão nas últimas quatro horas. Substitua o nome da base de dados antes de executar.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Execute o seguinte comando para ver o estado de todas as operações de ingestão nas últimas quatro horas. Substitua o nome da base de dados antes de executar.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpar os recursos

Se pretender seguir os nossos outros artigos, mantenha os recursos que criou. Caso contrário, execute o seguinte comando na base de dados para limpar a tabela StormEvents.

.drop table StormEvents