.NET용 Azure Event Hubs 이벤트 프로세서 클라이언트 라이브러리 - 버전 5.8.1

Azure Event Hubs 초당 수백만 개의 이벤트를 수집하고 여러 소비자에게 스트리밍할 수 있는 확장성이 뛰어난 게시 구독 서비스입니다. 이렇게 하면 연결된 디바이스 및 애플리케이션에서 생성되는 대량의 데이터를 처리하고 분석할 수 있습니다. Event Hubs가 데이터를 수집하면 실시간 분석 공급자 또는 일괄 처리/스토리지 어댑터를 사용하여 데이터를 검색, 변환 및 저장할 수 있습니다. Azure Event Hubs 대해 자세히 알아보려면 Event Hubs란?을 검토할 수 있습니다.

이벤트 프로세서 클라이언트 라이브러리는 Azure Event Hubs 클라이언트 라이브러리와 함께 제공되므로 대부분의 프로덕션 시나리오에 적합한 강력하고 내구성이 뛰어나고 확장 가능한 방식으로 이벤트를 소비하기 위한 독립 실행형 클라이언트를 제공합니다. Azure Storage Blob을 사용하여 빌드된 의견 구현인 이벤트 프로세서는 다음을 위해 권장됩니다.

  • 일시적인 오류 및 일시적인 네트워크 문제에 대한 복원력을 사용하여 대규모 이벤트 허브의 모든 파티션에서 이벤트를 읽고 처리합니다.

  • 여러 프로세서가 소비자 그룹의 컨텍스트에서 책임을 동적으로 배포하고 공유하는 이벤트를 협조적으로 처리하여 프로세서가 그룹에서 추가 및 제거될 때 로드를 정상적으로 관리합니다.

  • Azure Storage Blob을 기본 데이터 저장소로 사용하여 지속형 방식으로 처리하기 위한 검사점 및 상태 관리

소스 코드 | 패키지(NuGet) | API 참조 설명서 | 제품 설명서 | 문제 해결 가이드

시작

필수 조건

  • Azure 구독: Azure Event Hubs 포함한 Azure 서비스를 사용하려면 구독이 필요합니다. 기존 Azure 계정이 없는 경우 평가판에 등록하거나 계정을 만들Visual Studio 구독 혜택을 사용할 수 있습니다.

  • Event Hubs 네임스페이스와 이벤트 허브: Azure Event Hubs 상호 작용하려면 네임스페이스 및 이벤트 허브도 사용할 수 있어야 합니다. Azure 리소스를 만드는 데 익숙하지 않은 경우 Azure Portal 사용하여 이벤트 허브를 만들기 위한 단계별 가이드를 따를 수 있습니다. 또한 Azure CLI, Azure PowerShell 또는 ARM(Azure Resource Manager) 템플릿을 사용하여 이벤트 허브를 만들기 위한 자세한 지침을 찾을 수 있습니다.

  • Blob Storage가 있는 Azure Storage 계정: Azure Storage에서 검사점 및 소유권을 유지하려면 Blob을 사용할 수 있는 Azure Storage 계정이 있어야 합니다. Azure Storage 계정에 익숙하지 않은 경우 Azure Portal 사용하여 스토리지 계정을 만들기 위한 단계별 가이드를 따를 수 있습니다. Azure CLI, Azure PowerShell 또는 ARM(Azure Resource Manager) 템플릿을 사용하여 스토리지 계정을 만드는 방법에 대한 자세한 지침을 찾을 수도 있습니다.

  • Azure Storage Blob 컨테이너: Azure Storage의 검사점 및 소유권 데이터는 특정 컨테이너의 Blob에 기록됩니다. EventProcessorClient 에는 기존 컨테이너가 필요하며 실수로 잘못된 구성을 방지하기 위해 컨테이너를 암시적으로 만들지 않습니다. Azure Storage 컨테이너에 익숙하지 않은 경우 컨테이너 관리에 대한 설명서를 참조할 수 있습니다. .NET, Azure CLI 또는 Azure PowerShell 사용하여 컨테이너를 만들기 위한 자세한 지침을 찾을 수 있습니다.

  • C# 8.0: Azure Event Hubs 클라이언트 라이브러리는 C# 8.0에서 도입된 새로운 기능을 사용합니다. C# 8.0 구문을 활용하려면 언어 버전latest이 인 .NET Core SDK 3.0 이상을 사용하여 컴파일하는 것이 좋습니다.

    C# 8.0 구문을 최대한 활용하려는 Visual Studio 사용자는 Visual Studio 2019 이상을 사용해야 합니다. 체험 Community 버전을 비롯한 Visual Studio 2019는 여기서 다운로드할 수 있습니다. Visual Studio 2017 사용자는 Microsoft.Net.Compilers NuGet 패키지를 사용하고 언어 버전을 설정하여 C# 8 구문을 활용할 수 있지만 편집 환경은 이상적이지 않을 수 있습니다.

    이전 C# 언어 버전에서 라이브러리를 계속 사용할 수 있지만 새 구문의 이점을 활용하지 않고 비동기 열거 가능 및 비동기 일회용 멤버를 수동으로 관리해야 합니다. 이전 버전의 .NET Core 또는 .NET Framework를 포함하여 .NET Core SDK에서 지원하는 모든 프레임워크 버전을 대상으로 지정할 수 있습니다. 자세한 내용은 대상 프레임워크를 지정하는 방법을 참조하세요.

    중요 참고 사항:예제샘플을 수정하지 않고 빌드하거나 실행하려면 C# 11.0을 사용해야 합니다. 다른 언어 버전에 맞게 조정하려는 경우에도 샘플을 실행할 수 있습니다.

