Gebruik Java om gebeurtenissen te verzenden naar of te ontvangen van Azure Event Hubs (azure-messaging-eventhubs)

In deze quickstart ziet u hoe u gebeurtenissen kunt verzenden naar en ontvangen van een Event Hub met behulp van het azure-messaging-eventHubs Java-pakket.

Belangrijk

In deze snelstartgids wordt gebruik gemaakt van het nieuwe azure-messaging-eventhubs-pakket. Zie verzenden en ontvangen van gebeurtenissen met azure-eventhubs en azure-eventhubs-eph voor een quickstart die gebruikmaakt van de oude azure-eventhubs en azure-eventhubs-eph-pakketten.

Vereisten

Als u nog geen ervaring hebt met Azure Event Hubs, raadpleegt u het Event Hubs-overzicht voordat u deze quickstart uitvoert.

Voor het voltooien van deze snelstart moet aan de volgende vereisten worden voldaan:

  • Microsoft Azure-abonnement. Als u Azure-services wilt gebruiken, met inbegrip van Azure Event Hubs, hebt u een abonnement nodig. Als u nog geen Azure-account hebt, kunt u zich aanmelden voor een gratis proefversie of uw voordelen als MSDN-abonnee gebruiken wanneer u een account maakt.
  • Een Java-ontwikkelomgeving. In deze quickstart wordt gebruikgemaakt van Eclipse. Java Development Kit (JDK) met versie 8 of hoger is vereist.
  • Een Event Hubs-naamruimte en een Event Hub maken. In de eerste stap gebruikt u Azure Portal om een naamruimte van het type Event Hubs te maken en de beheerreferenties te verkrijgen die de toepassing nodig heeft om met de Event Hub te communiceren. Volg de procedure in dit artikel om een naamruimte en een Event Hub te maken. Haal vervolgens de verbindingsreeks voor de Event Hubs-naamruimte op door de instructies in het artikel te volgen: Verbindingstekenreeks ophalen. U gebruikt de verbindingsreeks later in deze quickstart.

Gebeurtenissen verzenden

In deze sectie wordt beschreven hoe u een Java-toepassing maakt voor het verzenden van gebeurtenissen naar een Event Hub.

Verwijzing naar Azure Event Hubs-bibliotheek toevoegen

De Java-clientbibliotheek voor Event Hubs is beschikbaar in de Centrale opslagplaats voor Maven. U kunt naar deze bibliotheek verwijzen met behulp van de volgende afhankelijkheidsdeclaratie in uw Maven-projectbestand:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.7.0</version>
</dependency>

Notitie

Werk de versie bij naar de nieuwste versie die is gepubliceerd naar de Maven-opslagplaats.

Code schrijven om berichten te verzenden naar de event hub

Maak voor het volgende voorbeeld eerst een nieuw Maven-project voor een console/shell-toepassing in uw favoriete Java-ontwikkelomgeving. Voeg een klasse genaamd Sender toe en voeg de volgende code aan de klasse toe:

Belangrijk

Werk <Event Hubs namespace connection string> bij met de connection string naar uw Event Hubs-naamruimte. Werk <Event hub name> bij met de naam van uw Event Hub in de naamruimte .

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

public class Sender {
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";

    public static void main(String[] args) {
        publishEvents();
    }
}

Code toevoegen om gebeurtenissen te publiceren naar de Event Hub

Voeg een methode met de publishEvents naam toe aan de klasse Sender :

    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a producer client
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }

Bouw het programma en controleer of er geen fouten zijn. U voert dit programma uit nadat u het ontvangersprogramma hebt uitgevoerd.

Gebeurtenissen ontvangen

De code in deze zelfstudie is gebaseerd op het EventProcessorClient-voorbeeld op GitHub, dat u kunt bekijken om de volledig werkende toepassing weer te geven.

Waarschuwing

Als u deze code op Azure Stack Hub uitvoert, treden er runtimefouten op tenzij u zich richt op een specifieke versie van de Storage-API. Dat komt doordat de Event Hubs-SDK de meest recente Azure Storage-API gebruikt die beschikbaar is in Azure maar die niet beschikbaar is op uw Azure Stack Hub-platform. Azure Stack Hub biedt mogelijk ondersteuning voor een andere versie van Azure Blob Storage SDK dan de SDK die doorgaans beschikbaar is in Azure. Als u Azure Blob-opslag gebruikt als controlepuntopslag, controleert u de ondersteunde versie van de Azure Storage-API voor uw build van Azure Stack Hub en stelt u die versie in uw code als doel in.

Als u bijvoorbeeld op Azure Stack Hub versie 2005 uitvoert, is de hoogste beschikbare versie van de Storage-service versie 2019-02-02. De Event Hubs-SDK-clientbibliotheek maakt standaard gebruik van de hoogste beschikbare versie op Azure (2019-07-07 op het moment van de release van de SDK). In dit geval moet u naast de volgende stappen in deze sectie ook code toevoegen om de API-versie van de Storage-service te richten op 2019-02-02. Zie dit voorbeeld op GitHub voor een voorbeeld van hoe u een specifieke versie van Storage-API instelt.

Een Azure Storage en een blobcontainer maken

In deze quickstart gebruikt u Azure Storage (bijvoorbeeld Blob Storage) als controlepuntopslag. Controlepunten is een proces waarbij een gebeurtenisprocessor de positie van de laatste uitgevoerde gebeurtenis binnen een partitie markeert of doorvoert. Het markeren van een controlepunt wordt doorgaans uitgevoerd binnen de functie waarmee de gebeurtenissen worden verwerkt. Zie Gebeurtenisprocessor voor meer informatie over controle punten.

Volg deze stappen om een Azure Storage-account te maken.

  1. Een Azure Storage-account maken

  2. Een blobcontainer maken

  3. De verbindingsreeks voor het opslagaccount ophalen

    Noteer de verbindingsreeks en de naam van de container. U gebruikt deze in de receive-code.

Event Hubs-bibliotheken toevoegen aan uw Java-project

Voeg de volgende afhankelijkheden toe in het pom.xml-bestand.

<dependencies>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.7.0</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
        <version>1.6.0</version>
    </dependency>
</dependencies>
  1. Voeg de volgende instructie importeren in boven aan het Java-bestand.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
  2. Maak een klasse genaamd Receiver toe en voeg de reeks variabelen aan de klasse toe. Vervang de plaatsaanduidingen door de correcte waarden.

    Belangrijk

    Vervang de plaatsaanduidingen door de correcte waarden.

    • <Event Hubs namespace connection string> door de connection string aan uw Event Hubs naamruimte. Bijwerken
    • <Event hub name> door de naam van uw Event Hub in de naamruimte .
    • <Storage connection string> met de connection string naar uw Azure-opslagaccount.
    • <Storage container name> door de naam van uw container in uw Azure Blob-opslag.
    private static final String connectionString = "<Event Hubs namespace connection string>";
    private static final String eventHubName = "<Event hub name>";
    private static final String storageConnectionString = "<Storage connection string>";
    private static final String storageContainerName = "<Storage container name>";
    
  3. Voeg de volgende methode main toe aan de klasse.

    public static void main(String[] args) throws Exception {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(storageConnectionString)
            .containerName(storageContainerName)
            .buildAsyncClient();
    
        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(connectionString, eventHubName)
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR)
            .processError(ERROR_HANDLER)
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
    
        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
        System.out.println("Starting event processor");
        eventProcessorClient.start();
    
        System.out.println("Press enter to stop.");
        System.in.read();
    
        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");
    
        System.out.println("Exiting process");
    }
    
  4. Voeg de twee helper-methoden (PARTITION_PROCESSOR en ERROR_HANDLER) toe waarmee gebeurtenissen en fouten naar de klasse Receiver worden verwerkt.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  5. Bouw het programma en controleer of er geen fouten zijn.

De toepassingen uitvoeren

  1. Voer eerst de ontvangertoepassing uit.

  2. Voer vervolgens de toepassing Afzender uit.

  3. Controleer in het toepassingsvenster Ontvanger of u de gebeurtenissen ziet die zijn gepubliceerd door de afzendertoepassing.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Druk op ENTER in het ontvangerstoepassingsvenster om de toepassing te stoppen.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Volgende stappen

Bekijk de volgende voorbeelden op GitHub: