Ingérer des données à l’aide de la bibliothèque Python de l’Explorateur de données Azure

Dans cet article, vous allez ingérer des données à l’aide de la bibliothèque Python d’Azure Data Explorer. L’Explorateur de données Azure est un service d’exploration de données rapide et hautement évolutive pour les données des journaux et les données de télémétrie. L’Explorateur de données Azure fournit deux bibliothèques clientes pour Python : une bibliothèque d’ingestion et une bibliothèque de données. Ces bibliothèques vous permettent d’ingérer (charger) des données dans un cluster et d’interroger les données à partir de votre code.

Vous allez d’abord créer une table et un mappage de données dans un cluster. Ensuite, vous allez mettre en file d’attente l’ingestion sur le cluster et valider les résultats.

Prérequis

Installer les données et ingérer les bibliothèques

Installez azure-kusto-data et azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Ajouter les constantes et les instructions d’importation

Importer des classes à partir d’azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Pour authentifier une application, Azure Data Explorer utilise votre ID de locataire Microsoft Entra. Pour trouver votre ID de locataire, utilisez l’URL suivante en remplaçant YourDomain par votre domaine.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Par exemple, si votre domaine est contoso.com, l’URL est : https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Cliquez sur cette URL pour voir les résultats. La première ligne est la suivante.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Ici, l’ID de locataire est 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Définissez les valeurs pour AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI et KUSTO_DATABASE avant d’exécuter ce code.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Maintenant, créez la chaîne de connexion. L’exemple suivant utilise l’authentification de l’appareil pour accéder au cluster. Vous pouvez également utiliser l’authentification d’identité managée, Microsoft Entra certificat d’application, Microsoft Entra clé d’application et Microsoft Entra utilisateur et mot de passe.

Vous allez créer la table de destination et le mappage dans une étape ultérieure.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Définir les informations du fichier source

Importez des classes supplémentaires et définissez des constantes pour le fichier de source de données. Cet exemple utilise un exemple de fichier hébergé sur Stockage Blob Azure. L’exemple de jeu de données StormEvents contient des données météorologiques des Centres nationaux d’information environnementale.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Créer une table sur votre cluster

Créez une table qui correspond au schéma des données du fichier StormEvents.csv. Lorsque ce code s’exécute, il retourne un message semblable au suivant : Pour vous connecter, utilisez un navigateur web afin d'ouvrir la page https://microsoft.com/devicelogin et entrez le code F3W4VWZDM pour vous authentifier. Suivez les étapes pour vous connecter, puis revenez pour exécuter le bloc de code suivant. Les blocs de code suivants qui établissent une connexion vous demandent de vous reconnecter.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Définir le mappage d’ingestion

Mappez les données CSV entrantes aux noms de colonne et aux types de données utilisés lors de la création de la table. Cela mappe les champs de données sources aux colonnes de table de destination

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Mettre en file d’attente un message pour l’ingestion

Mettez en file d’attente un message pour extraire les données du stockage d’objets blob et ingérer ces données dans l’Explorateur de données Azure.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Interroger les données ingérées dans la table

Attendez cinq à dix minutes avant que l’ingestion en file d’attente planifie l’ingestion et charge les données dans Azure Data Explorer. Ensuite, exécutez le code suivant pour obtenir le nombre d’enregistrements de la table StormEvents.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Exécuter des requêtes de dépannage

Connectez-vous à https://dataexplorer.azure.com et à votre cluster. Exécutez la commande suivante dans votre base de données pour voir si des échecs d’ingestion se sont produits ces quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Exécutez la commande suivante pour voir l’état de toutes les opérations d’ingestion des quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Nettoyer les ressources

Si vous envisagez de suivre nos autres articles, conservez les ressources que vous avez créées. Dans le cas contraire, exécutez la commande suivante dans votre base de données pour nettoyer la table StormEvents.

.drop table StormEvents

Étape suivante