Azure에서 필요한 리소스를 빠르게 만들고 해당 리소스에 대한 연결 문자열을 받으려면 다음을 클릭하여 샘플 템플릿을 배포할 수 있습니다.

Azure에 배포

패키지 설치

NuGet을 사용하여 .NET용 Azure Event Hubs 이벤트 프로세서 클라이언트 라이브러리를 설치합니다.

dotnet add package Azure.Messaging.EventHubs.Processor

클라이언트 인증

Event Hubs 연결 문자열 가져오기

Event Hubs 클라이언트 라이브러리가 이벤트 허브와 상호 작용하려면 Event Hubs에 연결하고 권한을 부여하는 방법을 이해해야 합니다. 이렇게 하는 가장 쉬운 방법은 Event Hubs 네임스페이스를 만들 때 자동으로 만들어지는 연결 문자열을 사용하는 것입니다. Event Hubs에서 연결 문자열을 사용하는 데 익숙하지 않은 경우 단계별 가이드에 따라 Event Hubs 연결 문자열을 가져올 수 있습니다.

Azure Storage 연결 문자열 가져오기

이벤트 프로세서 클라이언트가 검사점 지정에 Azure Storage Blob을 사용하려면 스토리지 계정에 연결하고 권한을 부여하는 방법을 이해해야 합니다. 이렇게 하는 가장 간단한 방법은 스토리지 계정을 만들 때 생성되는 연결 문자열을 사용하는 것입니다. Azure에서 스토리지 계정 연결 문자열 권한 부여에 익숙하지 않은 경우 단계별 가이드에 따라 Azure Storage 연결 문자열을 구성할 수 있습니다.

