Použití jazyka Java k posílání událostí nebo přijímání událostí z Azure Event Hubs (Azure-zasílání zpráv – eventhubs)Use Java to send events to or receive events from Azure Event Hubs (azure-messaging-eventhubs)

V tomto rychlém startu se dozvíte, jak odesílat události do centra událostí a přijímat z něj události pomocí balíčku Java -Messaging-eventhubs Java.This quickstart shows how to send events to and receive events from an event hub using the azure-messaging-eventhubs Java package.

Důležité

V tomto rychlém startu se používá nový balíček Azure-Messaging-eventhubs .This quickstart uses the new azure-messaging-eventhubs package. Pro rychlý Start, který používá staré balíčky Azure-eventhubs a Azure-eventhubs-EPH , najdete informace v tématu posílání a přijímání událostí pomocí Azure-eventhubs a Azure-eventhubs-EPH.For a quickstart that uses the old azure-eventhubs and azure-eventhubs-eph packages, see Send and receive events using azure-eventhubs and azure-eventhubs-eph.

PožadavkyPrerequisites

Pokud s Azure Event Hubs teprve začínáte, přečtěte si téma přehled Event Hubs před provedením tohoto rychlého startu.If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

K dokončení tohoto rychlého startu potřebujete následující požadavky:To complete this quickstart, you need the following prerequisites:

  • Microsoft Azure předplatné.Microsoft Azure subscription. Pokud chcete používat služby Azure, včetně Azure Event Hubs, potřebujete předplatné.To use Azure services, including Azure Event Hubs, you need a subscription. Pokud nemáte existující účet Azure, můžete si zaregistrovat bezplatnou zkušební verzi nebo využít výhody pro předplatitele MSDN při vytváření účtu.If you don't have an existing Azure account, you can sign up for a free trial or use your MSDN subscriber benefits when you create an account.
  • Java development environment.A Java development environment. V tomto rychlém startu se používá zatmění.This quickstart uses Eclipse. Vyžaduje se sada Java Development Kit (JDK) s verzí 8 nebo vyšší.Java Development Kit (JDK) with version 8 or above is required.
  • Vytvoří obor názvů Event Hubs a centrum událostí.Create an Event Hubs namespace and an event hub. Prvním krokem je použití Azure Portal k vytvoření oboru názvů typu Event Hubs a získání přihlašovacích údajů pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí.The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku.To create a namespace and an event hub, follow the procedure in this article. Pak Získejte připojovací řetězec pro obor názvů Event Hubs podle pokynů uvedených v článku získání připojovacího řetězce.Then, get the connection string for the Event Hubs namespace by following instructions from the article: Get connection string. Připojovací řetězec použijete později v tomto rychlém startu.You use the connection string later in this quickstart.

Odesílání událostíSend events

V této části se dozvíte, jak vytvořit aplikaci Java pro posílání událostí do centra událostí.This section shows you how to create a Java application to send events an event hub.

Přidat odkaz na knihovnu Azure Event HubsAdd reference to Azure Event Hubs library

Klientská knihovna Java pro Event Hubs je k dispozici v centrálním úložišti Maven.The Java client library for Event Hubs is available in the Maven Central Repository. Na tuto knihovnu můžete odkazovat pomocí následující deklarace závislosti v rámci souboru projektu Maven:You can reference this library using the following dependency declaration inside your Maven project file:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.0.1</version>
</dependency>

Napsání kódu pro odesílání zpráv do centra událostíWrite code to send messages to the event hub

Pro následující příklad nejprve vytvořte nový projekt Maven pro aplikaci konzoly nebo prostředí v oblíbeném vývojovém prostředí Java.For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. Přidejte třídu s názvem Sender a do třídy přidejte následující kód:Add a class named Sender, and add the following code to the class:

import com.azure.messaging.eventhubs.*;
import static java.nio.charset.StandardCharsets.UTF_8;

public class Sender {
       public static void main(String[] args) {
    }
}

Připojovací řetězec a centrum událostíConnection string and event hub

Tento kód používá připojovací řetězec k oboru názvů Event Hubs a název centra událostí pro sestavení klienta Event Hubs.This code uses the connection string to the Event Hubs namespace and the name of the event hub to build an Event Hubs client.

String connectionString = "<CONNECTION STRING to EVENT HUBS NAMESPACE>";
String eventHubName = "<EVENT HUB NAME>";

Vytvoření klienta Event Hubsho producentaCreate an Event Hubs Producer client

Tento kód vytvoří objekt klienta producenta, který se používá k vytvoření nebo odeslání událostí do centra událostí.This code creates a producer client object that's used to produce/send events to the event hub.

EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString(connectionString, eventHubName)
    .buildProducerClient();

Příprava dávky událostíPrepare a batch of events

Tento kód připraví dávku událostí.This code prepares a batch of events.

EventDataBatch batch = producer.createBatch();
batch.tryAdd(new EventData("First event"));
batch.tryAdd(new EventData("Second event"));
batch.tryAdd(new EventData("Third event"));
batch.tryAdd(new EventData("Fourth event"));
batch.tryAdd(new EventData("Fifth event"));

Odeslání dávky událostí do centra událostíSend the batch of events to the event hub

Tento kód pošle dávku událostí, které jste připravili v předchozím kroku, do centra událostí.This code sends the batch of events you prepared in the previous step to the event hub. Následující bloky kódu na operaci odeslání.The following code blocks on the send operation.

producer.send(batch);

Zavřít a vyčistitClose and cleanup

Tento kód zavírá producenta.This code closes the producer.

producer.close();

Dokončení kódu pro odesílání událostíComplete code to send events

Tady je kompletní kód pro odesílání událostí do centra událostí.Here is the complete code to send events to the event hub.

import com.azure.messaging.eventhubs.*;

public class Sender {
    public static void main(String[] args) {
        final String connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
        final String eventHubName = "EVENT HUB NAME";

        // create a producer using the namespace connection string and event hub name
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildProducerClient();

        // prepare a batch of events to send to the event hub    
        EventDataBatch batch = producer.createBatch();
        batch.tryAdd(new EventData("First event"));
        batch.tryAdd(new EventData("Second event"));
        batch.tryAdd(new EventData("Third event"));
        batch.tryAdd(new EventData("Fourth event"));
        batch.tryAdd(new EventData("Fifth event"));

        // send the batch of events to the event hub
        producer.send(batch);

        // close the producer
        producer.close();
    }
}

Sestavte program a zajistěte, aby nedocházelo k chybám.Build the program, and ensure that there are no errors. Po spuštění programu přijímače budete tento program spouštět.You'll run this program after you run the receiver program.

Příjem událostíReceive events

Kód v tomto kurzu je založený na ukázce EventProcessorClient na GitHubu, kterou si můžete prohlédnout, abyste viděli úplnou funkční aplikaci.The code in this tutorial is based on the EventProcessorClient sample on GitHub, which you can examine to see the full working application.

Upozornění

Pokud spustíte tento kód v Azure Stackovém centru, dojde k chybám za běhu, pokud necílíte na konkrétní verzi rozhraní API úložiště.If you run this code on Azure Stack Hub, you will experience runtime errors unless you target a specific Storage API version. Důvodem je, že sada Event Hubs SDK používá nejnovější dostupné rozhraní API Azure Storage dostupné v Azure, které nemusí být k dispozici na vaší platformě služby Azure Stack hub.That's because the Event Hubs SDK uses the latest available Azure Storage API available in Azure that may not be available on your Azure Stack Hub platform. Centrum Azure Stack může podporovat jinou verzi sady SDK pro úložiště objektů blob, než jsou ta, která jsou běžně dostupná v Azure.Azure Stack Hub may support a different version of Storage Blob SDK than those typically available on Azure. Pokud jako úložiště kontrolního bodu používáte Azure blogu Storage, podívejte se na podporovanou verzi rozhraní API Azure Storage pro sestavení centra Azure Stack a cílení na verzi v kódu.If you are using Azure Blog Storage as a checkpoint store, check the supported Azure Storage API version for your Azure Stack Hub build and target that version in your code.

Pokud například používáte v Azure Stack centra verze 2005, nejvyšší dostupná verze služby úložiště je verze 2019-02-02.For example, If you are running on Azure Stack Hub version 2005, the highest available version for the Storage service is version 2019-02-02. Ve výchozím nastavení používá Klientská knihovna Event Hubs SDK nejvyšší dostupnou verzi v Azure (2019-07-07 v době vydání sady SDK).By default, the Event Hubs SDK client library uses the highest available version on Azure (2019-07-07 at the time of the release of the SDK). V takovém případě, kromě kroků v této části, budete také muset přidat kód pro cílení na rozhraní API služby úložiště verze 2019-02-02.In this case, besides following steps in this section, you will also need to add code to target the Storage service API version 2019-02-02. Příklad cílení na konkrétní verzi rozhraní API úložiště najdete v této ukázce na GitHubu.For an example on how to target a specific Storage API version, see this sample on GitHub.

Vytvoření Azure Storage a kontejneru objektů BLOBCreate an Azure Storage and a blob container

V tomto rychlém startu použijete jako úložiště kontrolního bodu Azure Storage (konkrétně Blob Storage).In this quickstart, you use Azure Storage (specifically, Blob Storage) as the checkpoint store. Kontrolní bod je proces, při kterém procesor událostí označí nebo potvrdí pozici poslední úspěšné zpracovávané události v rámci oddílu.Checkpointing is a process by which an event processor marks or commits the position of the last successfully processed event within a partition. Označení kontrolního bodu se obvykle provádí ve funkci, která události zpracovává.Marking a checkpoint is typically done within the function that processes the events. Další informace o vytváření kontrolních bodů najdete v tématu procesor událostí.To learn more about checkpointing, see Event processor.

Pomocí těchto kroků vytvořte účet Azure Storage.Follow these steps to create an Azure Storage account.

  1. Vytvoření účtu Azure StorageCreate an Azure Storage account

  2. Vytvoření kontejneru objektů blobCreate a blob container

  3. Získání připojovacího řetězce k účtu úložištěGet the connection string to the storage account

    Poznamenejte si připojovací řetězec a název kontejneru.Note down the connection string and the container name. Budete je používat v kódu příjmu.You'll use them in the receive code.

Přidání knihoven Event Hubs do projektu JavaAdd Event Hubs libraries to your Java project

Do souboru pom.xml přidejte následující závislosti.Add the following dependencies in the pom.xml file.

<dependencies>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
        <version>1.5.0</version>
    </dependency>
</dependencies>
  1. Do horní části souboru Java přidejte následující příkazy pro Import .Add the following import statements at the top of the Java file.

    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.ErrorContext;
    import com.azure.messaging.eventhubs.models.EventContext;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;
    import java.util.function.Consumer;
    import java.util.concurrent.TimeUnit;
    
  2. Vytvořte třídu s názvem Receiver a přidejte do třídy následující řetězcové proměnné.Create a class named Receiver, and add the following string variables to the class. Nahraďte zástupné symboly správnými hodnotami.Replace the placeholders with the correct values.

    private static final String EH_NAMESPACE_CONNECTION_STRING = "<EVENT HUBS NAMESPACE CONNECTION STRING>";
    private static final String eventHubName = "<EVENT HUB NAME>";
    private static final String STORAGE_CONNECTION_STRING = "<AZURE STORAGE CONNECTION STRING>";
    private static final String STORAGE_CONTAINER_NAME = "<AZURE STORAGE CONTAINER NAME>";
    
  3. mainDo třídy přidejte následující metodu.Add the following main method to the class.

    public static void main(String[] args) throws Exception {
        // Create a blob container client that you use later to build an event processor client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(STORAGE_CONNECTION_STRING) 
            .containerName(STORAGE_CONTAINER_NAME) 
            .buildAsyncClient();
    
        // Create a builder object that you will use later to build an event processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
            .connectionString(EH_NAMESPACE_CONNECTION_STRING, eventHubName) 
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR) 
            .processError(ERROR_HANDLER) 
            .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)); 
    
        // Use the builder object to create an event processor client 
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
        System.out.println("Starting event processor");
        eventProcessorClient.start();
    
        System.out.println("Press enter to stop.");
        System.in.read();
    
        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");
    
        System.out.println("Exiting process");
    }    
    
  4. Přidejte dvě pomocné metody ( PARTITION_PROCESSOR a ERROR_HANDLER ), které zpracovávají události a chyby Receiver třídy.Add the two helper methods (PARTITION_PROCESSOR and ERROR_HANDLER) that process events and errors to the Receiver class.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s %n", 
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString());
    
        if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };    
    
  5. Úplný kód by měl vypadat takto:The complete code should look like:

    
    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.ErrorContext;
    import com.azure.messaging.eventhubs.models.EventContext;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;
    import java.util.function.Consumer;
    import java.util.concurrent.TimeUnit;
    
    public class Receiver {
    
        private static final String EH_NAMESPACE_CONNECTION_STRING = "<EVENT HUBS NAMESPACE CONNECTION STRING>";
        private static final String eventHubName = "<EVENT HUB NAME>";
        private static final String STORAGE_CONNECTION_STRING = "<AZURE STORAGE CONNECTION STRING>";
        private static final String STORAGE_CONTAINER_NAME = "<AZURE STORAGE CONTAINER NAME>";
    
        public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s %n", 
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(), eventContext.getEventData().getBodyAsString());
    
            if (eventContext.getEventData().getSequenceNumber() % 10 == 0) {
                eventContext.updateCheckpoint();
            }
        };
    
        public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
            System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
        };
    
        public static void main(String[] args) throws Exception {
            BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(STORAGE_CONNECTION_STRING)
                .containerName(STORAGE_CONTAINER_NAME)
                .buildAsyncClient();
    
            EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                .connectionString(EH_NAMESPACE_CONNECTION_STRING, eventHubName)
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));
    
            EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();
    
            System.out.println("Starting event processor");
            eventProcessorClient.start();
    
            System.out.println("Press enter to stop.");
            System.in.read();
    
            System.out.println("Stopping event processor");
            eventProcessorClient.stop();
            System.out.println("Event processor stopped.");
    
            System.out.println("Exiting process");
        }
    
    }
    
  6. Sestavte program a zajistěte, aby nedocházelo k chybám.Build the program, and ensure that there are no errors.

Spuštění aplikacíRun the applications

  1. Nejdřív spusťte aplikaci příjemce .Run the receiver application first.
  2. Pak spusťte aplikaci sender .Then, run the sender application.
  3. V okně přijímač aplikace potvrďte, že vidíte události, které byly publikovány aplikací odesílatele.In the receiver application window, confirm that you see the events that were published by the sender application.
  4. Stisknutím klávesy ENTER v okně přijímače aplikace zastavte aplikaci.Press ENTER in the receiver application window to stop the application.

Další krokyNext steps

Podívejte se na následující ukázky na GitHubu:See the following samples on GitHub: