Прием данных с помощью пакета SDK Kusto для .NET

Существует две клиентские библиотеки для .NET: библиотека приема и библиотека данных. Дополнительные сведения о пакете SDK .NET см. в разделе Сведения о пакете SDK .NET. Они позволяют принимать (загружать) данные в кластер и запрашивать данные из кода. В этой статье вы сначала создадите таблицу и сопоставление данных в тестовом кластере. Затем вы поставите в очередь прием данных в кластер и проверите результаты.

Предварительные требования

Установка библиотеки приема

Install-Package Microsoft.Azure.Kusto.Ingest

Добавление проверки подлинности и создание строки подключения

Аутентификация

Для проверки подлинности приложения пакет SDK использует идентификатор клиента Microsoft Entra. Чтобы найти идентификатор клиента, используйте следующий URL-адрес, заменив домен на имя_вашего_домена.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Например, если ваш домен называется contoso.com, URL-адрес будет следующим: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Щелкните этот URL-адрес, чтобы просмотреть результаты. Первая строка выглядит следующим образом:

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

В данном случае идентификатор клиента — 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

В этом примере используется интерактивная Microsoft Entra проверка подлинности пользователя для доступа к кластеру. Вы также можете использовать Microsoft Entra проверку подлинности приложения с помощью сертификата или секрета приложения. Обязательно задайте правильные значения для tenantId и clusterUri перед выполнением этого кода.

Пакет SDK предоставляет удобный способ настройки метода проверки подлинности в рамках строка подключения. Полную документацию по строкам подключения см. в разделе Строки подключения.

Примечание

Текущая версия пакета SDK не поддерживает интерактивную проверку подлинности пользователя в .NET Core. При необходимости используйте Microsoft Entra имя пользователя и пароль или проверку подлинности приложения.

Создание строки подключения

Теперь можно создать строка подключения. Целевую таблицу и сопоставление вы создадите позднее.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Определение данных исходного файла

Указание пути к исходному файлу. В этом примере используется пример файла, размещенный в хранилище BLOB-объектов Azure. Пример набора данных StormEvents содержит данные, связанные с погодой, из Национальных центров экологической информации.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Создание таблицы в тестовом кластере

Создайте таблицу с именем StormEvents, которая соответствует схеме данных в файле StormEvents.csv.

Совет

Следующие фрагменты кода создают экземпляр клиента почти для каждого вызова. Это делается для того, чтобы каждый фрагмент можно было запускать отдельно. В рабочей среде экземпляры клиента можно использовать повторно, они могут храниться столько, сколько необходимо. Одного экземпляра клиента для каждого универсального кода ресурса достаточно даже при работе с несколькими базами данных (база данных может быть указана на уровне команд).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Определение сопоставления приема

Выполните сопоставление входящих данных CSV с именами столбцов, использованными при создании таблицы. Подготовка объекта сопоставления столбца CSV в таблице.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Определите политику пакетной обработки для таблицы.

Пакетная обработка входящих данных оптимизирует размер сегментов данных. Это процесс контролируется политикой пакетного приема данных. Измените политику с помощью команды управления политикой пакетной обработки приема. Используйте эту политику, чтобы сократить задержку медленно поступающих данных.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Мы рекомендуем определить значение Raw Data Size для принимаемых данных и постепенно уменьшить размер до 250 МБ, при этом проверяя, повысилась ли производительность.

С помощью свойства Flush Immediately пакетную обработку можно пропустить, хотя это не рекомендуется при приеме большого объема данных, так как это может привести к снижению производительности.

Отправка сообщения в очередь на прием

Поставьте сообщение в очередь, чтобы извлечь данные из хранилища BLOB-объектов и получить эти данные. Подключение устанавливается к кластеру приема, а для работы с этой конечной точкой создается другой клиент.

Совет

Следующие фрагменты кода создают экземпляр клиента почти для каждого вызова. Это делается для того, чтобы каждый фрагмент можно было запускать отдельно. В рабочей среде экземпляры клиента можно использовать повторно, они могут храниться столько, сколько необходимо. Одного экземпляра клиента для каждого универсального кода ресурса достаточно даже при работе с несколькими базами данных (база данных может быть указана на уровне команд).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Проверка получения данных в таблицу

Подождите от пяти до десяти минут, пока прием в очереди запланировать прием и загрузить данные в кластер. Затем выполните следующий код, чтобы получить количество записей в таблице StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Выполнение запросов по устранению неполадок

Войдите в https://dataexplorer.azure.com и подключитесь к кластеру. Выполните в своей базе данных следующую команду, чтобы проверить, не было ли в ней сбоев приема за последние четыре часа. Замените имя базы данных перед запуском.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Выполните следующую команду, чтобы узнать состояние всех операций приема за последние четыре часа. Замените имя базы данных перед запуском.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Очистка ресурсов

Если вы планируете следить за другими нашими статьями, сохраните созданные вами ресурсы. В противном случае выполните в своей базе данных следующую команду, чтобы очистить таблицу StormEvents.

.drop table StormEvents