Ingest data using the Azure Data Explorer Python library

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Azure Data Explorer provides two client libraries for Python: an ingest library and a data library. These libraries enable you to ingest (load) data into a cluster and query data from your code. In this article, you first create a table and data mapping in a cluster. You then queue ingestion to the cluster and validate the results.

This article is also available as an Azure Notebook.

Prerequisites

Install the data and ingest libraries

Install azure-kusto-data and azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Add import statements and constants

Import classes from azure-kusto-data.

from azure.kusto.data.request import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

To authenticate an application, Azure Data Explorer uses your AAD tenant ID. To find your tenant ID, use the following URL, substituting your domain for YourDomain.

https://login.windows.net/<YourDomain>/.well-known/openid-configuration/

For example, if your domain is contoso.com, the URL is: https://login.windows.net/contoso.com/.well-known/openid-configuration/. Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Set the values for AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI, and KUSTO_DATABASE before running this code.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_DATABASE = "<DatabaseName>"
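If you prefer not to look up the tenant ID by eye, it can also be extracted from the well-known endpoint's response programmatically. The following is a minimal sketch using only the standard library; the sample response body reuses the illustrative GUID from the article, and the helper name is our own, not part of the Kusto SDK.

```python
import json
import re

# Sample body from the openid-configuration endpoint (illustrative GUID
# from the article above, not a real tenant).
SAMPLE_RESPONSE = (
    '{"authorization_endpoint":'
    '"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"}'
)

def tenant_id_from_openid_config(body: str) -> str:
    """Extract the AAD tenant ID (a GUID) from an openid-configuration document."""
    endpoint = json.loads(body)["authorization_endpoint"]
    match = re.search(r"login\.windows\.net/([0-9a-f-]{36})/", endpoint)
    if match is None:
        raise ValueError("no tenant GUID found in authorization_endpoint")
    return match.group(1)

print(tenant_id_from_openid_config(SAMPLE_RESPONSE))
```

In a script you would fetch the document from the URL shown earlier and pass its body to this helper.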

Now construct the connection string. This example uses device authentication to access the cluster. You can also use AAD application certificate, AAD application key, or AAD user and password authentication.

You create the destination table and mapping in a later step.

KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_INGEST_URI, AAD_TENANT_ID)

KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Set source file information

Import additional classes and set constants for the data source file. This example uses a sample file hosted on Azure Blob Storage. The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

from azure.storage.blob import BlockBlobService
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + CONTAINER + "/" + FILE_PATH + SAS_TOKEN
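Access failures against the sample blob are often caused by an expired shared access signature, whose expiry is encoded in the token's se field. As a quick local sanity check, you can parse the token with only the standard library; the helper name here is our own, not part of any Azure SDK.

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs

# The SAS token from the sample above.
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"

def sas_expiry(token: str) -> datetime:
    """Return the expiry time (the `se` field) of an Azure Storage SAS token."""
    fields = parse_qs(token.lstrip("?"))  # parse_qs also percent-decodes values
    return datetime.strptime(fields["se"][0], "%Y-%m-%dT%H:%M:%SZ").replace(
        tzinfo=timezone.utc
    )

print("SAS token expires:", sas_expiry(SAS_TOKEN).isoformat())
```

If the printed expiry is in the past, request a fresh token (or use a current sample URL) before queuing ingestion.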

Create a table on your cluster

Create a table that matches the schema of the data in the StormEvents.csv file. When this code runs, it returns a message like the following: To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code F3W4VWZDM to authenticate. Follow the steps to sign in, then return to run the next code block. Subsequent code blocks that make a connection require you to sign in again.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Define ingestion mapping

Map incoming CSV data to the column names and data types used when creating the table. This maps source data fields to destination table columns.

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])
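Because the mapping is passed to the management command as one long embedded JSON string, a typo is easy to miss until ingestion fails. As a hedged sketch, you can parse and sanity-check the JSON locally before sending the command; the mapping below is abridged to the first three columns for illustration, and check_mapping is our own helper, not a Kusto SDK function.

```python
import json

# Abridged copy of the mapping JSON embedded in CREATE_MAPPING_COMMAND above.
MAPPING_JSON = (
    '[{"Name":"StartTime","datatype":"datetime","Ordinal":0},'
    ' {"Name":"EndTime","datatype":"datetime","Ordinal":1},'
    '{"Name":"EpisodeId","datatype":"int","Ordinal":2}]'
)

def check_mapping(mapping_json: str) -> list:
    """Parse the mapping and verify each entry carries the required keys."""
    columns = json.loads(mapping_json)  # raises ValueError on malformed JSON
    for col in columns:
        assert {"Name", "datatype", "Ordinal"} <= col.keys(), col
    return [col["Name"] for col in columns]

print(check_mapping(MAPPING_JSON))
```

Running this on the full mapping string catches malformed JSON or a missing key before the command ever reaches the cluster.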

Queue a message for ingestion

Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer.

INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, dataFormat=DataFormat.csv, mappingReference = DESTINATION_TABLE_COLUMN_MAPPING, additionalProperties={'ignoreFirstRecord': 'true'})
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)  # FILE_SIZE is the raw size of the data in bytes
INGESTION_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Query data that was ingested into the table

Wait five to ten minutes for the queued ingestion to schedule the ingest and load the data into Azure Data Explorer. Then run the following code to get the count of records in the StormEvents table.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])
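Rather than waiting a fixed interval, you can poll the row count until data appears. The following is a minimal stdlib sketch of such a loop; get_count is a stand-in for a callable that runs the StormEvents | count query above and returns the count as an int, and the fake counter in the example is purely illustrative.

```python
import time

def wait_for_rows(get_count, timeout_s=600, poll_s=30):
    """Poll get_count() until it reports at least one row or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        count = get_count()
        if count > 0:
            return count
        time.sleep(poll_s)
    raise TimeoutError("no rows ingested within the timeout")

# Example with a fake counter that reports data on the third poll.
counts = iter([0, 0, 42])
print(wait_for_rows(lambda: next(counts), timeout_s=5, poll_s=0))
```

In practice you would pass a lambda that calls KUSTO_CLIENT.execute_query with the count query and extracts the single value from the result table.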

Run troubleshooting queries

Sign in to https://dataexplorer.azure.com and connect to your cluster. Run the following command in your database to see whether there were any ingestion failures in the last four hours. Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Run the following command to view the status of all ingestion operations in the last four hours. Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Clean up resources

If you plan to follow our other articles, keep the resources you created. If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents

Next steps