Прием данных с помощью библиотеки Python в Azure Data Explorer

В этой статье вы получаете данные с помощью библиотеки Python в обозревателе данных Azure. Обозреватель данных Azure — это быстрая и высокомасштабируемая служба для изучения данных журналов и телеметрии. Обозреватель данных Azure предоставляет две клиентские библиотеки для Python: библиотеку приема и библиотеку данных. Эти библиотеки позволяют принимать или загружать данные в кластер и запрашивать данные из вашего кода.

Сначала создайте таблицу и сопоставление данных в кластере. Затем вы поставите в очередь прием данных в кластер и проверите результаты.

Предварительные требования

Установка библиотек данных и приема

Установите azure-kusto-data и azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Добавление операторов и констант импорта

Импортируйте классы из azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Для проверки подлинности приложения azure Data Explorer использует идентификатор клиента Microsoft Entra. Чтобы найти свой идентификатор клиента, используйте следующий URL-адрес, заменив свой домен на YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Например, если ваш домен называется contoso.com, URL-адрес будет следующим: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Щелкните этот URL-адрес, чтобы просмотреть результаты. Первая строка выглядит следующим образом:

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

В данном случае идентификатор клиента — 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Прежде чем выполнять этот код, задайте значения для параметров AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI и KUSTO_DATABASE.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Создайте строку подключения. В следующем примере используется проверка подлинности устройства для доступа к кластеру. Вы также можете использовать проверку подлинности управляемого удостоверения, Microsoft Entra сертификат приложения, Microsoft Entra ключ приложения, а также Microsoft Entra пользователя и пароль.

Целевую таблицу и сопоставление вы создадите позднее.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Определение данных исходного файла

Импортируйте дополнительные классы и задайте константы для исходного файла данных. В этом примере используется пример файла, размещенный в хранилище BLOB-объектов Azure. Пример набора данных StormEvents содержит данные, связанные с погодой, от Национальных центров экологической информации.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Создание таблицы в кластере

Создайте таблицу, которая соответствует схеме данных в файле StormEvents.csv. Когда этот код запускается, он возвращает сообщение, подобное следующему: Чтобы войти в систему, откройте страницу https://microsoft.com/devicelogin в веб-браузере и введите код F3W4VWZDM для аутентификации. Следуйте инструкциям по входу, а затем выполните следующий блок кода. Для выполнения последующих блоков кода, устанавливающих соединение, необходимо повторно выполнить вход.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Определение сопоставления приема

Сопоставьте входящие данные CSV с именами столбцов и типами данных, которые использовались при создании таблицы. Это позволит сопоставить поля исходных данных со столбцами целевой таблицы.

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Отправка сообщения в очередь на прием

Поставьте в очередь сообщение, чтобы получить данные из хранилища BLOB-объектов и получить эти данные в обозреватель данных Azure.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Запрос данных, принятых в таблице

Подождите 5–10 минут, пока запись в очереди не будет запланирована, и данные будут загружены в обозреватель данных Azure. Затем выполните следующий код, чтобы получить количество записей в таблице StormEvents.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Выполнение запросов по устранению неполадок

Войдите в https://dataexplorer.azure.com и подключитесь к кластеру. Выполните в своей базе данных следующую команду, чтобы проверить, не было ли в ней сбоев приема за последние четыре часа. Замените имя базы данных перед запуском.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Выполните следующую команду, чтобы узнать состояние всех операций приема за последние четыре часа. Замените имя базы данных перед запуском.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Очистка ресурсов

Если вы планируете следить за другими нашими статьями, сохраните созданные вами ресурсы. В противном случае выполните в своей базе данных следующую команду, чтобы очистить таблицу StormEvents.

.drop table StormEvents

Следующий шаг