Använd Java för att skicka händelser till eller ta emot händelser Azure Event Hubs (azure-messaging-eventhubs)
Den här snabbstarten visar hur du skickar händelser till och tar emot händelser från en händelsehubb med hjälp av Java-paketet azure-messaging-eventhubs.
Viktigt
I den här snabbstarten används det nya paketet azure-messaging-eventhubs. En snabbstart som använder de gamla paketen azure-eventhubs och azure-eventhubs-eph finns i Skicka och ta emot händelser med azure-eventhubs och azure-eventhubs-eph.
Förutsättningar
Om du inte har börjat använda Azure Event Hubs kan du gå Event Hubs översikt innan du gör den här snabbstarten.
För att slutföra den här snabbstarten, behöver du följande förhandskrav:
- Microsoft Azure prenumeration . Om du vill använda Azure-Azure Event Hubs måste du ha en prenumeration. Om du inte har ett befintligt Azure-konto kan du registrera dig för en kostnadsfri utvärderingsversion eller använda dina MSDN-prenumerantförmåner när du skapar ett konto.
- En Java-utvecklingsmiljö. I den här snabbstarten används Eclipse. Java Development Kit (JDK) med version 8 eller senare krävs.
- Skapa ett Event Hubs namnområde och en händelsehubb. Det första steget är att använda Azure Portal för att skapa ett namnområde av typen Event Hubs och hämta de autentiseringsuppgifter för hantering som programmet behöver för att kommunicera med händelsehubben. Om du behöver skapa ett namnområde och en händelsehubb följer du anvisningarna i den här artikeln. Hämta sedan anslutningssträngen för Event Hubs genom att följa anvisningarna i artikeln: Hämta anslutningssträngen. Du använder anslutningssträngen senare i den här snabbstarten.
Skicka händelser
Det här avsnittet visar hur du skapar ett Java-program för att skicka händelser till en händelsehubb.
Lägga till referens Azure Event Hubs bibliotek
Java-klientbiblioteket för Event Hubs är tillgängligt på Maven Central Repository. Du kan referera till det här biblioteket med hjälp av följande beroendedeklaration i Maven-projektfilen:
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.7.0</version>
</dependency>
Anteckning
Uppdatera versionen till den senaste versionen som publicerats på Maven-lagringsplatsen.
Skriva kod för att skicka meddelanden till händelsehubben
För följande exempel skapar du först ett nytt Maven-projekt för ett konsol-/gränssnittsprogram i din favorit Java Development Environment. Lägg till en klass Sender med namnet och lägg till följande kod i klassen :
Viktigt
Uppdatera <Event Hubs namespace connection string> med anslutningssträngen till Event Hubs namnområdet. Uppdatera <Event hub name> med namnet på din händelsehubb i namnområdet.
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
public class Sender {
private static final String connectionString = "<Event Hubs namespace connection string>";
private static final String eventHubName = "<Event hub name>";
public static void main(String[] args) {
publishEvents();
}
}
Lägga till kod för att publicera händelser i händelsehubben
Lägg till en metod med publishEvents namnet i Sender klassen :
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
Skapa programmet och se till att det inte finns några fel. Du kör det här programmet när du har kört mottagarprogrammet.
Ta emot händelser
Koden i den här självstudien baseras på EventProcessorClient-exemplet på GitHub, som du kan undersöka för att se det fullständiga fungerande programmet.
Varning
Om du kör den här koden på Azure Stack Hub får du körningsfel om du inte riktar in dig på en Storage API-version. Det beror på att Event Hubs SDK använder det senaste tillgängliga API:et för Azure Storage i Azure som kanske inte är tillgängligt på din Azure Stack Hub plattform. Azure Stack Hub kan ha stöd för en annan version av Azure Blob Storage SDK än de som vanligtvis är tillgängliga i Azure. Om du använder Azure Blob Storage som kontrollpunktslager kontrollerar du vilken Azure Storage API-version som stöds för ditt Azure Stack Hub-bygge och riktar in dig på den versionen i koden.
Om du till exempel kör på Azure Stack Hub version 2005 är den högsta tillgängliga versionen för Storage-tjänsten version 2019-02-02. Som standard använder Event Hubs SDK-klientbiblioteket den högsta tillgängliga versionen i Azure (2019-07-07 vid tidpunkten för SDK-versionen). I det här fallet måste du, förutom att följa stegen i det här avsnittet, även lägga till kod för att rikta Storage-tjänstens API-version 2019-02-02. Ett exempel på hur du riktar in dig på en Storage API-version finns i det här exemplet på GitHub.
Skapa en Azure Storage och en blobcontainer
I den här snabbstarten använder du Azure Storage (särskilt Blob Storage) som kontrollpunktslager. Kontrollpunkter är en process genom vilken en händelseprocessor markerar eller checkar in positionen för den senast bearbetade händelsen inom en partition. Markering av en kontrollpunkt görs vanligtvis i den funktion som bearbetar händelserna. Mer information om kontrollpunkter finns i Händelseprocessor.
Följ de här stegen för att skapa Azure Storage konto.
Hämta anslutningssträngen till lagringskontot
Anteckna anslutningssträngen och containernamnet. Du använder dem i mottagningskoden.
Lägga Event Hubs bibliotek i Java-projektet
Lägg till följande beroenden i pom.xml filen.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
Lägg till följande importutdrag överst i Java-filen.
import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer;Skapa en klass med
Receivernamnet och lägg till följande strängvariabler i klassen . Ersätt platshållarna med rätt värden.Viktigt
Ersätt platshållarna med rätt värden.
<Event Hubs namespace connection string>med anslutningssträngen till Event Hubs namnområdet. Uppdatera<Event hub name>med namnet på din händelsehubb i namnområdet.<Storage connection string>med anslutningssträngen till ditt Azure Storage-konto.<Storage container name>med namnet på din container i Azure Blob Storage.
private static final String connectionString = "<Event Hubs namespace connection string>"; private static final String eventHubName = "<Event hub name>"; private static final String storageConnectionString = "<Storage connection string>"; private static final String storageContainerName = "<Storage container name>";Lägg till följande
mainmetod i klassen .public static void main(String[] args) throws Exception { // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .connectionString(storageConnectionString) .containerName(storageContainerName) .buildAsyncClient(); // Create a builder object that you will use later to build an event processor client to receive and process events and errors. EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder() .connectionString(connectionString, eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)); // Use the builder object to create an event processor client EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process"); }Lägg till de två hjälpmetoderna (
PARTITION_PROCESSORoch ) som bearbetar händelser och fel i klassenERROR_HANDLERReceiver.public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };Skapa programmet och se till att det inte finns några fel.
Köra programmen
Kör mottagarprogrammet först.
Kör sedan avsändarprogrammet.
I programfönstret Mottagare bekräftar du att du ser de händelser som har publicerats av avsändarprogrammet.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: BarTryck på RETUR i mottagarprogramfönstret för att stoppa programmet.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
Nästa steg
Se följande exempel på GitHub: