Azure Data Explorer Python 라이브러리를 사용하여 데이터 수집Ingest data using the Azure Data Explorer Python library

이 문서에서는 Azure 데이터 탐색기 Python 라이브러리를 사용 하 여 데이터를 수집 합니다.In this article, you ingest data using the Azure Data Explorer Python library. Azure 데이터 탐색기는 로그 및 원격 분석 데이터에 사용 가능한 빠르고 확장성이 우수한 데이터 탐색 서비스입니다.Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Azure 데이터 탐색기는 2개의 Python용 클라이언트 라이브러리, 수집 라이브러리데이터 라이브러리를 제공합니다.Azure Data Explorer provides two client libraries for Python: an ingest library and a data library. 이러한 라이브러리를 사용 하면 데이터를 클러스터로 수집 또는 로드 하 고 코드에서 데이터를 쿼리할 수 있습니다.These libraries enable you to ingest, or load, data into a cluster and query data from your code.

먼저 클러스터에서 테이블 및 데이터 매핑을 만듭니다.First, create a table and data mapping in a cluster. 그런 다음, 클러스터 큐에 수집을 넣고 결과의 유효성을 검사합니다.You then queue ingestion to the cluster and validate the results.

이 문서는 Azure 노트북으로도 사용할 수 있습니다.This article is also available as an Azure Notebook.

사전 요구 사항Prerequisites

데이터 및 수집 라이브러리 설치Install the data and ingest libraries

azure-kusto-dataazure-kusto-ingest 를 설치합니다.Install azure-kusto-data and azure-kusto-ingest .

pip install azure-kusto-data
pip install azure-kusto-ingest

import 문 및 상수 추가Add import statements and constants

azure-kusto-data에서 클래스를 가져옵니다.Import classes from azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

애플리케이션을 인증하기 위해 Azure 데이터 탐색기는 Azure Active Directory 테넌트 ID를 사용합니다.To authenticate an application, Azure Data Explorer uses your Azure Active Directory tenant ID. 테 넌 트 ID를 찾으려면 다음 URL을 사용 하 여 YourDomain 에 대 한 도메인을 바꿉니다.To find your tenant ID, use the following URL, replacing your domain for YourDomain .

https://login.windows.net/<YourDomain>/.well-known/openid-configuration/

예를 들어 도메인이 contoso.com 인 경우 URL은 https://login.windows.net/contoso.com/.well-known/openid-configuration/입니다.For example, if your domain is contoso.com , the URL is: https://login.windows.net/contoso.com/.well-known/openid-configuration/. 결과를 보려면 이 URL을 클릭합니다. 첫 번째 줄은 다음과 같습니다.Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

이 경우의 테넌트 ID는 6babcaad-604b-40ac-a9d7-9fd97c0b779f입니다.The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f. 이 코드를 실행하기 전에 AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI 및 KUSTO_DATABASE의 값을 설정합니다.Set the values for AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI, and KUSTO_DATABASE before running this code.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_DATABASE = "<DatabaseName>"

이제 연결 문자열을 구성합니다.Now construct the connection string. 이 예제에서는 디바이스 인증을 사용하여 클러스터에 액세스합니다.This example uses device authentication to access the cluster. Azure Active Directory 응용 프로그램 인증서, Azure Active Directory 응용 프로그램 키Azure Active Directory 사용자 및 암호를 사용할 수도 있습니다.You can also use Azure Active Directory application certificate, Azure Active Directory application key, and Azure Active Directory user and password.

이후 단계에서 대상 테이블 및 매핑을 만듭니다.You create the destination table and mapping in a later step.

KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_INGEST_URI, AAD_TENANT_ID)

KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_URI, AAD_TENANT_ID)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

소스 파일 정보 설정Set source file information

추가 클래스를 가져오고 데이터 원본 파일에 대한 상수를 설정합니다.Import additional classes and set constants for the data source file. 이 예제에서는 Azure Blob Storage에 호스트된 예제 파일을 사용합니다.This example uses a sample file hosted on Azure Blob Storage. Stormevents 샘플 데이터 집합에는 환경적 정보에 대 한 국가별 센터의 날씨 관련 데이터가 포함 되어 있습니다.The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

클러스터에 테이블 만들기Create a table on your cluster

StormEvents.csv 파일에 있는 데이터 스키마와 일치하는 테이블을 만듭니다.Create a table that matches the schema of the data in the StormEvents.csv file. 이 코드가 실행 되 면 다음과 같은 메시지를 반환 합니다. 로그인 하려면 웹 브라우저를 사용 하 여 페이지를 열고 https://microsoft.com/devicelogin 인증을 위해 F3W4VWZDM 코드를 입력 합니다.When this code runs, it returns a message like the following message: To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code F3W4VWZDM to authenticate . 단계에 따라 로그인한 후 돌아가서 다음 코드 블록을 실행합니다.Follow the steps to sign in, then return to run the next code block. 연결을 만드는 후속 코드 블록을 위해 다시 로그인해야 합니다.Subsequent code blocks that make a connection require you to sign in again.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

수집 매핑 정의Define ingestion mapping

테이블을 만들 때 사용되는 데이터 형식과 열 이름에 들어오는 CSV 데이터를 매핑합니다.Map incoming CSV data to the column names and data types used when creating the table. 원본 데이터 필드를 대상 테이블 열에 매핑합니다.This maps source data fields to destination table columns

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

수집을 위해 메시지를 큐에 넣음Queue a message for ingestion

Blob Storage에서 데이터를 끌어온 후 Azure 데이터 탐색기에 수집하기 위해 메시지를 큐에 넣습니다.Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer.

INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, dataFormat=DataFormat.CSV,
                                           mappingReference=DESTINATION_TABLE_COLUMN_MAPPING, additionalProperties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

테이블에 수집된 데이터 쿼리Query data that was ingested into the table

수집을 예약 하 고 데이터를 Azure 데이터 탐색기에 로드 하려면 대기 중인 수집에 대해 5 ~ 10 분 동안 기다립니다.Wait for five to 10 minutes for the queued ingestion to schedule the ingest and load the data into Azure Data Explorer. 그런 후, 다음 코드를 실행하여 StormEvents 테이블의 레코드 수를 가져옵니다.Then run the following code to get the count of records in the StormEvents table.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

쿼리 문제 해결 실행Run troubleshooting queries

에 로그인 하 https://dataexplorer.azure.com 고 클러스터에 연결 합니다.Sign in to https://dataexplorer.azure.com and connect to your cluster. 데이터베이스에서 다음 명령을 실행하여 지난 4시간 동안 수집 실패가 있었는지 확인합니다.Run the following command in your database to see if there were any ingestion failures in the last four hours. 실행하기 전에 데이터베이스 이름을 바꿉니다.Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

다음 명령을 실행하여 지난 4시간 동안 진행된 모든 수집 작업의 상태를 확인합니다.Run the following command to view the status of all ingestion operations in the last four hours. 실행하기 전에 데이터베이스 이름을 바꿉니다.Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

리소스 정리Clean up resources

다른 문서를 따르려면 만든 리소스를 유지 합니다.If you plan to follow our other articles, keep the resources you created. 그렇지 않으면 데이터베이스에서 다음 명령을 실행하여 StormEvents 테이블을 정리합니다.If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents

다음 단계Next steps