Azure Event Hubs (Azure-Messaging-eventhubs) olay göndermek veya olayları almak için Java 'Yı kullanma

Bu hızlı başlangıçta, Azure-Messaging-eventhubs Java paketini kullanarak Olay Hub 'ından olayları gönderme ve olayları alma işlemlerinin nasıl yapılacağı gösterilir.

Önemli

Bu hızlı başlangıç, yeni Azure-Messaging-eventhubs paketini kullanır. Eski Azure-eventhubs ve Azure-eventhubs-EPH paketlerini kullanan bir hızlı başlangıç için bkz. Azure-eventhubs ve Azure-eventhubs-EPH kullanarak olay gönderme ve alma.

Önkoşullar

Azure Event Hubs yeni başladıysanız, bu hızlı başlangıcı uygulamadan önce Event Hubs genel bakış bölümüne bakın.

Bu hızlı başlangıcı tamamlayabilmeniz için aşağıdaki önkoşullara sahip olmanız gerekir:

  • Microsoft Azure aboneliği. Azure Event Hubs dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğiniz olması gerekir. Mevcut bir Azure hesabınız yoksa, ücretsiz deneme için kaydolabilir veya BIR hesap oluştururkenMSDN abonesi avantajlarınızı kullanabilirsiniz.
  • Bir Java geliştirme ortamı. Bu hızlı başlangıç, tutulmakullanır. Java Development Kit (JDK) sürümü 8 veya üzeri gereklidir.
  • Event Hubs bir ad alanı ve bir olay hub 'ı oluşturun. İlk adım, Event Hubs türünde bir ad alanı oluşturmak için Azure Portal ve uygulamanızın Olay Hub 'ı ile iletişim kurması için gereken yönetim kimlik bilgilerini elde etmek için kullanılır. Bir ad alanı ve Olay Hub 'ı oluşturmak için Bu makaledekiyordamı izleyin. Ardından, makalenin yönergelerini izleyerek Event Hubs ad alanı için bağlantı dizesini alın: bağlantı dizesi al. Bağlantı dizesini daha sonra bu hızlı başlangıçta kullanacaksınız.

Olayları gönderme

Bu bölümde, Olay Hub 'ına olay göndermek için bir Java uygulaması oluşturma gösterilmektedir.

Azure Event Hubs kitaplığı 'na başvuru ekleme

Event Hubs için Java istemci kitaplığı, Maven merkezi deposundabulunabilir. Bu kitaplığa, Maven proje dosyanızda aşağıdaki bağımlılık bildirimini kullanarak başvurabilirsiniz:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.7.0</version>
</dependency>

Not

Sürümü, Maven deposunda yayınlanan en son sürüme güncelleştirin.

Olay hub'ına ileti göndermek için kod yazma

Aşağıdaki örnek için önce en sevdiğiniz Java geliştirme ortamında bir konsol/kabuk uygulaması için yeni bir Maven projesi oluşturun. Adlı bir sınıf ekleyin Sender ve aşağıdaki kodu sınıfına ekleyin:

Önemli

<Event Hubs namespace connection string>Event Hubs ad alanınız için bağlantı dizesiyle güncelleştirin. <Event hub name>Ad alanındaki Olay Hub 'ınızın adıyla güncelleştirin.

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

public class Sender {
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";

    public static void main(String[] args) {
        publishEvents();
    }
}

Olayları Olay Hub 'ına yayımlamak için kod ekleme

Sınıfına adlı bir yöntem ekleyin publishEvents Sender :

    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a producer client
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }

Programı oluşturun ve hata olmadığından emin olun. Alıcı programını çalıştırdıktan sonra bu programı çalıştırırsınız.

Olayları alma

bu öğreticideki kod, tüm çalışma uygulamasını görmek için inceleyebileceğiniz GitHub eventprocessorclient örneğinedayalıdır.

Uyarı

bu kodu Azure Stack Hub 'da çalıştırırsanız, belirli bir Depolama apı sürümünü hedefetmediğiniz takdirde çalışma zamanı hatalarıyla karşılaşırsınız. bunun nedeni, Event Hubs SDK 'sının azure 'da kullanılabilen ve Azure Stack Hub platformunda kullanılamayan en son azure Depolama apı 'sini kullanması nedeniyle oluşur. Azure Stack Hub, azure 'da genel kullanıma sunulan farklı azure Blob Depolama SDK sürümünü destekleyebilir. bir denetim noktası deposu olarak azure Blob Depolama kullanıyorsanız, Azure Stack Hub derlemeniz için desteklenen Azure Depolama apı sürümünü denetleyin ve bu sürümü kodunuzda hedefleyin.

