EventProcessorClient Klasse
- java.
lang. Object - com.
azure. messaging. eventhubs. EventProcessorClient
- com.
public class EventProcessorClient
EventProcessorClient bietet einen praktischen Mechanismus zum Nutzen von Ereignissen aus allen Partitionen eines Event Hubs im Kontext einer Consumergruppe. Ereignisprozessorbasierte Anwendung besteht aus einer oder mehreren Instanzen von EventProcessorClient(s), die für die Nutzung von Ereignissen aus derselben Event Hub-Consumergruppe eingerichtet sind, um die Workload auf verschiedene Instanzen zu verteilen und den Fortschritt bei der Verarbeitung von Ereignissen zu verfolgen. Basierend auf der Anzahl der ausgeführten Instanzen kann jeder EventProcessorClient 0 oder mehr Partitionen besitzen, um die Workload zwischen allen Instanzen auszugleichen.
Beispiel: Erstellen eines EventProcessorClient
Im folgenden Beispiel wird ein In-Memory-Speicher CheckpointStore verwendet, aber azure-messaging-eventhubs-checkpointstore-blob stellt einen Prüfpunktspeicher bereit, der von Azure Blob Storage unterstützt wird. Darüber hinaus fullyQualifiedNamespace
ist der Hostname des Event Hubs-Namespace. Sie wird im Bereich "Essentials" aufgeführt, nachdem Sie über das Azure-Portal zum Event Hubs-Namespace navigiert haben. Die verwendeten Anmeldeinformationen sind DefaultAzureCredential
darauf zurückzuführen, dass sie häufig verwendete Anmeldeinformationen in Bereitstellung und Entwicklung kombiniert und die zu verwendenden Anmeldeinformationen basierend auf der ausgeführten Umgebung auswählen. Navigieren consumerGroup
Sie zum Event Hub-instance, und wählen Sie im Bereich "Entitäten" die Option "Consumergruppen" aus. Die consumerGroup
ist erforderlich. Die verwendeten Anmeldeinformationen sind DefaultAzureCredential
darauf zurückzuführen, dass sie häufig verwendete Anmeldeinformationen in Bereitstellung und Entwicklung kombiniert und die zu verwendenden Anmeldeinformationen basierend auf der ausgeführten Umgebung auswählen.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Methodenzusammenfassung
Modifizierer und Typ | Methode und Beschreibung |
---|---|
String |
getIdentifier()
Der Bezeichner ist ein eindeutiger Name, der diesem Ereignisprozessor instance. |
synchronized boolean |
isRunning()
Gibt zurück |
synchronized void |
start()
Startet die Verarbeitung von Ereignissen für alle Partitionen des Event Hubs, die dieser Ereignisprozessor besitzen kann, und weist jeder Partition ein dediziertes PartitionProcessor zu. |
synchronized void |
stop()
Beendet die Verarbeitung von Ereignissen für alle Partitionen, die diesem Ereignisprozessor gehören. |
Geerbte Methoden von java.lang.Object
Details zur Methode
getIdentifier
public String getIdentifier()
Der Bezeichner ist ein eindeutiger Name, der diesem Ereignisprozessor instance.
Returns:
isRunning
public synchronized boolean isRunning()
Gibt zurück true
, wenn der Ereignisprozessor ausgeführt wird. Wenn der Ereignisprozessor bereits ausgeführt wird, hat der Aufruf start() keine Auswirkungen.
Returns:
true
, wenn der Ereignisprozessor ausgeführt wird.start
public synchronized void start()
Startet die Verarbeitung von Ereignissen für alle Partitionen des Event Hubs, die dieser Ereignisprozessor besitzen kann, und weist jeder Partition ein dediziertes PartitionProcessor zu. Wenn andere Ereignisprozessoren für dieselbe Consumergruppe im Event Hub aktiv sind, wird die Verantwortung für Partitionen zwischen ihnen geteilt.
Nachfolgende Aufrufe zum Starten werden ignoriert, wenn dieser Ereignisprozessor bereits ausgeführt wird. Der Aufruf von start after stop() wird aufgerufen, um diesen Ereignisprozessor neu zu starten.
Starten des Prozessors zur Nutzung von Ereignissen aus allen Partitionen
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.checkpointStore(new SampleCheckpointStore())
.buildEventProcessorClient();
eventProcessorClient.start();
// Continue to perform other tasks while the processor is running in the background.
//
// Finally, stop the processor client when application is finished.
eventProcessorClient.stop();
stop
public synchronized void stop()
Beendet die Verarbeitung von Ereignissen für alle Partitionen, die diesem Ereignisprozessor gehören. Alle PartitionProcessor werden heruntergefahren, und alle offenen Ressourcen werden geschlossen.
Nachfolgende Aufrufe von stop werden ignoriert, wenn der Ereignisprozessor nicht ausgeführt wird.
Beenden des Prozessors
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.checkpointStore(new SampleCheckpointStore())
.buildEventProcessorClient();
eventProcessorClient.start();
// Continue to perform other tasks while the processor is running in the background.
//
// Finally, stop the processor client when application is finished.
eventProcessorClient.stop();
Gilt für:
Azure SDK for Java
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Issues stufenweise als Feedbackmechanismus für Inhalte abbauen und durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unterFeedback senden und anzeigen für