REST API를 사용하여 데이터를 수집하는 방법

Kusto.Ingest 라이브러리는 클러스터에 데이터를 수집하는 데 선호됩니다. 그러나 Kusto.Ingest 패키지에 종속되지 않고도 거의 동일한 기능을 달성할 수 있습니다. 이 문서에서는 프로덕션 등급 파이프라인을 위해 클러스터에 대기 중인 수집 을 사용하는 방법을 보여 줍니다.

참고

아래 코드는 C#으로 작성되었으며 Azure Storage SDK, MSAL(Microsoft 인증 라이브러리) 및 NewtonSoft.JSON 패키지를 사용하여 샘플 코드를 간소화합니다. 필요한 경우 해당 코드를 적절한 Azure Storage REST API 호출, non-.NET MSAL 패키지 및 사용 가능한 JSON 처리 패키지로 바꿀 수 있습니다.

이 문서에서는 권장되는 수집 모드를 다룹니다. Kusto.Ingest 라이브러리의 경우 해당 엔터티는 IKustoQueuedIngestClient 인터페이스입니다. 여기서 클라이언트 코드는 수집 알림 메시지를 Azure 큐에 게시하여 클러스터와 상호 작용합니다. 메시지에 대한 참조는 Kusto 데이터 관리(수집이라고도 함) 서비스에서 가져옵니다. 서비스와의 상호 작용은 Microsoft Entra ID로 인증되어야 합니다.

다음 코드에서는 Kusto 데이터 관리 서비스가 Kusto.Ingest 라이브러리를 사용하지 않고 대기 중인 데이터 수집을 처리하는 방법을 보여 줍니다. 이 예제는 환경 또는 기타 제한 사항으로 인해 전체 .NET에 액세스할 수 없거나 사용할 수 없는 경우에 유용할 수 있습니다.

이 코드에는 Azure Storage 클라이언트를 만들고 Blob에 데이터를 업로드하는 단계가 포함되어 있습니다. 각 단계는 샘플 코드 다음에 자세히 설명되어 있습니다.

  1. 수집 서비스에 액세스하기 위한 인증 토큰 가져오기
  2. 수집 서비스를 쿼리하여 다음을 가져옵니다.
  3. Kusto에서 가져온 Blob 컨테이너 중 하나의 Blob에 데이터 업로드(2)
  4. 대상 데이터베이스 및 테이블을 식별하고 에서 Blob을 가리키는 수집 메시지 작성(3)
  5. (4)에서 작성한 수집 메시지를 (2)에서 가져온 수집 큐에 게시합니다.
  6. 수집 중에 서비스에서 찾은 오류를 검색합니다.
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

프로덕션 등급 파이프라인에 대기 중인 수집 사용

Microsoft Entra ID에서 인증 증명 정보 가져오기

여기서는 MSAL(Microsoft 인증 라이브러리)을 사용하여 Kusto 데이터 관리 서비스에 액세스하고 입력 큐를 요청하는 Microsoft Entra 토큰을 가져옵니다. MSAL은 여러 플랫폼에서 사용할 수 있습니다.

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.microsoftonline.com/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

수집 리소스 검색

수집 리소스의 반환을 요청하여 데이터 관리 서비스에 대한 HTTP POST 요청을 수동으로 생성합니다. 이러한 리소스에는 DM 서비스가 수신 대기 중인 큐와 데이터 업로드를 위한 Blob 컨테이너가 포함됩니다. 데이터 관리 서비스는 해당 큐 중 하나에 도착하는 수집 요청이 포함된 메시지를 처리합니다.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

Kusto ID 토큰 가져오기

수집 메시지는 비 직접 채널(Azure 큐)을 통해 클러스터에 전달되므로 수집 서비스에 액세스하기 위한 대역 내 권한 부여 유효성 검사를 수행할 수 없습니다. 해결 방법은 모든 수집 메시지에 ID 토큰을 연결하는 것입니다. 토큰은 대역 내 권한 부여 유효성 검사를 사용하도록 설정합니다. 그런 다음 이 서명된 토큰은 수집 메시지를 받을 때 수집 서비스에서 유효성을 검사할 수 있습니다.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

Azure Blob 컨테이너에 데이터 업로드