주요 개념

  • 이벤트 프로세서는 특정 소비자 그룹의 컨텍스트에서 지정된 이벤트 허브에 연결하고 각 파티션의 이벤트를 처리하는 것과 관련된 책임을 관리하기 위한 구문입니다. 파티션에서 읽은 이벤트를 처리하고 발생하는 오류를 처리하는 작업은 이벤트 프로세서가 제공하는 코드에 위임되므로 프로세서가 이벤트 읽기, 파티션 관리 및 상태 유지와 관련된 작업을 처리하는 동안 논리가 비즈니스 가치를 제공하는 데 집중할 수 있습니다.

  • 검사점 지정 은 판독기에서 파티션에 대해 처리된 이벤트에 대한 위치를 표시하고 유지하는 프로세스입니다. 검사점 지정은 소비자의 책임이며 일반적으로 특정 소비자 그룹의 컨텍스트에서 파티션당 발생합니다. 의 EventProcessorClient경우 소비자 그룹 및 파티션 조합의 경우 프로세서는 이벤트 스트림에서 현재 위치를 추적해야 합니다. 자세한 내용은 Event Hubs 제품 설명서의 검사점 지정 을 참조하세요.

    이벤트 프로세서가 연결되면 해당 소비자 그룹에 있는 해당 파티션의 마지막 프로세서(있는 경우)가 이전에 유지했던 검사점에서 이벤트를 읽기 시작합니다. 이벤트 프로세서는 파티션의 이벤트를 읽고 작동하므로, 이벤트를 다운스트림 애플리케이션에서 "완료"로 표시하고 이벤트 프로세서 또는 이벤트를 호스트하는 환경이 실패할 경우 복원력을 제공하는 검사점이 주기적으로 생성되어야 합니다. 필요한 경우 이 검사점 프로세스를 통해 이전 오프셋을 지정하여 이전에 "완료"로 표시된 이벤트를 다시 처리할 수 있습니다.

  • 파티션은 이벤트 허브에 보관되는 순서가 지정된 이벤트 시퀀스입니다. 파티션은 이벤트 소비자에게 필요한 병렬 처리와 관련된 데이터 조직의 수단입니다. Azure Event Hubs는 각 소비자가 메시지 스트림의 특정 하위 집합 또는 파티션만 읽는 분할된 소비자 패턴을 통해 메시지 스트리밍을 제공합니다. 최신 이벤트가 도착하면 이 시퀀스의 끝에 추가됩니다. 파티션 수는 이벤트 허브를 만들 때 지정되며 변경할 수 없습니다.

  • 소비자 그룹은 전체 이벤트 허브의 보기입니다. 소비자 그룹을 통해 데이터를 사용하는 여러 애플리케이션이 각각 개별 이벤트 스트림 보기를 사용할 수 있으며 고유한 위치에서 고유한 속도로 독립적으로 스트림을 읽을 수 있습니다. 소비자 그룹당 파티션에는 최대 5개의 동시 reader가 있을 수 있습니다. 그러나 지정된 파티션 및 소비자 그룹 페어링에 대해 활성 소비자가 하나만 있는 것이 좋습니다. 각 활성 reader는 해당 파티션의 모든 이벤트를 수신합니다. 동일한 파티션에 여러 reader가 있는 경우 중복된 이벤트를 수신합니다.

더 많은 개념과 심층적인 논의는 Event Hubs 기능을 참조하세요.

클라이언트 수명

EventProcessorClient 은 애플리케이션의 수명 동안 싱글톤으로 캐시하고 사용하는 것이 안전하며, 이벤트를 정기적으로 읽는 것이 가장 좋습니다. 클라이언트는 네트워크, CPU 및 메모리 사용을 효율적으로 관리하여 비활성 기간 동안 사용량을 낮게 유지하는 작업을 담당합니다. 네트워크 리소스 및 기타 관리되지 않는 개체가 제대로 정리되도록 하려면 프로세서에서 또는 StopProcessing 를 호출 StopProcessingAsync 해야 합니다.

스레드로부터의 안전성

모든 클라이언트 instance 메서드가 스레드로부터 안전하고 서로 독립적임을 보장합니다(지침). 이렇게 하면 스레드 간에도 클라이언트 인스턴스를 다시 사용하는 것이 항상 안전합니다.

EventDataBatch 와 같은 EventData 데이터 모델 형식은 스레드로부터 안전하지 않습니다. 스레드 간에 공유하거나 클라이언트 메서드와 동시에 사용해서는 안 됩니다.

추가 개념

클라이언트 옵션 | 이벤트 처리기 | 오류 | 처리 진단 | 모의(프로세서) | 모의(클라이언트 형식)

예제

이벤트 프로세서 클라이언트 만들기

EventProcessorClient는 상태의 지속성을 위해 Azure Storage Blob에 의존하기 때문에 사용해야 하는 스토리지 계정 및 컨테이너에 대해 구성된 프로세서에 대한 BlobContainerClient를 제공해야 합니다. 를 구성하는 데 사용되는 컨테이너가 EventProcessorClient 있어야 합니다.

EventProcessorClient 존재하지 않는 컨테이너를 지정하려는 의도를 알 수 없으므로 컨테이너를 암시적으로 만들지 않습니다. 이는 잘못 구성된 컨테이너에 대한 보호 역할을 하여 불량 프로세서가 소유권을 공유할 수 없고 소비자 그룹의 다른 프로세서를 방해합니다.

// The container specified when creating the BlobContainerClient must exist; it will
// not be implicitly created.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

이벤트 및 오류 처리기 구성

