Ingerir dados usando a biblioteca Python do Azure Data Explorer

Neste artigo, você ingerirá dados usando a biblioteca do Python do Azure Data Explorer. O Azure Data Explorer é um serviço de exploração de dados rápido e altamente escalonável para dados de log e telemetria. O Data Explorer do Azure fornece duas bibliotecas de cliente para Python: uma biblioteca de ingestão e uma biblioteca de dados. Essas bibliotecas permitem ingerir ou carregar dados em um cluster e dados de consulta do código.

Primeiro, você criará um mapeamento de tabela e dados em um cluster. Você, em seguida, enfileira ao cluster e valida os resultados.

Pré-requisitos

Instalar os dados e bibliotecas de ingestão

Instale azure-kusto-data e azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Adicionar instruções de importação e constantes

Importar classes de azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Para autenticar um aplicativo, o Azure Data Explorer usa sua ID de locatário Microsoft Entra. Para encontrar a ID do locatário, use a seguinte URL, substituindo o domínio por YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Por exemplo, se o seu domínio for contoso.com, a URL será https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Clique nesta URL para ver os resultados; a primeira linha é a seguinte.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

A ID do locatário neste caso é 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Defina os valores para AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI e KUSTO_DATABASE antes de executar esse código.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Agora, construa a cadeia de caracteres de conexão. O exemplo a seguir usa a autenticação do dispositivo para acessar o cluster. Você também pode usar a autenticação de identidade gerenciada, Microsoft Entra certificado do aplicativo, Microsoft Entra chave de aplicativo e Microsoft Entra usuário e senha.

Criar a tabela de destino e o mapeamento em uma etapa posterior.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Definir informações de arquivo de origem

Importar classes adicionais e definir constantes para o arquivo de fonte de dados. Este exemplo usa um arquivo de exemplo hospedado no armazenamento de BLOBs do Azure. O conjunto de dados de exemplo StormEvents contém dados relacionados ao clima dos Centros Nacionais de Informações Ambientais.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Criar uma tabela no cluster

Criar uma tabela que corresponde ao esquema dos dados no arquivo StormEvents.csv. Quando esse código é executado, retorna uma mensagem como a seguinte: para entrar, use um navegador da Web para abrir a página https://microsoft.com/devicelogin e digite o código F3W4VWZDM para autenticar. Siga as etapas para entrar e retorne para executar o próximo bloco de código. Blocos de código subsequente que compõem uma conexão exigem que você entre novamente.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Definir mapeamento de ingestão

Mapear os dados JSON de entrada para os nomes de colunas e tipos de dados usados ao criar a tabela. Isso mapeia os campos de dados de origem para colunas da tabela de destino

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Enfileirar uma mensagem para ingestão

Enfileirar uma mensagem para extrair dados do armazenamento de BLOBs e ingerir esses dados no Data Explorer do Azure.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Consultar os dados ingeridos na tabela

Aguardar de cinco a dez minutos para que a ingestão na fila agende a ingestão e carregue os dados no Azure Data Explorer. Em seguida, execute o seguinte código para obter a contagem de registros na tabela StormEvents.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Executar consultas de solução de problemas

Conectar https://dataexplorer.azure.com e conectar ao seu cluster. Execute o seguinte comando no banco de dados para ver se houve alguma falha de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes da execução.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Execute o seguinte comando para exibir o status de todas as operações de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes da execução.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpar os recursos

Caso pretenda seguir nossos outros artigos, mantenha os recursos criados. Caso contrário, execute o seguinte comando no seu banco de dados para limpar a tabela StormEvents.

.drop table StormEvents

Próxima etapa