Použití Javy k odesílání nebo příjmu událostí z Azure Event Hubs (azure-messaging-eventhubs)
Tento rychlý start ukazuje, jak odesílat události do centra událostí a přijímat je z něj pomocí balíčku Java azure-messaging-eventhubs.
Důležité
V tomto rychlém startu se používá nový balíček azure-messaging-eventhubs. Rychlý start, který používá staré balíčky azure-eventhubs a azure-eventhubs-eph, najdete v tématu Odesílání a příjem událostí pomocí azure-eventhubs a azure-eventhubs-eph.
Požadavky
Pokud s tímto rychlým zprovozněním Azure Event Hubs, Event Hubs přehled.
K dokončení tohoto rychlého startu potřebujete následující požadavky:
- Microsoft Azure předplatného. K používání služeb Azure, včetně Azure Event Hubs, potřebujete předplatné. Pokud ještě nemáte účet Azure, můžete si zaregistrovat bezplatnou zkušební verzi nebo využít výhody pro předplatitele MSDN při vytváření účtu.
- A Java development environment. V tomto rychlém startu se používá Eclipse. Vyžaduje se sada Java Development Kit (JDK) verze 8 nebo novější.
- Vytvořte Event Hubs oboru názvů a centrum událostí. Prvním krokem je použití Azure Portal k vytvoření oboru názvů typu Event Hubs a získání přihlašovacích údajů pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku. Pak získejte připojovací řetězec pro obor názvů Event Hubs podle pokynů v článku: Získání připojovacího řetězce. Připojovací řetězec použijete později v tomto rychlém startu.
Odesílání událostí
V této části si ukážeme, jak vytvořit aplikaci v Javě pro odesílání událostí do centra událostí.
Přidání odkazu do Azure Event Hubs knihovny
Klientská knihovna Java pro Event Hubs je k dispozici v centrálním úložišti Maven. Na tuto knihovnu můžete odkazovat pomocí následující deklarace závislosti v souboru projektu Maven:
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.7.0</version>
</dependency>
Poznámka
Aktualizujte verzi na nejnovější verzi publikovanou do úložiště Maven.
Napsání kódu pro odesílání zpráv do centra událostí
Pro následující příklad nejprve vytvořte nový projekt Maven pro aplikaci konzoly nebo prostředí v oblíbeném vývojovém prostředí Java. Přidejte třídu Sender s názvem a do třídy přidejte následující kód:
Důležité
Aktualizujte <Event Hubs namespace connection string> pomocí připojovacího řetězce na váš Event Hubs názvů. V oboru názvů aktualizujte názvem <Event hub name> vašeho centra událostí.
import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;
public class Sender {
private static final String connectionString = "<Event Hubs namespace connection string>";
private static final String eventHubName = "<Event hub name>";
public static void main(String[] args) {
publishEvents();
}
}
Přidání kódu pro publikování událostí do centra událostí
Do třídy publishEvents přidejte Sender metodu s názvem :
/**
* Code sample for publishing events.
* @throws IllegalArgumentException if the EventData is bigger than the max batch size.
*/
public static void publishEvents() {
// create a producer client
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
// sample events in an array
List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
for (EventData eventData : allEvents) {
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
Sestavte program a ujistěte se, že ne dojde k žádným chybám. Tento program spustíte po spuštění programu příjemce.
Příjem událostí
Kód v tomto kurzu je založený na ukázce EventProcessorClientna GitHub , kterou můžete prozkoumat a podívat se na plnohodnotnou aplikaci.
Upozornění
Pokud tento kód spustíte v Azure Stack Hub, dojde k chybám za běhu, pokud nezacílit na konkrétní verzi Storage API. Je to proto, že Event Hubs SDK používá nejnovější dostupné rozhraní AZURE STORAGE API dostupné v Azure, které nemusí být dostupné na vaší Azure Stack Hub platformě. Azure Stack Hub může podporovat jinou verzi sady Azure Blob Storage SDK než ty, které jsou obvykle dostupné v Azure. Pokud jako úložiště kontrolních bodů Storage Azure Blob, zkontrolujte podporovanou verzi rozhraní Azure Storage API pro vaše Azure Stack Hub sestavení a cílí na tuto verzi ve vašem kódu.
Pokud například používáte verzi Azure Stack Hub 2005, je nejvyšší dostupná verze pro službu Storage verze 2019-02-02. Ve výchozím nastavení klientská knihovna Event Hubs SDK používá nejvyšší dostupnou verzi v Azure (7. 7. 2019 v době vydání sady SDK). V tomto případě budete kromě následujících kroků v této části také muset přidat kód pro cílení na rozhraní API služby Storage verze 2019-02-02. Příklad cílení na konkrétní verzi rozhraní API Storage najdete v této ukázce na GitHub.
Vytvoření kontejneru Azure Storage objektů blob
V tomto rychlém startu použijete Azure Storage (konkrétně blob Storage) jako úložiště kontrolních bodů. Vytváření kontrolních bodů je proces, pomocí kterého procesor událostí označí nebo potvrdí pozici poslední úspěšně zpracované události v rámci oddílu. Označení kontrolního bodu se obvykle provádí v rámci funkce, která zpracovává události. Další informace o kontrolních bodech najdete v tématu Procesor událostí.
Postupujte podle těchto kroků a vytvořte Azure Storage účet.
Získání připojovacího řetězce k účtu úložiště
Poznamenejte si připojovací řetězec a název kontejneru. Použijete je v kódu pro příjem.
Přidání Event Hubs knihoven do projektu Java
Do tohoto souboru přidejte pom.xml závislosti.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
Na začátek souboru Java přidejte následující příkazy importu.
import com.azure.messaging.eventhubs.*; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.*; import com.azure.storage.blob.*; import java.util.function.Consumer;Vytvořte třídu s názvem
Receivera přidejte do třídy následující proměnné řetězce. Zástupné symboly nahraďte správnými hodnotami.Důležité
Zástupné symboly nahraďte správnými hodnotami.
<Event Hubs namespace connection string>s připojovacím řetězcem k vašemu Event Hubs oboru názvů. Aktualizace<Event hub name>názvem vašeho centra událostí v oboru názvů .<Storage connection string>s připojovacím řetězcem k vašemu účtu úložiště Azure.<Storage container name>názvem kontejneru ve službě Azure Blob Storage.
private static final String connectionString = "<Event Hubs namespace connection string>"; private static final String eventHubName = "<Event hub name>"; private static final String storageConnectionString = "<Storage connection string>"; private static final String storageContainerName = "<Storage container name>";Do třídy
mainpřidejte následující metodu.public static void main(String[] args) throws Exception { // Create a blob container client that you use later to build an event processor client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .connectionString(storageConnectionString) .containerName(storageContainerName) .buildAsyncClient(); // Create a builder object that you will use later to build an event processor client to receive and process events and errors. EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder() .connectionString(connectionString, eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)); // Use the builder object to create an event processor client EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process"); }Přidejte do třídy dvě pomocné metody ( a ), které zpracovávají události a
PARTITION_PROCESSORERROR_HANDLERReceiverchyby.public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };Sestavte program a ujistěte se, že ne dojde k žádným chybám.
Spuštění aplikací
Nejprve spusťte aplikaci Receiver.
Pak spusťte aplikaci Sender.
V okně aplikace Receiver potvrďte, že se zobrazí události publikované aplikací Sender.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: BarStisknutím klávesy ENTER v okně přijímací aplikace aplikaci zastavte.
Starting event processor Press enter to stop. Processing event from partition 0 with sequence number 331 with body: Foo Processing event from partition 0 with sequence number 332 with body: Bar Stopping event processor Event processor stopped. Exiting process
Další kroky
Podívejte se na následující ukázky na GitHub: