Java kullanarak Azure Event Hubs’daki olayları gönderme veya alma

Bu hızlı başlangıçta azure-messaging-eventhubs Java paketini kullanarak olay hub'ına olay gönderme ve olay hub'ından olay alma işlemleri gösterilmektedir .

İpucu

Bir Spring uygulamasında Azure Event Hubs kaynaklarıyla çalışıyorsanız Spring Cloud Azure'ı alternatif olarak değerlendirmenizi öneririz. Spring Cloud Azure, Azure hizmetleriyle sorunsuz Spring tümleştirmesi sağlayan açık kaynak bir projedir. Spring Cloud Azure hakkında daha fazla bilgi edinmek ve Event Hubs kullanarak bir örnek görmek için bkz . Azure Event Hubs ile Spring Cloud Stream.

Önkoşullar

Azure Event Hubs'da yeniyseniz bu hızlı başlangıcı gerçekleştirmeden önce event hubs'a genel bakış konusuna bakın.

Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara ihtiyacınız vardır:

  • Microsoft Azure aboneliği. Azure Event Hubs da dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğe ihtiyacınız vardır. Mevcut bir Azure hesabınız yoksa ücretsiz deneme sürümüne kaydolabilir veya hesap oluştururken MSDN abone avantajlarınızı kullanabilirsiniz.
  • Java geliştirme ortamı. Bu hızlı başlangıçta Eclipse kullanılır. Sürüm 8 veya üzeri olan Java Development Kit (JDK) gereklidir.
  • Event Hubs ad alanı ve olay hub'ı oluşturun. İlk adımda Azure portalını kullanarak Event Hubs türünde bir ad alanı oluşturun, ardından uygulamanızın olay hub’ı ile iletişim kurması için gereken yönetim kimlik bilgilerini edinin. Ad alanı ve olay hub'ı oluşturmak için bu makaledeki yordamı izleyin. Ardından, şu makaledeki yönergeleri izleyerek Event Hubs ad alanının bağlantı dizesi alın: bağlantı dizesi alma. Bu hızlı başlangıcın ilerleyen bölümlerinde bağlantı dizesi kullanacaksınız.

Olayları gönderme

Bu bölümde, olayları bir olay hub'ına göndermek için nasıl Java uygulaması oluşturulacağı gösterilmektedir.

Azure Event Hubs kitaplığına başvuru ekleme

İlk olarak, sık kullandığınız Java geliştirme ortamında bir konsol/kabuk uygulaması için yeni bir Maven projesi oluşturun. Dosyayı aşağıdaki gibi güncelleştirin pom.xml . Event Hubs için Java istemci kitaplığı Maven Central Repository'de kullanılabilir.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Not

Sürümü Maven deposunda yayımlanan en son sürüme güncelleştirin.

Azure'da uygulamanın kimliğini doğrulama

Bu hızlı başlangıçta Azure Event Hubs'a bağlanmanın iki yolu gösterilmektedir: parolasız ve bağlantı dizesi. İlk seçenek, Bir Event Hubs ad alanına bağlanmak için Microsoft Entra Id ve rol tabanlı erişim denetiminde (RBAC) güvenlik sorumlunuzu nasıl kullanacağınızı gösterir. Kodunuzda, yapılandırma dosyasında veya Azure Key Vault gibi güvenli bir depolama alanında sabit kodlanmış bağlantı dizesi olması konusunda endişelenmeniz gerekmez. İkinci seçenek, event hubs ad alanına bağlanmak için bir bağlantı dizesi nasıl kullanacağınızı gösterir. Azure'da yeniyseniz bağlantı dizesi seçeneğini daha kolay takip edebilirsiniz. Gerçek dünyadaki uygulamalarda ve üretim ortamlarında parolasız seçeneği kullanmanızı öneririz. Daha fazla bilgi için bkz . Kimlik doğrulaması ve yetkilendirme. Ayrıca, genel bakış sayfasında parolasız kimlik doğrulaması hakkında daha fazla bilgi edinebilirsiniz.

Microsoft Entra kullanıcınıza rol atama

Yerel olarak geliştirme yaparken, Azure Event Hubs'a bağlanan kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. İleti gönderip almak için Azure Event Hubs Veri Sahibi rolüne ihtiyacınız vardır. Kendinize bu rolü atamak için Kullanıcı Erişimi Yönetici istrator rolüne veya eylemi içeren Microsoft.Authorization/roleAssignments/write başka bir role ihtiyacınız olacaktır. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Kapsam genel bakış sayfasında rol atamaları için kullanılabilir kapsamlar hakkında daha fazla bilgi edinin.

Aşağıdaki örnekte rol, Azure Event Hubs kaynaklarına tam erişim sağlayan kullanıcı hesabınıza atanır Azure Event Hubs Data Owner . Gerçek bir senaryoda, kullanıcılara yalnızca daha güvenli bir üretim ortamı için gereken minimum izinleri vermek için En Az Ayrıcalık İlkesi'ni izleyin.

Azure Event Hubs için Azure yerleşik rolleri

Azure Event Hubs için, Azure portalı ve Azure kaynak yönetimi API'sini kullanarak ad alanlarının ve tüm ilgili kaynakların yönetimi Azure RBAC modeli kullanılarak zaten korunur. Azure, Event Hubs ad alanına erişim yetkisi vermek için aşağıdaki Azure yerleşik rollerini sağlar:

Özel bir rol oluşturmak istiyorsanız bkz . Event Hubs işlemleri için gereken haklar.

Önemli

Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer. Nadir durumlarda, sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.

  1. Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak Event Hubs ad alanınızı bulun.

  2. Genel bakış sayfasında, sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.

  3. Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.

  4. Üst menüden + Ekle'yi seçin ve ardından açılan menüden Rol ataması ekle'yi seçin.

    A screenshot showing how to assign a role.

  5. Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için eşleşen sonucu arayın Azure Event Hubs Data Owner ve seçin. Ardından İleri'yi seçin.

  6. Erişim ata'nın altında Kullanıcı, grup veya hizmet sorumlusu'na tıklayın ve ardından + Üye seç'e tıklayın.

  7. İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın ve iletişim kutusunun alt kısmındaki Seç'i seçin.

  8. Son sayfaya gitmek için Gözden geçir + ata'yı seçin ve ardından işlemi tamamlamak için Gözden geçir + yeniden ata'yı seçin.

Olay hub'ına ileti göndermek için kod yazma

adlı Senderbir sınıf ekleyin ve sınıfına aşağıdaki kodu ekleyin:

Önemli

  • Event Hubs ad alanınızın adıyla güncelleştirin <NAMESPACE NAME> .
  • Olay hub'ınızın adıyla güncelleştirin <EVENT HUB NAME> .
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Programı oluşturun ve hata olmadığından emin olun. Alıcı programını çalıştırdıktan sonra bu programı çalıştıracaksınız.

Olayları alma

Bu öğreticideki kod, GitHub'daki EventProcessorClient örneğini temel alır ve tam çalışan uygulamayı görmek için inceleyebilirsiniz.

Azure Blob Depolama denetim noktası deposu olarak kullanırken şu önerileri izleyin:

  • Her tüketici grubu için ayrı bir kapsayıcı kullanın. Aynı depolama hesabını kullanabilirsiniz, ancak her grup için bir kapsayıcı kullanabilirsiniz.
  • Kapsayıcıyı başka hiçbir şey için kullanmayın ve depolama hesabını başka hiçbir şey için kullanmayın.
  • Depolama hesabın dağıtılan uygulamanın bulunduğu bölgede olması gerekir. Uygulama şirket içindeyse, mümkün olan en yakın bölgeyi seçmeyi deneyin.

Azure portalındaki Depolama hesabı sayfasında, Blob hizmeti bölümünde aşağıdaki ayarların devre dışı bırakıldığından emin olun.

  • Hiyerarşik ad alanı
  • Blob geçici silme
  • Sürüm oluşturma

Azure Depolama ve blob kapsayıcısı oluşturma

Bu hızlı başlangıçta denetim noktası deposu olarak Azure Depolama (özellikle Blob Depolama) kullanacaksınız. Denetim noktası oluşturma, bir olay işlemcisinin bir bölümde başarıyla işlenen son olayın konumunu işaretlediği veya işlediği bir işlemdir. Bir denetim noktasını işaretleme işlemi genellikle olayları işleyen işlev içinde yapılır. Denetim noktası oluşturma hakkında daha fazla bilgi edinmek için bkz . Olay işlemcisi.

Azure Depolama hesabı oluşturmak için bu adımları izleyin.

  1. Azure Depolama hesabı oluşturma
  2. Blob kapsayıcısı oluşturma
  3. Blob kapsayıcısı için kimlik doğrulaması

Yerel olarak geliştirme yaparken blob verilerine erişen kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. Blob verilerini okumak ve yazmak için Depolama Blob Veri Katkıda Bulunanı gerekir. Kendinize bu rolü atamak için Kullanıcı Erişimi Yönetici istrator rolüne veya Microsoft.Authorization/roleAssignments/write eylemini içeren başka bir role atanmalısınız. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Rol atamaları için kullanılabilir kapsamlar hakkında daha fazla bilgiyi kapsam genel bakış sayfasından öğrenebilirsiniz.

Bu senaryoda, En Az Ayrıcalık İlkesi'ni izlemek için depolama hesabı kapsamındaki kullanıcı hesabınıza izinler atayacaksınız. Bu uygulama kullanıcılara yalnızca gereken minimum izinleri verir ve daha güvenli üretim ortamları oluşturur.

Aşağıdaki örnek, depolama hesabınızdaki blob verilerine hem okuma hem de yazma erişimi sağlayan Depolama Blob Verileri Katkıda Bulunanı rolünü kullanıcı hesabınıza atar.

Önemli

Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer, ancak nadir durumlarda sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.

  1. Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak depolama hesabınızı bulun.

  2. Depolama hesabına genel bakış sayfasında sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.

  3. Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.

  4. Üst menüden + Ekle'yi seçin ve ardından açılan menüden Rol ataması ekle'yi seçin.

    A screenshot showing how to assign a storage account role.

  5. Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için Depolama Blob Veri Katkıda Bulunanı'nı arayın, eşleşen sonucu seçin ve ardından İleri'yi seçin.

  6. Erişim ata'nın altında Kullanıcı, grup veya hizmet sorumlusu'na tıklayın ve ardından + Üye seç'e tıklayın.

  7. İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın ve iletişim kutusunun alt kısmındaki Seç'i seçin.

  8. Son sayfaya gitmek için Gözden geçir + ata'yı seçin ve ardından işlemi tamamlamak için Gözden geçir + yeniden ata'yı seçin.

Java projenize Event Hubs kitaplıkları ekleme

pom.xml dosyasına aşağıdaki bağımlılıkları ekleyin.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Java dosyasının en üstüne aşağıdaki import deyimleri ekleyin.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. adlı Receiverbir sınıf oluşturun ve sınıfına aşağıdaki dize değişkenlerini ekleyin. Yer tutucuları doğru değerlerle değiştirin.

    Önemli

    Yer tutucuları doğru değerlerle değiştirin.

    • <NAMESPACE NAME> yerine Event Hubs ad alanını yazın.
    • <EVENT HUB NAME> ad alanında olay hub'ınızın adını yazın.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Sınıfına aşağıdaki main yöntemi ekleyin.

    Önemli

    Yer tutucuları doğru değerlerle değiştirin.

    • <STORAGE ACCOUNT NAME>azure Depolama hesabınızın adıyla birlikte.
    • <CONTAINER NAME> depolama hesabındaki blob kapsayıcısının adıyla
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Olayları ve hataları işleyen iki yardımcı yöntemi (PARTITION_PROCESSOR ve ERROR_HANDLER) sınıfına Receiver ekleyin.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Programı oluşturun ve hata olmadığından emin olun.

Uygulamaları çalıştırma

  1. Önce Alıcı uygulamasını çalıştırın.

  2. Ardından Sender uygulamasını çalıştırın.

  3. Alıcı uygulaması penceresinde, Gönderen uygulaması tarafından yayımlanan olayları gördüğünüzden emin olun.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Uygulamayı durdurmak için alıcı uygulama penceresinde ENTER tuşuna basın.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Sonraki adımlar

GitHub'da aşağıdaki örneklere bakın: