EventProcessorClient Klasse

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventProcessorClient

public class EventProcessorClient

EventProcessorClient bietet einen praktischen Mechanismus zum Nutzen von Ereignissen aus allen Partitionen eines Event Hubs im Kontext einer Consumergruppe. Ereignisprozessorbasierte Anwendung besteht aus einer oder mehreren Instanzen von EventProcessorClient(s), die für die Nutzung von Ereignissen aus derselben Event Hub-Consumergruppe eingerichtet sind, um die Workload auf verschiedene Instanzen zu verteilen und den Fortschritt bei der Verarbeitung von Ereignissen zu verfolgen. Basierend auf der Anzahl der ausgeführten Instanzen kann jeder EventProcessorClient 0 oder mehr Partitionen besitzen, um die Workload zwischen allen Instanzen auszugleichen.

Beispiel: Erstellen eines EventProcessorClient

Im folgenden Beispiel wird ein In-Memory-Speicher CheckpointStore verwendet, aber azure-messaging-eventhubs-checkpointstore-blob stellt einen Prüfpunktspeicher bereit, der von Azure Blob Storage unterstützt wird. Darüber hinaus fullyQualifiedNamespace ist der Hostname des Event Hubs-Namespace. Sie wird im Bereich "Essentials" aufgeführt, nachdem Sie über das Azure-Portal zum Event Hubs-Namespace navigiert haben. Die verwendeten Anmeldeinformationen sind DefaultAzureCredential darauf zurückzuführen, dass sie häufig verwendete Anmeldeinformationen in Bereitstellung und Entwicklung kombiniert und die zu verwendenden Anmeldeinformationen basierend auf der ausgeführten Umgebung auswählen. Navigieren consumerGroup Sie zum Event Hub-instance, und wählen Sie im Bereich "Entitäten" die Option "Consumergruppen" aus. Die consumerGroup ist erforderlich. Die verwendeten Anmeldeinformationen sind DefaultAzureCredential darauf zurückzuführen, dass sie häufig verwendete Anmeldeinformationen in Bereitstellung und Entwicklung kombiniert und die zu verwendenden Anmeldeinformationen basierend auf der ausgeführten Umgebung auswählen.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup("<< CONSUMER GROUP NAME >>")
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .checkpointStore(new SampleCheckpointStore())
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .buildEventProcessorClient();

Methodenzusammenfassung

Modifizierer und Typ Methode und Beschreibung
String getIdentifier()

Der Bezeichner ist ein eindeutiger Name, der diesem Ereignisprozessor instance.

synchronized boolean isRunning()

Gibt zurück true , wenn der Ereignisprozessor ausgeführt wird.

synchronized void start()

Startet die Verarbeitung von Ereignissen für alle Partitionen des Event Hubs, die dieser Ereignisprozessor besitzen kann, und weist jeder Partition ein dediziertes PartitionProcessor zu.

synchronized void stop()

Beendet die Verarbeitung von Ereignissen für alle Partitionen, die diesem Ereignisprozessor gehören.

Geerbte Methoden von java.lang.Object

Details zur Methode

getIdentifier

public String getIdentifier()

Der Bezeichner ist ein eindeutiger Name, der diesem Ereignisprozessor instance.

Returns:

Bezeichner für diesen Ereignisprozessor.

isRunning

public synchronized boolean isRunning()

Gibt zurück true , wenn der Ereignisprozessor ausgeführt wird. Wenn der Ereignisprozessor bereits ausgeführt wird, hat der Aufruf start() keine Auswirkungen.

Returns:

true , wenn der Ereignisprozessor ausgeführt wird.

start

public synchronized void start()

Startet die Verarbeitung von Ereignissen für alle Partitionen des Event Hubs, die dieser Ereignisprozessor besitzen kann, und weist jeder Partition ein dediziertes PartitionProcessor zu. Wenn andere Ereignisprozessoren für dieselbe Consumergruppe im Event Hub aktiv sind, wird die Verantwortung für Partitionen zwischen ihnen geteilt.

Nachfolgende Aufrufe zum Starten werden ignoriert, wenn dieser Ereignisprozessor bereits ausgeführt wird. Der Aufruf von start after stop() wird aufgerufen, um diesen Ereignisprozessor neu zu starten.

Starten des Prozessors zur Nutzung von Ereignissen aus allen Partitionen

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

stop

public synchronized void stop()

Beendet die Verarbeitung von Ereignissen für alle Partitionen, die diesem Ereignisprozessor gehören. Alle PartitionProcessor werden heruntergefahren, und alle offenen Ressourcen werden geschlossen.

Nachfolgende Aufrufe von stop werden ignoriert, wenn der Ereignisprozessor nicht ausgeführt wird.

Beenden des Prozessors

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when application is finished.
 eventProcessorClient.stop();

Gilt für: