Java를 사용하여 Azure Event Hubs에 이벤트를 보내거나 이벤트를 받습니다.

이 빠른 시작에서는 azure-messaging-eventhubs Java 패키지를 사용하여 이벤트 허브에서 이벤트를 보내고 받는 방법을 보여 줍니다 .

Spring 애플리케이션에서 Azure Event Hubs 리소스를 사용하는 경우 Spring Cloud Azure를 대안으로 고려하는 것이 좋습니다. Spring Cloud Azure는 Azure 서비스와 원활한 Spring 통합을 제공하는 오픈 소스 프로젝트입니다. Spring Cloud Azure에 대해 자세히 알아보고 Event Hubs를 사용하는 예제를 보려면 Azure Event Hubs를 사용한 Spring Cloud Stream을 참조 하세요.

필수 조건

Azure Event Hubs를 새로 사용하는 경우 이 빠른 시작을 하기 전에 Event Hubs 개요를 참조하세요.

이 빠른 시작을 완료하려면 다음 필수 구성 요소가 필요합니다.

  • Microsoft Azure 구독. Azure Event Hubs를 비롯한 Azure 서비스를 사용하려면 구독이 필요합니다. 기존 Azure 계정이 없는 경우 평가판에 가입하거나 계정을 만들 때 MSDN 구독자 혜택을 사용할 수 있습니다.
  • Java 개발 환경. 이 빠른 시작에서는 Eclipse를 사용합니다. 버전 8 이상이 있는 JDK(Java Development Kit)가 필요합니다.
  • Event Hubs 네임스페이스 및 이벤트 허브 만들기 첫 번째 단계에서는 Azure Portal을 사용하여 Event Hubs 형식의 네임스페이스를 만들고 애플리케이션에서 Event Hub와 통신하는 데 필요한 관리 자격 증명을 얻습니다. 네임스페이스 및 이벤트 허브를 만들려면 이 문서의 절차를 따릅니다. 그런 다음, 연결 문자열 가져오기 문서의 지침에 따라 Event Hubs 네임스페이스에 대한 연결 문자열을 가져옵니다. 이 빠른 시작의 뒷부분에서 연결 문자열 사용합니다.

이벤트 보내기

이 섹션에서는 이벤트 허브로 이벤트를 전송하는 Java 애플리케이션을 만드는 방법을 보여줍니다.

Azure Event Hubs 라이브러리에 대한 참조 추가

먼저 즐겨 찾는 Java 개발 환경에서 콘솔/셸 애플리케이션에 대한 새 Maven 프로젝트를 만듭니다. pom.xml 다음과 같이 파일을 업데이트합니다. Event Hubs용 Java 클라이언트 라이브러리는 Maven 중앙 리포지토리에서 사용할 수 있습니다.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

참고 항목

버전을 Maven 리포지토리에 게시된 최신 버전으로 업데이트합니다.

Azure에 앱 인증

이 빠른 시작에서는 Azure Event Hubs에 연결하는 두 가지 방법, 즉 암호 없는 방법과 연결 문자열 보여 줍니다. 첫 번째 옵션은 Microsoft Entra ID 및 RBAC(역할 기반 액세스 제어)에서 보안 주체를 사용하여 Event Hubs 네임스페이스에 연결하는 방법을 보여 줍니다. 코드 또는 구성 파일 또는 Azure Key Vault와 같은 보안 스토리지에 하드 코딩된 연결 문자열 있는 것에 대해 걱정할 필요가 없습니다. 두 번째 옵션은 연결 문자열을 사용하여 Event Hubs 네임스페이스에 연결하는 방법을 보여 줍니다. Azure를 접하는 경우 연결 문자열 옵션을 더 쉽게 따를 수 있습니다. 실제 애플리케이션 및 프로덕션 환경에서 암호 없는 옵션을 사용하는 것이 좋습니다. 자세한 내용은 인증 및 권한 부여를 참조하세요. 개요 페이지에서 암호 없는 인증에 대한 자세한 내용을 확인할 수도 있습니다.

Microsoft Entra 사용자에게 역할 할당

로컬로 개발할 때 Azure Event Hubs에 연결하는 사용자 계정에 올바른 권한이 있는지 확인합니다. 메시지를 보내고 받으려면 Azure Event Hubs 데이터 소유자 역할이 필요합니다. 이 역할을 자신에게 할당하려면 사용자 액세스 관리주체 역할 또는 작업을 포함하는 Microsoft.Authorization/roleAssignments/write 다른 역할이 필요합니다. Azure Portal, Azure CLI 또는 Azure PowerShell을 사용하여 사용자에게 Azure RBAC 역할을 할당할 수 있습니다. 범위 개요 페이지에서 역할 할당에 사용할 수 있는 범위에 대해 자세히 알아봅니다.

다음 예제에서는 Azure Event Hubs Azure Event Hubs Data Owner 리소스에 대한 모든 권한을 제공하는 사용자 계정에 역할을 할당합니다. 실제 시나리오에서는 최소 권한 원칙에 따라 사용자에게 보다 안전한 프로덕션 환경에 필요한 최소 권한만 부여합니다.

