Azure Veri Gezgini Python kitaplığını kullanarak veri toplama
Bu makalede, Python kitaplığını kullanarak Azure Veri Gezgini edinebilirsiniz. Azure Veri Gezgini, günlük ve telemetri verileri için hızlı ve üst düzeyde ölçeklenebilir veri keşfetme hizmetidir. Azure Veri Gezgini Python için iki istemci kitaplığı sağlar: alma kitaplığı ve veri kitaplığı. Bu kitaplıklar, verileri bir kümeye alamanıza veya yüklemenize ve kodunuzdan veri sorgulamanıza olanak sağlar.
İlk olarak, bir kümede bir tablo ve veri eşlemesi oluşturun. Ardından veri alımını kümenin kuyruğuna ekler ve sonuçları doğrularsınız.
Önkoşullar
- Azure aboneliği. Ücretsiz bir Azure hesabı oluşturun.
- Bir küme ve veritabanı oluşturun.
- Python 3.4+.
Veri ve alma kitaplarını yükleme
azure-kusto-data ve azure-kusto-ingest'i yükleyin.
pip install azure-kusto-data
pip install azure-kusto-ingest
İçeri aktarma deyimlerini ve sabitlerini ekleme
azure-kusto-data'dan sınıfları içeri aktarma.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
Azure Veri Gezgini uygulamanın kimliğini doğrulamak için Azure Active Directory kiracı kimliğinizi kullanır. Kiracı kimliğinizi bulmak için aşağıdaki URL'yi kullanın ve Etki alanınız yerine Etki Alanınız yazın.
https://login.windows.net/<YourDomain>/.well-known/openid-configuration/
Örneğin, etki alanınız contoso.com olduğunda URL şöyle olur: . Sonuçları görmek için bu URL'ye tıklayın; ilk satır aşağıdaki gibidir.
"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
Bu örnekte kiracı kimliği 6babcaad-604b-40ac-a9d7-9fd97c0b779f değeridir. Bu kodu çalıştırmadan önce AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI ve KUSTO_DATABASE değerlerini ayarlayın.
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
Şimdi bağlantı dizesini hazırlayın. Bu örnekte kümeye erişmek için cihaz kimlik doğrulaması kullanılır. Ayrıca, uygulama Azure Active Directory, uygulama Azure Active Directoryvekullanıcı ve parola Azure Active Directory kullanabilirsiniz.
Sonraki bir adımda hedef tabloyu ve eşlemeyi oluşturursunuz.
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
KUSTO_INGEST_URI, AAD_TENANT_ID)
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(
KUSTO_URI, AAD_TENANT_ID)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
Kaynak dosya bilgilerini ayarlama
Ek sınıfları içeri aktarın ve veri kaynak dosyası için sabitleri ayarlayın. Bu örnekte, Azure Blob Depolama'da barındırılan bir örnek dosya kullanılır. StormEvents örnek veri kümesi, Ulusal Çevre Bilgileri Merkezleri'nden gelen hava durumu verilerini içerir.
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?sv=2019-12-12&ss=b&srt=o&sp=r&se=2022-09-05T02:23:52Z&st=2020-09-04T18:23:52Z&spr=https&sig=VrOfQMT1gUrHltJ8uhjYcCequEcfhjyyMX%2FSc3xsCy4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
Kümeniz üzerinde tablo oluşturma
StormEvents.csv dosyasındaki verilerin şemasıyla eşleşen bir tablo oluşturun. Bu kod çalıştırıldıklarında şu iletiye benzer bir ileti döndürür: Oturum açmak için bir web tarayıcısı kullanarak sayfayı açın ve kimliğini doğrulamak için F3W4VWZDM kodunu girin. Adımları izleyerek oturum açın, sonra da dönüp bir sonraki kod bloğunu çalıştırın. Bağlantı kuran sonraki kod blokları için yeniden oturum açmak gerekir.
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Veri alımı eşlemesini tanımlama
Gelen CSV verilerini tablo oluştururken kullanılan sütun adları ve veri türleriyle eşler. Bu, kaynak veri alanlarını hedef tablo sütunlarıyla eşler
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Veri alımı için bir iletiyi kuyruğa alma
Blob depolamadan verileri çekmek ve bu verileri Azure Veri Gezgini'ne almak için bir iletiyi kuyruğa alın.
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
Tabloya alınan verileri sorgulama
Kuyruğa alınan alımın verileri veri alımını zamanlaması ve verileri veri kaynağına yüklemesi için beş ile 10 Azure Veri Gezgini. Ardından aşağıdaki kodu çalıştırarak StormEvents tablosundaki kayıtların sayısını alın.
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
Sorun giderme sorguları çalıştırma
https://dataexplorer.azure.com adresinde oturum açın ve kümenize bağlanın. Son dört saatte hiç veri alımı hatası olup olmadığını görmek için veritabanınızda aşağıdaki komutu çalıştırın. Çalıştırmadan önce veritabanı adını değiştirin.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Son dört saatteki tüm veri alım işlemlerinin durumunu görüntülemek için aşağıdaki komutu çalıştırın. Çalıştırmadan önce veritabanı adını değiştirin.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Kaynakları temizleme
Diğer makalelerimizi takip etmek için oluşturduğunuz kaynakları kullanın. Aksi takdirde, veritabanınızda aşağıdaki komutu çalıştırarak StormEvents tablosunu temizleyin.
.drop table StormEvents