Použití Javy k odesílání nebo příjmu událostí z Azure Event Hubs (azure-messaging-eventhubs)

Tento rychlý start ukazuje, jak odesílat události do centra událostí a přijímat je z něj pomocí balíčku Java azure-messaging-eventhubs.

Důležité

V tomto rychlém startu se používá nový balíček azure-messaging-eventhubs. Rychlý start, který používá staré balíčky azure-eventhubs a azure-eventhubs-eph, najdete v tématu Odesílání a příjem událostí pomocí azure-eventhubs a azure-eventhubs-eph.

Požadavky

Pokud s tímto rychlým zprovozněním Azure Event Hubs, Event Hubs přehled.

K dokončení tohoto rychlého startu potřebujete následující požadavky:

  • Microsoft Azure předplatného. K používání služeb Azure, včetně Azure Event Hubs, potřebujete předplatné. Pokud ještě nemáte účet Azure, můžete si zaregistrovat bezplatnou zkušební verzi nebo využít výhody pro předplatitele MSDN při vytváření účtu.
  • A Java development environment. V tomto rychlém startu se používá Eclipse. Vyžaduje se sada Java Development Kit (JDK) verze 8 nebo novější.
  • Vytvořte Event Hubs oboru názvů a centrum událostí. Prvním krokem je použití Azure Portal k vytvoření oboru názvů typu Event Hubs a získání přihlašovacích údajů pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku. Pak získejte připojovací řetězec pro obor názvů Event Hubs podle pokynů v článku: Získání připojovacího řetězce. Připojovací řetězec použijete později v tomto rychlém startu.

Odesílání událostí

V této části si ukážeme, jak vytvořit aplikaci v Javě pro odesílání událostí do centra událostí.

Přidání odkazu do Azure Event Hubs knihovny

Klientská knihovna Java pro Event Hubs je k dispozici v centrálním úložišti Maven. Na tuto knihovnu můžete odkazovat pomocí následující deklarace závislosti v souboru projektu Maven:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.7.0</version>
</dependency>

Poznámka

Aktualizujte verzi na nejnovější verzi publikovanou do úložiště Maven.

Napsání kódu pro odesílání zpráv do centra událostí

Pro následující příklad nejprve vytvořte nový projekt Maven pro aplikaci konzoly nebo prostředí v oblíbeném vývojovém prostředí Java. Přidejte třídu Sender s názvem a do třídy přidejte následující kód:

Důležité

Aktualizujte <Event Hubs namespace connection string> pomocí připojovacího řetězce na váš Event Hubs názvů. V oboru názvů aktualizujte názvem <Event hub name> vašeho centra událostí.

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

public class Sender {
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";

    public static void main(String[] args) {
        publishEvents();
    }
}

Přidání kódu pro publikování událostí do centra událostí

Do třídy publishEvents přidejte Sender metodu s názvem :

    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a producer client
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }

Sestavte program a ujistěte se, že ne dojde k žádným chybám. Tento program spustíte po spuštění programu příjemce.

Příjem událostí

Kód v tomto kurzu je založený na ukázce EventProcessorClientna GitHub , kterou můžete prozkoumat a podívat se na plnohodnotnou aplikaci.

Upozornění

Pokud tento kód spustíte v Azure Stack Hub, dojde k chybám za běhu, pokud nezacílit na konkrétní verzi Storage API. Je to proto, že Event Hubs SDK používá nejnovější dostupné rozhraní AZURE STORAGE API dostupné v Azure, které nemusí být dostupné na vaší Azure Stack Hub platformě. Azure Stack Hub může podporovat jinou verzi sady Azure Blob Storage SDK než ty, které jsou obvykle dostupné v Azure. Pokud jako úložiště kontrolních bodů Storage Azure Blob, zkontrolujte podporovanou verzi rozhraní Azure Storage API pro vaše Azure Stack Hub sestavení a cílí na tuto verzi ve vašem kódu.

Pokud například používáte verzi Azure Stack Hub 2005, je nejvyšší dostupná verze pro službu Storage verze 2019-02-02. Ve výchozím nastavení klientská knihovna Event Hubs SDK používá nejvyšší dostupnou verzi v Azure (7. 7. 2019 v době vydání sady SDK). V tomto případě budete kromě následujících kroků v této části také muset přidat kód pro cílení na rozhraní API služby Storage verze 2019-02-02. Příklad cílení na konkrétní verzi rozhraní API Storage najdete v této ukázce na GitHub.

Vytvoření kontejneru Azure Storage objektů blob

V tomto rychlém startu použijete Azure Storage (konkrétně blob Storage) jako úložiště kontrolních bodů. Vytváření kontrolních bodů je proces, pomocí kterého procesor událostí označí nebo potvrdí pozici poslední úspěšně zpracované události v rámci oddílu. Označení kontrolního bodu se obvykle provádí v rámci funkce, která zpracovává události. Další informace o kontrolních bodech najdete v tématu Procesor událostí.

Postupujte podle těchto kroků a vytvořte Azure Storage účet.

  1. Vytvoření účtu Azure Storage

  2. Vytvoření kontejneru objektů blob

  3. Získání připojovacího řetězce k účtu úložiště

    Poznamenejte si připojovací řetězec a název kontejneru. Použijete je v kódu pro příjem.

Přidání Event Hubs knihoven do projektu Java

Do tohoto souboru přidejte pom.xml závislosti.

<dependencies>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.7.0</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
        <version>1.6.0</version>
    </dependency>
</dependencies>
  1. Na začátek souboru Java přidejte následující příkazy importu.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
  2. Vytvořte třídu s názvem Receiver a přidejte do třídy následující proměnné řetězce. Zástupné symboly nahraďte správnými hodnotami.

    Důležité

    Zástupné symboly nahraďte správnými hodnotami.

    • <Event Hubs namespace connection string> s připojovacím řetězcem k vašemu Event Hubs oboru názvů. Aktualizace
    • <Event hub name> názvem vašeho centra událostí v oboru názvů .
    • <Storage connection string> s připojovacím řetězcem k vašemu účtu úložiště Azure.
    • <Storage container name> názvem kontejneru ve službě Azure Blob Storage.
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";
    private static final String storageConnectionString = "<Storage connection string>";
    private static final String storageContainerName = "<Storage container name>";
    
  3. Do třídy main přidejte následující metodu.

    public static void main(String[] args) throws Exception {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(storageConnectionString)
            .containerName(storageContainerName)
            .buildAsyncClient();
    
        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(connectionString, eventHubName)
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR)
            .processError(ERROR_HANDLER)
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
    
        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
        System.out.println("Starting event processor");
        eventProcessorClient.start();
    
        System.out.println("Press enter to stop.");
        System.in.read();
    
        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");
    
        System.out.println("Exiting process");
    }
    
  4. Přidejte do třídy dvě pomocné metody ( a ), které zpracovávají události a PARTITION_PROCESSOR ERROR_HANDLER Receiver chyby.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  5. Sestavte program a ujistěte se, že ne dojde k žádným chybám.

Spuštění aplikací

  1. Nejprve spusťte aplikaci Receiver.

  2. Pak spusťte aplikaci Sender.

  3. V okně aplikace Receiver potvrďte, že se zobrazí události publikované aplikací Sender.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Stisknutím klávesy ENTER v okně přijímací aplikace aplikaci zastavte.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Další kroky

Podívejte se na následující ukázky na GitHub: