Dela via


Mata in data med hjälp av Python-biblioteket i Azure Data Explorer

I den här artikeln matar du in data med hjälp av Azure Data Explorer Python-biblioteket. Azure Data Explorer är en snabb och mycket skalbar datautforskningstjänst för logg- och telemetridata. Azure Data Explorer tillhandahåller två klientbibliotek för Python: ett bibliotek för inmatning och ett databibliotek. Med de här biblioteken kan du mata in eller läsa in data i ett kluster och köra frågor mot data från koden.

Skapa först en tabell och datamappning i ett kluster. Sedan köar du inmatningen till klustret och verifierar resultaten.

Förutsättningar

Installera data och mata in bibliotek

Installera azure-kusto-data och azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Lägg till importuttryck och konstanter

Importera klasser från azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

För att autentisera ett program använder Azure Data Explorer ditt Microsoft Entra klientorganisations-ID. Om du vill hitta ditt klientorganisations-ID använder du följande URL och ersätter din domän för YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Om din domän till exempel är contoso.com blir URL:en: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klicka på den här URL:en för att se resultatet. Den första raden ser ut så här.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Klient-ID är i det här fallet 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Ange värdena för AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI och KUSTO_DATABASE innan du kör koden.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Nu kan du skapa anslutningssträngen. I följande exempel används enhetsautentisering för att komma åt klustret. Du kan också använda hanterad identitetsautentisering, Microsoft Entra programcertifikat, Microsoft Entra programnyckel och Microsoft Entra användare och lösenord.

Du kan skapa måltabellen och mappningen i ett senare steg.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Ange information om källfilen

Importera ytterligare klasser och ange konstanter för datakällan. Det här exemplet används en exempelfil som finns på Azure Blob Storage. StormEvents-exempeldatauppsättningen innehåller väderrelaterade data från National Centers for Environmental Information.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Skapa en tabell i ditt kluster

Skapa en tabell som matchar schemat för data i filen StormEvents.csv. När den här koden körs returneras ett meddelande som liknar följande meddelande: Om du vill logga in använder du en webbläsare för att öppna sidan https://microsoft.com/devicelogin och anger koden F3W4VWZDM för att autentisera. Följ stegen för att logga in och gå sedan tillbaka för att köra nästa kodblock. Efterföljande kodblock som upprättar en anslutning kräver att du loggar in igen.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Definiera mappning av inmatning

Mappa inkommande CSV-data till kolumnnamnen och datatyperna som används när du skapade tabellen. Detta mappar källdatafält som till måltabellkolumner

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Köa ett meddelande för inmatning

Köa ett meddelande för att hämta data från Blob Storage och mata in data i Azure Data Explorer.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Köra frågor mot data som matades in i tabellen

Vänta i fem till tio minuter tills den köade inmatningen schemalägger inmatningen och läser in data i Azure Data Explorer. Kör sedan följande kod för att hämta posterna i tabellen StormEvents.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Köra frågor för felsökning

Logga in på https://dataexplorer.azure.com och anslut till klustret. Kör följande kommando i din databas för att se om det fanns inmatningsfel under de senaste fyra timmarna. Ersätt namnet på databasen innan du kör.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Kör följande kommando för att visa status för alla åtgärder för inmatning under de sista fyra timmarna. Ersätt namnet på databasen innan du kör.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Rensa resurser

Om du planerar att följa våra andra artiklar behåller du de resurser som du har skapat. Om inte kör du följande kommando i din databas för att rensa tabellen StormEvents.

.drop table StormEvents

Nästa steg