örneğin, Azure Stack Hub sürüm 2005 üzerinde çalıştırıyorsanız, Depolama hizmeti için kullanılabilen en yüksek sürüm 2019-02-02 ' dir. Event Hubs SDK istemci kitaplığı, varsayılan olarak Azure 'da kullanılabilen en yüksek sürümü (SDK 'nın sürümü sırasında 2019-07-07) kullanır. bu durumda, bu bölümdeki adımların yanı sıra, Depolama service apı sürüm 2019-02-02 ' i hedeflemek için de kod eklemeniz gerekecektir. belirli bir Depolama apı sürümünün nasıl hedeflenecek hakkında bir örnek için, bu örneğe GitHubbakın.

Azure Depolama ve blob kapsayıcısı oluşturma

bu hızlı başlangıçta, denetim noktası deposu olarak Azure Depolama (özellikle Blob Depolama) kullanacaksınız. Checkişaret, bir olay işlemcisinin bir bölüm içinde son başarılı bir şekilde işlenen etkinliğin konumunu işaretleyen veya işleme yaptığı bir işlemdir. Bir kontrol noktasının işaretlenmesi genellikle olayları işleyen işlev içinde yapılır. Checkişaret hakkında daha fazla bilgi için bkz. olay işlemcisi.

Azure Depolama hesabı oluşturmak için bu adımları izleyin.

  1. Azure Depolama hesabı oluşturma

  2. Blob kapsayıcısı oluşturma

  3. Bağlantı dizesini depolama hesabına al

    Bağlantı dizesini ve kapsayıcı adını aklınızda edin. Bunları alma kodunda kullanacaksınız.

Java projenize Event Hubs kitaplıkları ekleyin

Aşağıdaki bağımlılıkları pom.xml dosyasına ekleyin.

<dependencies>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.7.0</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
        <version>1.6.0</version>
    </dependency>
</dependencies>
  1. Aşağıdaki Import deyimlerini Java dosyasının en üstüne ekleyin.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
  2. Adlı bir sınıf oluşturun Receiver ve aşağıdaki dize değişkenlerini sınıfına ekleyin. Yer tutucuları doğru değerlerle değiştirin.

    Önemli

    Yer tutucuları doğru değerlerle değiştirin.

    • <Event Hubs namespace connection string> Event Hubs ad alanına bağlantı dizesi ile. Güncelleştir
    • <Event hub name> ad alanındaki Olay Hub 'ınızın adı ile.
    • <Storage connection string> Azure depolama hesabınıza bağlantı dizesi ile.
    • <Storage container name> Azure Blob depolama alanındaki kapsayıcının adı ile.
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";
    private static final String storageConnectionString = "<Storage connection string>";
    private static final String storageContainerName = "<Storage container name>";
    
  3. mainSınıfına aşağıdaki yöntemi ekleyin.

    public static void main(String[] args) throws Exception {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(storageConnectionString)
            .containerName(storageContainerName)
            .buildAsyncClient();
    
        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(connectionString, eventHubName)
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR)
            .processError(ERROR_HANDLER)
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
    
        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
        System.out.println("Starting event processor");
        eventProcessorClient.start();
    
        System.out.println("Press enter to stop.");
        System.in.read();
    
        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");
    
        System.out.println("Exiting process");
    }
    
  4. PARTITION_PROCESSOR ERROR_HANDLER Olayları ve hataları sınıfına işleyen iki yardımcı yöntemi (ve) ekleyin Receiver .

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  5. Programı oluşturun ve hata olmadığından emin olun.

Uygulamaları çalıştırma

  1. Önce alıcı uygulamasını çalıştırın.

  2. Ardından, Gönderen uygulamasını çalıştırın.

  3. Alıcı uygulaması penceresinde, gönderen uygulama tarafından yayınlanan olayları görtığınızdan emin olun.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Uygulamayı durdurmak için alıcı uygulaması penceresinde ENTER tuşuna basın.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Sonraki adımlar

GitHub aşağıdaki örneklere bakın: