Gegevens opnemen met behulp van de Python-bibliotheek voor Azure Data Explorer
In dit artikel gaat u gegevens opnemen met behulp van de Azure Data Explorer Python-bibliotheek. Azure Data Explorer is een snelle en zeer schaalbare service voor gegevensverkenning voor telemetrische gegevens en gegevens uit logboeken. Azure Data Explorer biedt twee clientbibliotheken voor Python: een ingest-bibliotheek en een data-bibliotheek. Met deze bibliotheken kunt u gegevens opnemen in of laden in een cluster en query's uitvoeren op gegevens uit uw code.
Maak eerst een tabel en gegevenstoewijzing in een cluster. Vervolgens plaatst u op te nemen gegevens in de wachtrij en valideert u de resultaten.
Vereisten
- Een Azure-abonnement. Maak een gratis Azure-account.
- Maak een cluster en database.
- Python 3.4+.
De bibliotheken data en ingest installeren
Installeer azure-kusto-data en azure-kusto-ingest.
pip install azure-kusto-data
pip install azure-kusto-ingest
Importinstructies en constanten toevoegen
Klassen importeren uit azure-kusto-data.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
Azure Data Explorer maakt gebruik van de id van uw Azure Active Directory-tenant om een toepassing te verifiëren. Als u uw tenant-id wilt vinden, gebruikt u de volgende URL, en vervangt u uw domein door YourDomain.
https://login.windows.net/<YourDomain>/.well-known/openid-configuration/
Dus als uw domein contoso.com is, wordt de URL bijvoorbeeld: . Klik op deze URL om de resultaten weer te geven. De eerste regel is als volgt.
"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
De tenant-id is in dit geval 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Stel de waarden voor AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI en KUSTO_DATABASE in voordat deze code wordt uitgevoerd.
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
Nu maakt u de verbindingsreeks. In dit voorbeeld wordt apparaatverificatie gebruikt voor toegang tot het cluster. U kunt ook Azure Active Directory toepassingscertificaat, Azure Active Directory-toepassingssleutelen Azure Active Directory en wachtwoord opgeven.
U maakt de doeltabel en toewijzing in een latere stap.
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
KUSTO_INGEST_URI, AAD_TENANT_ID)
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(
KUSTO_URI, AAD_TENANT_ID)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
Gegevens van bronbestand instellen
Importeer aanvullende klassen en stel constanten in voor het gegevensbronbestand. In dit voorbeeld wordt een voorbeeldbestand gebruikt dat wordt gehost in Azure Blob Storage. De stormEvents-voorbeeldgegevensset bevat weergerelateerde gegevens van de National Centers for Environmental Information.
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?sv=2019-12-12&ss=b&srt=o&sp=r&se=2022-09-05T02:23:52Z&st=2020-09-04T18:23:52Z&spr=https&sig=VrOfQMT1gUrHltJ8uhjYcCequEcfhjyyMX%2FSc3xsCy4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
Een tabel maken in het cluster
Maak een tabel die overeenkomt met het schema van de gegevens in het bestand StormEvents.csv. Wanneer deze code wordt uitgevoerd, wordt een bericht als het volgende bericht weergegeven: Als u zich wilt aanmelden, gebruikt u een webbrowser om de pagina te openen en voert u de code F3W4VWZDMin om te verifiëren. Volg de stappen om u aan te melden en ga vervolgens terug om het volgende codeblok uit te voeren. Als volgende codeblokken verbinding moeten maken, moet u zich opnieuw aanmelden.
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Toewijzing van opname definiëren
Wijs binnenkomende CSV-gegevens toe aan de kolomnamen en gegevenstypen die zijn gebruikt bij het maken van de tabel. Hiermee worden brongegevensvelden toegewezen aan tabelkolommen op de bestemming
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Bericht in wachtrij zetten voor opname
Zet een bericht in de wachtrij om gegevens op te halen uit blob-opslag en die gegevens op te nemen in Azure Data Explorer.
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
Gegevens opvragen die zijn opgenomen in de tabel
Wacht vijf tot tien minuten tot de opname in de wachtrij de opname heeft gepland en laad de gegevens in Azure Data Explorer. Voer vervolgens de volgende code uit om het aantal records in de tabel StormEvents te bepalen.
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
Query's voor probleemoplossing uitvoeren
Meld u aan bij https://dataexplorer.azure.com en maak verbinding met uw cluster. Voer de volgende opdracht uit in uw database om te zien of er in de afgelopen vier uur fouten zijn opgetreden tijdens het opnemen van gegevens. Vervang de naam van de database voordat u de opdracht uitvoert.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Voer de volgende opdracht uit om de status op te vragen van alle bewerkingen voor het opnemen van gegevens van de afgelopen vier uur. Vervang de naam van de database voordat u de opdracht uitvoert.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Resources opschonen
Als u van plan bent onze andere artikelen te volgen, bewaar dan de resources die u hebt gemaakt. Voer anders de volgende opdracht uit in uw database om de tabel StormEvents op te schonen.
.drop table StormEvents