Azure Veri Gezgini Python kitaplığını kullanarak veri toplama

Bu makalede, Python kitaplığını kullanarak Azure Veri Gezgini edinebilirsiniz. Azure Veri Gezgini, günlük ve telemetri verileri için hızlı ve üst düzeyde ölçeklenebilir veri keşfetme hizmetidir. Azure Veri Gezgini Python için iki istemci kitaplığı sağlar: alma kitaplığı ve veri kitaplığı. Bu kitaplıklar, verileri bir kümeye alamanıza veya yüklemenize ve kodunuzdan veri sorgulamanıza olanak sağlar.

İlk olarak, bir kümede bir tablo ve veri eşlemesi oluşturun. Ardından veri alımını kümenin kuyruğuna ekler ve sonuçları doğrularsınız.

Önkoşullar

Veri ve alma kitaplarını yükleme

azure-kusto-data ve azure-kusto-ingest'i yükleyin.

pip install azure-kusto-data
pip install azure-kusto-ingest

İçeri aktarma deyimlerini ve sabitlerini ekleme

azure-kusto-data'dan sınıfları içeri aktarma.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Azure Veri Gezgini uygulamanın kimliğini doğrulamak için Azure Active Directory kiracı kimliğinizi kullanır. Kiracı kimliğinizi bulmak için aşağıdaki URL'yi kullanın ve Etki alanınız yerine Etki Alanınız yazın.

https://login.windows.net/<YourDomain>/.well-known/openid-configuration/

Örneğin, etki alanınız contoso.com olduğunda URL şöyle olur: . Sonuçları görmek için bu URL'ye tıklayın; ilk satır aşağıdaki gibidir.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Bu örnekte kiracı kimliği 6babcaad-604b-40ac-a9d7-9fd97c0b779f değeridir. Bu kodu çalıştırmadan önce AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI ve KUSTO_DATABASE değerlerini ayarlayın.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Şimdi bağlantı dizesini hazırlayın. Bu örnekte kümeye erişmek için cihaz kimlik doğrulaması kullanılır. Ayrıca, uygulama Azure Active Directory, uygulama Azure Active Directoryvekullanıcı ve parola Azure Active Directory kullanabilirsiniz.

Sonraki bir adımda hedef tabloyu ve eşlemeyi oluşturursunuz.

KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_INGEST_URI, AAD_TENANT_ID)

KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_URI, AAD_TENANT_ID)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Kaynak dosya bilgilerini ayarlama

Ek sınıfları içeri aktarın ve veri kaynak dosyası için sabitleri ayarlayın. Bu örnekte, Azure Blob Depolama'da barındırılan bir örnek dosya kullanılır. StormEvents örnek veri kümesi, Ulusal Çevre Bilgileri Merkezleri'nden gelen hava durumu verilerini içerir.

from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?sv=2019-12-12&ss=b&srt=o&sp=r&se=2022-09-05T02:23:52Z&st=2020-09-04T18:23:52Z&spr=https&sig=VrOfQMT1gUrHltJ8uhjYcCequEcfhjyyMX%2FSc3xsCy4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Kümeniz üzerinde tablo oluşturma

StormEvents.csv dosyasındaki verilerin şemasıyla eşleşen bir tablo oluşturun. Bu kod çalıştırıldıklarında şu iletiye benzer bir ileti döndürür: Oturum açmak için bir web tarayıcısı kullanarak sayfayı açın ve kimliğini doğrulamak için F3W4VWZDM kodunu girin. Adımları izleyerek oturum açın, sonra da dönüp bir sonraki kod bloğunu çalıştırın. Bağlantı kuran sonraki kod blokları için yeniden oturum açmak gerekir.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Veri alımı eşlemesini tanımlama

Gelen CSV verilerini tablo oluştururken kullanılan sütun adları ve veri türleriyle eşler. Bu, kaynak veri alanlarını hedef tablo sütunlarıyla eşler

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Veri alımı için bir iletiyi kuyruğa alma

Blob depolamadan verileri çekmek ve bu verileri Azure Veri Gezgini'ne almak için bir iletiyi kuyruğa alın.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Tabloya alınan verileri sorgulama

Kuyruğa alınan alımın verileri veri alımını zamanlaması ve verileri veri kaynağına yüklemesi için beş ile 10 Azure Veri Gezgini. Ardından aşağıdaki kodu çalıştırarak StormEvents tablosundaki kayıtların sayısını alın.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Sorun giderme sorguları çalıştırma

https://dataexplorer.azure.com adresinde oturum açın ve kümenize bağlanın. Son dört saatte hiç veri alımı hatası olup olmadığını görmek için veritabanınızda aşağıdaki komutu çalıştırın. Çalıştırmadan önce veritabanı adını değiştirin.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Son dört saatteki tüm veri alım işlemlerinin durumunu görüntülemek için aşağıdaki komutu çalıştırın. Çalıştırmadan önce veritabanı adını değiştirin.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Kaynakları temizleme

Diğer makalelerimizi takip etmek için oluşturduğunuz kaynakları kullanın. Aksi takdirde, veritabanınızda aşağıdaki komutu çalıştırarak StormEvents tablosunu temizleyin.

.drop table StormEvents

Sonraki adımlar