Ingerir dados utilizando a biblioteca Python do Explorador de Dados AzureIngest data using the Azure Data Explorer Python library

Neste artigo, ingere dados utilizando a biblioteca Azure Data Explorer Python.In this article, you ingest data using the Azure Data Explorer Python library. O Azure Data Explorer é um serviço de exploração de dados rápido e altamente dimensionável para dados telemétricos e de registo.Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. O Azure Data Explorer fornece duas bibliotecas de cliente para o Python: uma biblioteca de ingestão e uma biblioteca de dados.Azure Data Explorer provides two client libraries for Python: an ingest library and a data library. Estas bibliotecas permitem-lhe ingerir, ou carregar, dados num cluster e consultar dados do seu código.These libraries enable you to ingest, or load, data into a cluster and query data from your code.

Primeiro, crie uma tabela e mapeamento de dados num cluster.First, create a table and data mapping in a cluster. Em seguida, vai colocar em fila a ingestão para o cluster e validar os resultados.You then queue ingestion to the cluster and validate the results.

Este artigo também está disponível como um Caderno Azure.This article is also available as an Azure Notebook.

Pré-requisitosPrerequisites

Instalar as bibliotecas de dados e de ingestãoInstall the data and ingest libraries

Instalar azure-kusto-data and azure-kusto-ingest.Install azure-kusto-data and azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Adicionar declarações e constantes de importaçãoAdd import statements and constants

Importações de dados azure-kusto.Import classes from azure-kusto-data.

from azure.kusto.data.request import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Para autenticar uma aplicação, o Azure Data Explorer utiliza o ID de inquilino do AAD.To authenticate an application, Azure Data Explorer uses your AAD tenant ID. Para localizar o ID de inquilino, utilize o seguinte URL, substituindo o domínio pelo SeuDomínio.To find your tenant ID, use the following URL, substituting your domain for YourDomain.

https://login.windows.net/<YourDomain>/.well-known/openid-configuration/

Por exemplo, se o seu domínio for contoso.com, o URL é: https://login.windows.net/contoso.com/.well-known/openid-configuration/.For example, if your domain is contoso.com, the URL is: https://login.windows.net/contoso.com/.well-known/openid-configuration/. Clique neste URL para ver os resultados; a primeira linha é igual à seguinte.Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Neste caso, o ID de inquilino é 6babcaad-604b-40ac-a9d7-9fd97c0b779f.The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Defina os valores de AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI e KUSTO_DATABASE antes de executar este código.Set the values for AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI, and KUSTO_DATABASE before running this code.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_DATABASE = "<DatabaseName>"

Agora construa a cadeia de ligação.Now construct the connection string. Este exemplo utiliza a autenticação do dispositivo para aceder ao cluster.This example uses device authentication to access the cluster. Também pode utilizar o certificado de aplicação AAD, a chave de aplicação AAD,e o utilizador e a palavra-passe da AAD.You can also use AAD application certificate, AAD application key, and AAD user and password.

Irá criar a tabela de destino e o mapeamento num passo posterior.You create the destination table and mapping in a later step.

KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_INGEST_URI, AAD_TENANT_ID)

KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_URI, AAD_TENANT_ID)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Definir as informações do ficheiro de origemSet source file information

Importe classes adicionais e defina as constantes para o ficheiro de origem de dados.Import additional classes and set constants for the data source file. Este exemplo utiliza um ficheiro de exemplo alojado no Armazenamento de Blobs do Azure.This example uses a sample file hosted on Azure Blob Storage. O conjunto de dados de exemplo StormEvents contém dados relacionados com Meteorologia dos Centros Nacionais de Informações Ambientais.The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

from azure.storage.blob import BlockBlobService
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Crie uma mesa no seu clusterCreate a table on your cluster

Crie uma tabela que corresponda ao esquema dos dados no ficheiro StormEvents.csv.Create a table that matches the schema of the data in the StormEvents.csv file. Quando este código é executado, devolve uma mensagem semelhante à seguinte: Para iniciar sessão, utilize um browser para abrir a página https://microsoft.com/devicelogin e introduza o código F3W4VWZDM para autenticar.When this code runs, it returns a message like the following: To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code F3W4VWZDM to authenticate. Siga os passos para iniciar sessão e regresse para executar o próximo bloco de código.Follow the steps to sign in, then return to run the next code block. Os blocos de código subsequentes que estabelecerem uma ligação têm de iniciar sessão novamente.Subsequent code blocks that make a connection require you to sign in again.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Definir o mapeamento de ingestãoDefine ingestion mapping

Mapeie os dados recebidos do CSV para os nomes de coluna e tipos de dados utilizados ao criar a tabela.Map incoming CSV data to the column names and data types used when creating the table. Isto mapeia campos de dados de origem para colunas de tabelas de destinoThis maps source data fields to destination table columns

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Colocar uma mensagem em fila para ingestãoQueue a message for ingestion

Coloque uma mensagem em fila para extrair dados do armazenamento de blobs e ingerir esses dados para o Azure Data Explorer.Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer.

INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, dataFormat=DataFormat.CSV,
                                           mappingReference=DESTINATION_TABLE_COLUMN_MAPPING, additionalProperties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Dados de consulta que foram ingeridos na tabelaQuery data that was ingested into the table

Aguarde cinco a dez minutos para que a ingestão colocada em fila agende a ingestão e carregue os dados para o Azure Data Explorer.Wait for five to ten minutes for the queued ingestion to schedule the ingest and load the data into Azure Data Explorer. Em seguida, execute o seguinte código para obter a contagem de registos na tabela StormEvents.Then run the following code to get the count of records in the StormEvents table.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Executar consultas de resolução de problemasRun troubleshooting queries

Inscreva-se https://dataexplorer.azure.com e ligue-se ao seu cluster.Sign in to https://dataexplorer.azure.com and connect to your cluster. Execute o seguinte comando na base de dados para ver se ocorreram quaisquer falhas de ingestão nas últimas quatro horas.Run the following command in your database to see if there were any ingestion failures in the last four hours. Substitua o nome da base de dados antes de executar.Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Execute o seguinte comando para ver o estado de todas as operações de ingestão nas últimas quatro horas.Run the following command to view the status of all ingestion operations in the last four hours. Substitua o nome da base de dados antes de executar.Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpar recursosClean up resources

Se planeia seguir os nossos outros artigos, guarde os recursos que criou.If you plan to follow our other articles, keep the resources you created. Caso contrário, execute o seguinte comando na base de dados para limpar a tabela StormEvents.If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents

Passos seguintesNext steps