Azure Event Hubs에 대한 Azure 기본 제공 역할

Azure Event Hubs의 경우 Azure Portal 및 Azure 리소스 관리 API를 통한 네임스페이스 및 모든 관련 리소스의 관리는 이미 Azure RBAC 모델을 사용하여 보호됩니다. Azure는 Event Hubs 네임스페이스에 대한 액세스 권한을 부여하기 위한 아래 Azure 기본 제공 역할을 제공합니다.

  • Azure Event Hubs 데이터 소유자: Event Hubs 네임스페이스 및 해당 엔터티(큐, 토픽, 구독 및 필터)에 대한 데이터 액세스를 사용하도록 설정합니다.
  • Azure Event Hubs 데이터 발신자: 이 역할을 사용하여 보낸 사람에게 Event Hubs 네임스페이스 및 해당 엔터티에 대한 액세스 권한을 부여합니다.
  • Azure Event Hubs 데이터 수신기: 이 역할을 사용하여 수신기에 Event Hubs 네임스페이스 및 해당 엔터티에 대한 액세스 권한을 부여합니다.

사용자 지정 역할을 만들려면 Event Hubs 작업에 필요한 권한을 참조 하세요.

Important

대부분의 경우 역할 할당이 Azure에서 전파되는 데 1~2분이 걸립니다. 드문 경우지만 최대 8분이 소요될 수 있습니다. 코드를 처음 실행할 때 인증 오류가 발생하면 잠시 기다렸다가 다시 시도하세요.

  1. Azure Portal에서 기본 검색 창 또는 왼쪽 탐색을 사용하여 Event Hubs 네임스페이스를 찾습니다.

  2. 개요 페이지의 왼쪽 메뉴에서 액세스 제어(IAM)를 선택합니다.

  3. 액세스 제어(IAM) 페이지에서 역할 할당 탭을 선택합니다.

  4. 위쪽 메뉴에서 + 추가를 선택한 다음, 드롭다운 메뉴에서 역할 할당 추가를 선택합니다.

    A screenshot showing how to assign a role.

  5. 검색 상자를 사용하여 결과를 원하는 역할로 필터링합니다. 이 예제에서는 일치하는 결과를 검색 Azure Event Hubs Data Owner 하여 선택합니다. 다음을 선택합니다.

  6. 다음에 대한 액세스 할당 아래에서 사용자, 그룹 또는 서비스 주체를 선택한 다음, + 멤버 선택을 선택합니다.

  7. 대화 상자에서 Microsoft Entra 사용자 이름(일반적으로 user@do기본 전자 메일 주소)을 검색한 다음 대화 상자 아래쪽에서 선택을 선택합니다.

  8. 검토 + 할당을 선택하여 최종 페이지로 이동한 다음, 검토 + 할당을 다시 선택하여 프로세스를 완료합니다.

이벤트 허브에 메시지를 전송하는 코드 작성

Sender 클래스를 추가하고 해당 클래스에 다음 코드를 추가합니다.

Important

  • Event Hubs 네임스페이스의 이름으로 업데이트 <NAMESPACE NAME> 합니다.
  • 이벤트 허브의 이름으로 업데이트 <EVENT HUB NAME> 합니다.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

프로그램을 빌드하고 오류가 없는지 확인합니다. 수신기 프로그램을 실행한 후 이 프로그램을 실행합니다.

이벤트 수신

이 자습서의 코드는 전체 작동 중인 애플리케이션을 검사할 수 있는 GitHub의 EventProcessorClient 샘플을 기반으로 합니다.

Azure Blob Storage를 검사점 저장소로 사용할 때 다음 권장 사항을 따릅니다.

  • 각 소비자 그룹에 대해 별도의 컨테이너를 사용합니다. 동일한 스토리지 계정을 사용할 수 있지만 각 그룹당 하나의 컨테이너를 사용합니다.
  • 컨테이너를 다른 용도로 사용하지 말고 스토리지 계정을 다른 용도로 사용하지 마세요.
  • 스토리지 계정은 배포된 애플리케이션이 있는 지역과 동일한 지역에 있어야 합니다. 애플리케이션이 온-프레미스인 경우 가능한 가장 가까운 지역을 선택해 보세요.

Azure Portal에서 Storage 계정 페이지의 Blob service 섹션에서 다음 설정을 사용하지 않도록 설정해야 합니다.

  • 계층 구조 네임스페이스
  • Blob 일시 삭제
  • 버전 관리

Azure Storage 및 BLOB 컨테이너 만들기

이 빠른 시작에서는 Azure Storage(특히 Blob Storage)를 검사포인트 저장소로 사용합니다. 검사점은 이벤트 프로세서가 파티션 내에서 마지막으로 처리된 이벤트의 위치를 표시하거나 커밋하는 프로세스입니다. 검사포인트 표시는 일반적으로 이벤트를 처리하는 함수 내에서 수행됩니다. 검사포인트 지정에 대한 자세한 내용은 이벤트 프로세서를 참조하세요.