를 사용 EventProcessorClient하려면 이벤트 처리 및 오류에 대한 처리기를 제공해야 합니다. 이러한 처리기는 자체 포함으로 간주되며 개발자는 처리기 코드 내의 예외가 고려되도록 할 책임이 있습니다.

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheEvent(eventArgs.Partition, eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error.  This method
        // is intended for illustration and is not defined in this snippet.

        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

처리 시작 및 중지

EventProcessorClient 명시적으로 시작된 후 백그라운드에서 처리를 수행하고 명시적으로 중지될 때까지 계속 처리합니다. 이렇게 하면 애플리케이션 코드가 다른 작업을 수행할 수 있지만 다른 작업이 수행되지 않는 경우 처리 중에 프로세스가 종료되지 않도록 해야 합니다.

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask;
Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask;

var storageClient = new BlobContainerClient(storageConnectionString, blobContainerName);
var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName);

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

await processor.StartProcessingAsync();

try
{
    // The processor performs its work in the background; block until cancellation
    // to allow processing to take place.

    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
}
catch (TaskCanceledException)
{
    // This is expected when the delay is canceled.
}

try
{
    await processor.StopProcessingAsync();
}
finally
{
    // To prevent leaks, the handlers should be removed when processing is complete.

    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

이벤트 프로세서 클라이언트에서 Active Directory 보안 주체 사용

Azure ID 라이브러리는 Event Hubs 및 Azure Storage를 포함하여 Azure 클라이언트 라이브러리에 사용할 수 있는 Azure Active Directory 인증 지원을 제공합니다.

Active Directory 보안 주체를 사용하기 위해 Event Hubs 클라이언트를 Azure.Identity 만들 때 라이브러리에서 사용 가능한 자격 증명 중 하나가 지정됩니다. 또한 정규화된 Event Hubs 네임스페이스와 원하는 Event Hub의 이름은 Event Hubs 연결 문자열 대신 제공됩니다.

Azure Storage Blob 컨테이너에서 Active Directory 보안 주체를 사용하려면 스토리지 클라이언트를 만들 때 컨테이너에 대한 정규화된 URL을 제공해야 합니다. Blob Storage에 액세스하기 위한 유효한 URI 형식에 대한 자세한 내용은 컨테이너, Blob 및 메타데이터 명명 및 참조에서 찾을 수 있습니다.

var credential = new DefaultAzureCredential();
var blobStorageUrl ="<< FULLY-QUALIFIED CONTAINER URL (like https://myaccount.blob.core.windows.net/mycontainer) >>";

var fullyQualifiedNamespace = "<< FULLY-QUALIFIED EVENT HUBS NAMESPACE (like something.servicebus.windows.net) >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(new Uri(blobStorageUrl), credential);

var processor = new EventProcessorClient
(
    storageClient,
    consumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    credential
);

Event Hubs에서 Azure Active Directory를 사용하는 경우 주체에 역할과 같은 Azure Event Hubs Data Receiver Event Hubs에서 읽을 수 있는 역할이 할당되어야 합니다. Event Hubs에서 Azure Active Directory 권한 부여를 사용하는 방법에 대한 자세한 내용은 관련 설명서를 참조하세요.

Azure Storage에서 Azure Active Directory를 사용하는 경우 주체에 역할과 같은 Storage Blob Data Contributor Blob에 대한 읽기, 쓰기 및 삭제 액세스를 허용하는 역할이 할당되어야 합니다. Azure Storage에서 Active Directory 권한 부여를 사용하는 방법에 대한 자세한 내용은 연결된 설명서Azure Storage 권한 부여 샘플을 참조하세요.

문제 해결

자세한 문제 해결 정보는 Event Hubs 문제 해결 가이드를 참조하세요.

예외 처리

이벤트 프로세서 클라이언트 예외

이벤트 프로세서 클라이언트는 예외에 직면하여 복원력을 위해 모든 시도를 수행하며 불가능하지 않는 한 처리를 계속하는 데 필요한 조치를 취합니다. 이 작업을 수행하려면 개발자의 조치가 필요하지 않습니다 . 기본적으로 프로세서 동작의 일부입니다.

개발자가 이벤트 프로세서 클라이언트 작업 내에서 발생하는 예외를 검사하고 대응할 수 있도록 이벤트를 통해 ProcessError 표시됩니다. 이 이벤트의 인수는 예외 및 예외가 관찰된 컨텍스트에 대한 세부 정보를 제공합니다. 개발자는 이 이벤트 처리기 내에서 이벤트 프로세서 클라이언트에서 오류에 대한 응답으로 중지 및/또는 다시 시작과 같은 정상적인 작업을 수행할 수 있지만, 그렇지 않으면 프로세서의 예외 동작에 영향을 미치지 않을 수 있습니다.

오류 처리기를 구현하는 기본 예제는 샘플: 이벤트 프로세서 처리기를 참조하세요.

이벤트 처리기의 예외

이벤트 프로세서 클라이언트는 개발자가 제공하는 이벤트 처리기 내에서 예외의 심각도를 이해할 수 있는 적절한 컨텍스트가 없기 때문에 어떤 작업이 적절한 응답인지 가정할 수 없습니다. 결과적으로 개발자는 블록 및 기타 표준 언어 구문을 사용하여 try/catch 제공하는 이벤트 처리기 내에서 발생하는 예외에 대한 책임이 있는 것으로 간주됩니다.

이벤트 프로세서 클라이언트는 개발자 코드에서 예외를 검색하거나 명시적으로 표시하지 않습니다. 결과 동작은 프로세서의 호스팅 환경 및 이벤트 처리기가 호출된 컨텍스트에 따라 달라집니다. 시나리오마다 다를 수 있으므로 개발자는 이벤트 처리기를 방어적으로 코딩하고 잠재적인 예외를 처리하는 것이 좋습니다.

로깅 및 진단

이벤트 프로세서 클라이언트 라이브러리는 .NET EventSource 을 사용하여 정보를 내보내는 다양한 수준의 세부 정보로 정보를 로깅하기 위해 완전히 계측됩니다. 로깅은 각 작업에 대해 수행되며 작업의 시작점, 완료 및 발생한 예외를 표시하는 패턴을 따릅니다. 인사이트를 제공할 수 있는 추가 정보는 연결된 작업의 컨텍스트에도 기록됩니다.

이벤트 프로세서 클라이언트 로그는 "Azure-Messaging-EventHubs-Processor-EventProcessorClient"라는 원본에 옵트인하거나 특성이 "AzureEventSource"인 모든 원본을 옵트인하여 사용할 수 EventListener 있습니다. Azure 클라이언트 라이브러리에서 로그를 더 Azure.Core 쉽게 캡처할 수 있도록 Event Hubs에서 사용하는 라이브러리는 을 AzureEventSourceListener제공합니다. 자세한 내용은 AzureEventSourceListener를 사용하여 Event Hubs 로그 캡처에서 찾을 수 있습니다.

또한 이벤트 프로세서 라이브러리는 Application Insights 또는 OpenTelemetry를 사용하여 분산 추적을 위해 계측됩니다. 자세한 내용은 Azure.Core 진단 샘플에서 찾을 수 있습니다.

다음 단계

설명된 시나리오 외에도 Azure Event Hubs 프로세서 라이브러리는 의 전체 기능 집합EventProcessorClient을 활용하는 데 도움이 되는 추가 시나리오를 지원합니다. 이러한 시나리오 중 일부를 탐색하는 데 도움이 되도록 Event Hubs 프로세서 클라이언트 라이브러리는 일반적인 시나리오에 대한 그림으로 사용할 샘플 프로젝트를 제공합니다. 자세한 내용은 샘플 추가 정보를 참조하세요.

참여

이 프로젝트에 대한 기여와 제안을 환영합니다. 대부분의 경우 기여하려면 권한을 부여하며 실제로 기여를 사용할 권한을 당사에 부여한다고 선언하는 CLA(기여자 라이선스 계약)에 동의해야 합니다. 자세한 내용은 https://cla.microsoft.com 을 참조하세요.

끌어오기 요청을 제출하면 CLA-bot은 CLA를 제공하고 PR을 적절하게 데코레이팅해야 하는지 여부를 자동으로 결정합니다(예: 레이블, 설명). 봇에서 제공하는 지침을 따르기만 하면 됩니다. 이 작업은 CLA를 사용하여 모든 리포지토리에서 한 번만 수행하면 됩니다.

이 프로젝트에는 Microsoft Open Source Code of Conduct(Microsoft 오픈 소스 준수 사항)가 적용됩니다. 자세한 내용은 Code of Conduct FAQ(규정 FAQ)를 참조하세요. 또는 추가 질문이나 의견은 opencode@microsoft.com으로 문의하세요.

자세한 내용은 기여 가이드를 참조하세요.

Impressions