Ingesta de datos mediante la biblioteca de Python de Azure Data ExplorerIngest data using the Azure Data Explorer Python library

El Explorador de datos de Azure es un servicio de exploración de datos altamente escalable y rápido para datos de telemetría y registro.Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. El Explorador de datos de Azure proporciona dos bibliotecas cliente para Python: una biblioteca de ingesta y una biblioteca de datos.Azure Data Explorer provides two client libraries for Python: an ingest library and a data library. Estas bibliotecas permiten ingerir (cargar) datos en un clúster y consultar datos desde el código.These libraries enable you to ingest (load) data into a cluster and query data from your code. En este artículo, primero creará una tabla y la asignación de datos en un clúster.In this article, you first create a table and data mapping in a cluster. A continuación, pondrá en cola la ingesta en el clúster y validará los resultados.You then queue ingestion to the cluster and validate the results.

Este artículo también está disponible como Azure Notebook.This article is also available as an Azure Notebook.

Requisitos previosPrerequisites

Instalación de los datos y las bibliotecas de ingestaInstall the data and ingest libraries

Instale azure-kusto-data y azure-kusto-ingest.Install azure-kusto-data and azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Incorporación de instrucciones de importación y constantesAdd import statements and constants

Importación de clases azure-kusto-data.Import classes from azure-kusto-data.

from azure.kusto.data.request import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Para autenticar una aplicación, el Explorador de datos de Azure usa el identificador del inquilino de AAD.To authenticate an application, Azure Data Explorer uses your AAD tenant ID. Para buscar el identificador de inquilino, use la dirección URL siguiente, sustituyendo su dominio por SuDominio.To find your tenant ID, use the following URL, substituting your domain for YourDomain.

https://login.windows.net/<YourDomain>/.well-known/openid-configuration/

Por ejemplo, si el nombre de dominio es contoso.com, la dirección URL es: https://login.windows.net/contoso.com/.well-known/openid-configuration/.For example, if your domain is contoso.com, the URL is: https://login.windows.net/contoso.com/.well-known/openid-configuration/. Haga clic en esta dirección URL para ver los resultados. la primera línea es como sigue.Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

En este caso es el id. de inquilino es 6babcaad-604b-40ac-a9d7-9fd97c0b779f.The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Establezca los valores para AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI y KUSTO_DATABASE antes de ejecutar este código.Set the values for AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI, and KUSTO_DATABASE before running this code.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_DATABASE = "<DatabaseName>"

Ahora, cree la cadena de conexión.Now construct the connection string. En este ejemplo se utiliza la autenticación de dispositivos para acceder al clúster.This example uses device authentication to access the cluster. También puede usar el certificado de la aplicación de AAD, la clave de aplicación de AAD y el usuario y la contraseña de AAD.You can also use AAD application certificate, AAD application key, and AAD user and password.

Creará la tabla de destino y la asignación en un paso posterior.You create the destination table and mapping in a later step.

KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_INGEST_URI, AAD_TENANT_ID)

KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_URI, AAD_TENANT_ID)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Definición de la información del archivo de origenSet source file information

Importe clases adicionales y defina las constantes para el archivo de origen de datos.Import additional classes and set constants for the data source file. Este ejemplo utiliza un archivo de ejemplo hospedado en Azure Blob Storage.This example uses a sample file hosted on Azure Blob Storage. El conjunto de datos de ejemplo de StormEvents contiene datos relacionados con el tiempo de los National Centers for Environmental Information.The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

from azure.storage.blob import BlockBlobService
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Creación de una tabla en el clústerCreate a table on your cluster

Cree una tabla que coincida con el esquema de los datos en el archivo StormEvents.csv.Create a table that matches the schema of the data in the StormEvents.csv file. Cuando se ejecuta este código, devuelve un mensaje como el siguiente: Para iniciar sesión, use un explorador web para abrir la página https://microsoft.com/devicelogin y escriba el código F3W4VWZDM para realizar la autenticación.When this code runs, it returns a message like the following: To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code F3W4VWZDM to authenticate. Siga los pasos para iniciar sesión y, a continuación, vuelva a ejecutar el siguiente bloque de código.Follow the steps to sign in, then return to run the next code block. Los bloques de código subsiguientes que establecen una conexión requieren que vuelva a iniciar sesión.Subsequent code blocks that make a connection require you to sign in again.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Definición de la asignación de ingestaDefine ingestion mapping

Asigna los datos de CSV entrantes a los nombres de columna y tipos de datos utilizados al crear la tabla.Map incoming CSV data to the column names and data types used when creating the table. De esta forma se asignan los campos de datos de origen a las columnas de la tabla de destinoThis maps source data fields to destination table columns

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Colocación de un mensaje en cola para la ingestaQueue a message for ingestion

Coloca en cola un mensaje para extraer datos desde Blob Storage e introduce esos datos en el Explorador de datos de Azure.Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer.

INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, dataFormat=DataFormat.csv,
                                           mappingReference=DESTINATION_TABLE_COLUMN_MAPPING, additionalProperties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Datos de consulta que se ingirieron en la tablaQuery data that was ingested into the table

Espere entre cinco y diez minutos para que la ingesta en cola programe la ingesta y cargue los datos en el Explorador de datos de Azure.Wait for five to ten minutes for the queued ingestion to schedule the ingest and load the data into Azure Data Explorer. A continuación, ejecute el siguiente código para obtener el recuento de registros en la tabla StormEvents.Then run the following code to get the count of records in the StormEvents table.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Ejecución de consultas de solución de problemasRun troubleshooting queries

Inicie sesión en https://dataexplorer.azure.com y conéctese al clústerSign in to https://dataexplorer.azure.com and connect to your cluster. Ejecute el siguiente comando en la base de datos para ver si se ha producido algún error de ingesta en las últimas cuatro horas.Run the following command in your database to see if there were any ingestion failures in the last four hours. Reemplace el nombre de la base de datos antes de ejecutarlo.Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Ejecute el siguiente comando para ver el estado de todas las operaciones de ingesta en las últimas cuatro horas.Run the following command to view the status of all ingestion operations in the last four hours. Reemplace el nombre de la base de datos antes de ejecutarlo.Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpieza de recursosClean up resources

Si tiene previsto seguir nuestros otros artículos, conserve los recursos que creó.If you plan to follow our other articles, keep the resources you created. De lo contrario, ejecute el siguiente comando en la base de datos para limpiar la tabla StormEvents.If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents

Pasos siguientesNext steps