다음 단계에 따라 Azure Storage 계정을 만듭니다.

  1. Azure Storage 계정 만들기
  2. Blob 컨테이너 만들기
  3. Blob 컨테이너에 인증

로컬로 개발하는 경우 Blob 데이터에 액세스하는 사용자 계정에 올바른 권한이 있는지 확인합니다. Blob 데이터를 읽고 쓰려면 Storage Blob 데이터 참가자가 필요합니다. 이 역할을 자신에게 할당하려면 사용자 액세스 관리자 역할 또는 Microsoft.Authorization/roleAssignments/write 작업을 포함하는 다른 역할이 필요합니다. Azure Portal, Azure CLI 또는 Azure PowerShell을 사용하여 사용자에게 Azure RBAC 역할을 할당할 수 있습니다. 범위 개요 페이지에서 역할 할당에 사용할 수 있는 범위에 대해 자세히 알아볼 수 있습니다.

이 시나리오에서는 최소 권한 원칙을 따르기 위해 범위가 스토리지 계정으로 지정된 사용자 계정에 권한을 할당합니다. 이 방법은 사용자에게 필요한 최소 권한만 부여하고 더 안전한 프로덕션 환경을 만듭니다.

다음 예제에서는 스토리지 계정의 Blob 데이터에 대한 읽기 및 쓰기 액세스를 모두 제공하는 Storage Blob 데이터 참가자 역할을 사용자 계정에 할당합니다.

Important

대부분의 경우 Azure에서 역할 할당이 전파되는 데 1~2분이 걸리지만 드문 경우이지만 최대 8분이 걸릴 수 있습니다. 코드를 처음 실행할 때 인증 오류가 발생하면 잠시 기다렸다가 다시 시도하세요.

  1. Azure Portal에서 기본 검색 창 또는 왼쪽 탐색 영역을 사용하여 스토리지 계정을 찾습니다.

  2. 스토리지 계정 개요 페이지의 왼쪽 메뉴에서 액세스 제어(IAM)를 선택합니다.

  3. 액세스 제어(IAM) 페이지에서 역할 할당 탭을 선택합니다.

  4. 위쪽 메뉴에서 + 추가를 선택한 다음, 드롭다운 메뉴에서 역할 할당 추가를 선택합니다.

    A screenshot showing how to assign a storage account role.

  5. 검색 상자를 사용하여 결과를 원하는 역할로 필터링합니다. 이 예에서는 Storage Blob 데이터 기여자를 검색하고, 일치하는 결과를 선택하고, 다음을 선택합니다.

  6. 다음에 대한 액세스 할당 아래에서 사용자, 그룹 또는 서비스 주체를 선택한 다음, + 멤버 선택을 선택합니다.

  7. 대화 상자에서 Microsoft Entra 사용자 이름(일반적으로 user@do기본 전자 메일 주소)을 검색한 다음 대화 상자 아래쪽에서 선택을 선택합니다.

  8. 검토 + 할당을 선택하여 최종 페이지로 이동한 다음, 검토 + 할당을 다시 선택하여 프로세스를 완료합니다.

Java 프로젝트에 Event Hubs 라이브러리 추가

pom.xml 파일에 다음 종속성을 추가합니다.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Java 파일의 맨 위에 다음 import 문을 추가합니다.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Receiver라는 클래스를 만들고 다음 문자열 변수를 클래스에 추가합니다. 자리 표시자를 올바른 값으로 바꿉니다.

    Important

    자리 표시자를 올바른 값으로 바꿉니다.

    • <NAMESPACE NAME>을 Event Hubs 네임스페이스 이름으로 바꿉니다.
    • <EVENT HUB NAME> 네임스페이스에 있는 이벤트 허브의 이름으로
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. 다음 main 메서드를 클래스에 추가합니다.

    Important

    자리 표시자를 올바른 값으로 바꿉니다.

    • <STORAGE ACCOUNT NAME> Azure Storage 계정의 이름을 사용합니다.
    • <CONTAINER NAME> 스토리지 계정의 Blob 컨테이너 이름으로
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. 이벤트 및 오류를 처리하는 두 개의 도우미 메서드(PARTITION_PROCESSORERROR_HANDLER)를 Receiver 클래스에 추가합니다.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. 프로그램을 빌드하고 오류가 없는지 확인합니다.

애플리케이션 실행

  1. 수신기 애플리케이션을 먼저 실행합니다 .

  2. 그런 다음, 보낸 사람 애플리케이션을 실행합니다.

  3. 받는 사람 애플리케이션 창에서 보낸 사람 애플리케이션에서 게시한 이벤트가 표시되는지 확인합니다.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. 수신기 애플리케이션 창에서 Enter 키를 눌러 애플리케이션을 중지합니다.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

다음 단계

GitHub에서 다음 샘플을 참조하세요.