Inserire dati usando la libreria di Esplora dati di Azure per PythonIngest data using the Azure Data Explorer Python library

Questo articolo illustra come inserire i dati usando la libreria Python di Azure Esplora dati.In this article, you ingest data using the Azure Data Explorer Python library. Esplora dati di Azure è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria.Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Esplora dati di Azure offre due librerie client per Python: una libreria di inserimento e una libreria di dati.Azure Data Explorer provides two client libraries for Python: an ingest library and a data library. Queste librerie consentono di inserire o caricare dati in un cluster ed eseguire query sui dati del codice.These libraries enable you to ingest, or load, data into a cluster and query data from your code.

Per prima cosa, creare una tabella e il mapping dei dati in un cluster.First, create a table and data mapping in a cluster. Quindi viene accodato l'inserimento nel cluster e vengono convalidati i risultati.You then queue ingestion to the cluster and validate the results.

Questo articolo è disponibile anche come notebook di Azure.This article is also available as an Azure Notebook.

PrerequisitiPrerequisites

Installare le librerie di dati e di inserimentoInstall the data and ingest libraries

Installare Installareo e azure-kusto-ingest.Install azure-kusto-data and azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Aggiungere le costanti e le istruzioni importAdd import statements and constants

Importare le classi da azure-kusto-data.Import classes from azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Per autenticare un'applicazione, Esplora dati di Azure usa l'ID tenant AAD.To authenticate an application, Azure Data Explorer uses your AAD tenant ID. Per trovare l'ID del tenant, usare l'URL seguente, sostituendo il dominio a YourDomain.To find your tenant ID, use the following URL, substituting your domain for YourDomain.

https://login.windows.net/<YourDomain>/.well-known/openid-configuration/

Ad esempio, se il dominio è contoso.com, l'URL è: https://login.windows.net/contoso.com/.well-known/openid-configuration/.For example, if your domain is contoso.com, the URL is: https://login.windows.net/contoso.com/.well-known/openid-configuration/. Fare clic su questo URL per visualizzare i risultati; la prima riga è come indicato di seguito.Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Il tenant ID in questo caso è 6babcaad-604b-40ac-a9d7-9fd97c0b779f.The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Impostare i valori per AAD_TENANT_ID KUSTO_URI, KUSTO_INGEST_URI e KUSTO_DATABASE prima di eseguire questo codice.Set the values for AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI, and KUSTO_DATABASE before running this code.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_DATABASE = "<DatabaseName>"

Costruire ora la stringa di connessione.Now construct the connection string. Questo esempio usa l'autenticazione del dispositivo per accedere al cluster.This example uses device authentication to access the cluster. È anche possibile usare il certificato dell'applicazione AAD, la chiave dell'applicazione AADe l' utente e la password di AAD.You can also use AAD application certificate, AAD application key, and AAD user and password.

Creare la tabella di destinazione e il mapping in un passaggio successivo.You create the destination table and mapping in a later step.

KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_INGEST_URI, AAD_TENANT_ID)

KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_URI, AAD_TENANT_ID)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Impostare le informazioni sul file di origineSet source file information

Importare le classi aggiuntive e impostare le costanti per il file di origine dati.Import additional classes and set constants for the data source file. Questo esempio usa un file di esempio ospitato nell'archiviazione BLOB di Azure.This example uses a sample file hosted on Azure Blob Storage. Il set di dati di esempio StormEvents contiene dati relativi al meteo dei centri nazionali per informazioni ambientali.The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Creare una tabella nel clusterCreate a table on your cluster

Creare una tabella che corrisponde allo schema dei dati nel file StormEvents.csv.Create a table that matches the schema of the data in the StormEvents.csv file. Quando viene eseguito questo codice, restituisce un messaggio simile al seguente: Per accedere, usare un web browser per aprire la pagina https://microsoft.com/devicelogin e immettere il codice F3W4VWZDM per l'autenticazione.When this code runs, it returns a message like the following: To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code F3W4VWZDM to authenticate. Seguire i passaggi per l'accesso, quindi tornare a eseguire il blocco di codice successivo.Follow the steps to sign in, then return to run the next code block. I blocchi di codice successivi che stabiliscono una connessione richiedono di eseguire nuovamente l'accesso.Subsequent code blocks that make a connection require you to sign in again.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Definire il mapping di inserimentoDefine ingestion mapping

Eseguire il mapping dei dati CSV in ingresso sui nomi di colonna e tipi di dati usati durante la creazione della tabella.Map incoming CSV data to the column names and data types used when creating the table. Associa in modo deterministico i campi dati di origine alle colonne della tabella di destinazioneThis maps source data fields to destination table columns

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Accodare un messaggio per l'inserimentoQueue a message for ingestion

Accodare un messaggio per eseguire il pull dei dati dall'archiviazione BLOB e inserire i dati in Esplora dati di Azure.Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer.

INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, dataFormat=DataFormat.CSV,
                                           mappingReference=DESTINATION_TABLE_COLUMN_MAPPING, additionalProperties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Eseguire query sui dati che sono stati inseriti nella tabellaQuery data that was ingested into the table

Attendere da cinque a dieci minuti per l'operazione di inserimento in coda per pianificare l'inserimento e caricare i dati in Esplora dati di Azure.Wait for five to ten minutes for the queued ingestion to schedule the ingest and load the data into Azure Data Explorer. Eseguire quindi il codice seguente per ottenere il numero di record nella tabella StormEvents.Then run the following code to get the count of records in the StormEvents table.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Eseguire query sulla risoluzione dei problemiRun troubleshooting queries

Accedere a https://dataexplorer.azure.com e connettersi al cluster.Sign in to https://dataexplorer.azure.com and connect to your cluster. Eseguire il comando seguente nel database per verificare la presenza di eventuali errori di inserimento nelle ultime quattro ore.Run the following command in your database to see if there were any ingestion failures in the last four hours. Sostituire il nome del database prima dell'esecuzione.Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Eseguire il comando seguente per visualizzare lo stato di tutte le operazioni di inserimento nelle ultime quattro ore.Run the following command to view the status of all ingestion operations in the last four hours. Sostituire il nome del database prima dell'esecuzione.Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Pulire le risorseClean up resources

Se si prevede di seguire gli altri articoli, è necessario salvare le risorse create.If you plan to follow our other articles, keep the resources you created. In caso contrario, eseguire il comando seguente nel database per pulire la tabella StormEvents.If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents

Passaggi successiviNext steps