이 단계는 수집을 위해 전달될 Azure Blob에 로컬 파일을 업로드하는 것입니다. 이 코드는 Azure Storage SDK를 사용합니다. 종속성이 불가능한 경우 Azure Blob Service REST API를 사용하여 수행할 수 있습니다.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

수집 메시지 작성

NewtonSoft.JSON 패키지는 유효한 수집 요청을 다시 작성하여 대상 데이터베이스와 테이블을 식별하고 Blob을 가리킵니다. 관련 Kusto 데이터 관리 서비스가 수신 대기 중인 메시지가 Azure Queue에 게시됩니다.

다음은 고려해야 할 몇 가지 사항입니다.

  • 이 요청은 수집 메시지에 대한 최소값입니다.

참고

ID 토큰은 필수이며 AdditionalProperties JSON 개체의 일부여야 합니다.

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

수집 메시지를 수집 큐에 게시

마지막으로 생성한 메시지를 이전에 가져온 선택한 수집 큐에 게시합니다.

참고

v12 아래의 .Net Storage 클라이언트 버전은 기본적으로 메시지를 base64로 인코딩합니다. 자세한 내용은 스토리지 문서를 참조하세요. v12 위의 .Net 스토리지 클라이언트 버전을 사용하는 경우 메시지 콘텐츠를 제대로 인코딩해야 합니다.

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

Azure 큐에서 오류 메시지 확인

수집 후 데이터 관리 쓰는 관련 큐의 오류 메시지를 검사. 오류 메시지 구조에 대한 자세한 내용은 수집 실패 메시지 구조를 참조하세요.

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

수집 메시지 - JSON 문서 형식

수집 메시지 내부 구조

Kusto 데이터 관리 서비스에서 입력 Azure Queue에서 읽어야 하는 메시지는 다음 형식의 JSON 문서입니다.

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
속성 설명
Id 메시지 식별자(GUID)
BlobPath 읽기/쓰기/삭제할 수 있는 권한을 부여하는 SAS 키를 포함하여 Blob에 대한 경로(URI)입니다. 수집 서비스가 데이터 수집을 완료한 후 Blob을 삭제할 수 있도록 권한이 필요합니다.
RawDataSize 압축되지 않은 데이터의 크기(바이트)입니다. 이 값을 제공하면 수집 서비스가 잠재적으로 여러 Blob을 집계하여 수집을 최적화할 수 있습니다. 이 속성은 선택 사항이지만 지정되지 않은 경우 서비스는 Blob에 액세스하여 크기를 검색합니다.
DatabaseName 대상 데이터베이스 이름
TableName 대상 테이블 이름
RetainBlobOnSuccess true설정하면 수집이 성공적으로 완료되면 Blob이 삭제되지 않습니다. 기본값은 false
FlushImmediately true설정하면 집계는 건너뜁습니다. 기본값은 false
ReportLevel 성공/오류 보고 수준: 0-실패, 1-없음, 2-All
ReportMethod 보고 메커니즘: 0-큐, 1-테이블
AdditionalProperties , 및 tagscreationTime와 같은 format기타 속성입니다. 자세한 내용은 데이터 수집 속성을 참조하세요.

수집 실패 메시지 구조

입력 Azure Queue에서 데이터 관리 읽을 것으로 예상되는 메시지는 다음 형식의 JSON 문서입니다.

속성 Description
OperationId 서비스 쪽에서 작업을 추적하는 데 사용할 수 있는 GUID(작업 식별자)
데이터베이스 대상 데이터베이스 이름
테이블 대상 테이블 이름
FailedOn 실패 타임스탬프
IngestionSourceId 수집에 실패한 데이터 청크를 식별하는 GUID
IngestionSourcePath 수집에 실패한 데이터 청크에 대한 경로(URI)
세부 정보 오류 메시지
오류 코드 오류 코드입니다. 모든 오류 코드는 수집 오류 코드를 참조하세요.
FailureStatus 오류가 영구적인지 일시적인지 여부를 나타냅니다.
RootActivityId 서비스 쪽에서 작업을 추적하는 데 사용할 수 있는 GUID(상관 관계 식별자)
OriginatesFromUpdatePolicy 잘못된 트랜잭션 업데이트 정책으로 인해 오류가 발생했는지 여부를 나타냅니다.
ShouldRetry 를 있는 그대로 다시 시도하면 수집이 성공할 수 있는지 여부를 나타냅니다.