bibliothèque cliente Azure Event Hubs pour Java - version 5.16.0

Azure Event Hubs est un service de publication-abonnement hautement évolutif qui peut ingérer des millions d’événements par seconde et les diffuser en continu vers plusieurs consommateurs. Cela vous permet de traiter et d’analyser les énormes quantités de données produites par vos appareils et applications connectés. Une fois qu’Event Hubs a collecté les données, vous pouvez les récupérer, les transformer et les stocker à l’aide de n’importe quel fournisseur d’analyse en temps réel ou avec des adaptateurs de traitement par lots/stockage. Si vous souhaitez en savoir plus sur Azure Event Hubs, vous pouvez consulter : Qu’est-ce qu’Event Hubs ?

La bibliothèque de client Azure Event Hubs permet de publier et de consommer des événements Azure Event Hubs et peut être utilisée pour :

  • Émettre des données de télémétrie sur votre application à des fins décisionnelles et de diagnostic.
  • Publier des faits à propos de l’état de votre application que les parties intéressées peuvent observer et utiliser comme déclencheur d’une action.
  • Observer les opérations et les interactions intéressantes qui se produisent au sein de votre entreprise ou d’un autre écosystème, ce qui permet aux systèmes faiblement couplés d’interagir sans qu’il soit nécessaire de les lier ensemble.
  • Recevoir les événements d’un ou de plusieurs serveurs de publication, les transformer pour qu’ils répondent mieux aux besoins de votre écosystème, puis publier les événements transformés dans un nouveau flux à l’attention des consommateurs.

| Code sourceDocumentation de référence sur les | API | Documentation produitÉchantillons | Dépannage

Table des matières

Prise en main

Prérequis

Inclure le package

Inclure le fichier de nomenclature

Incluez le kit azure-sdk-bom à votre projet pour qu’il soit dépendant de la version disponibilité générale (GA) de la bibliothèque. Dans l’extrait de code suivant, remplacez l’espace réservé {bom_version_to_target} par le numéro de version. Pour en savoir plus sur la nomenclature, consultez LE FICHIER README DE NOMENCLATURE DU KIT DE DÉVELOPPEMENT LOGICIEL AZURE.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

puis incluez la dépendance directe dans la section dépendances sans la balise de version comme indiqué ci-dessous.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
  </dependency>
</dependencies>

Inclure une dépendance directe

Si vous souhaitez prendre la dépendance sur une version particulière de la bibliothèque qui n’est pas présente dans la nomenclature, ajoutez la dépendance directe à votre projet comme suit.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.16.0</version>
</dependency>

Authentifier le client

Pour que la bibliothèque cliente Event Hubs interagisse avec un hub d’événements, elle doit comprendre comment se connecter et autoriser avec elle.

Créer un producteur Event Hub à l’aide d’une chaîne de connexion

Le moyen le plus simple consiste à utiliser une chaîne de connexion, qui est créée automatiquement lors de la création d’un espace de noms Event Hubs. Si vous n’êtes pas familiarisé avec les stratégies d’accès partagé dans Azure, vous pouvez suivre le guide pas à pas pour obtenir une chaîne de connexion Event Hubs.

Les clients producteurs et consommateurs Event Hub asynchrones et synchrones peuvent être créés à l’aide de EventHubClientBuilder. L’appel crée build*Client() un producteur ou un consommateur synchrone tandis que build*AsyncClient() crée son équivalent asynchrone.

L’extrait de code ci-dessous crée un producteur Event Hub synchrone.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

Créer un client Event Hub à l’aide de Plateforme d'identités Microsoft (anciennement Azure Active Directory)

Le Kit de développement logiciel (SDK) Azure pour Java prend en charge un package Azure Identity, ce qui facilite l’obtention des informations d’identification à partir de Plateforme d'identités Microsoft. Tout d’abord, ajoutez le package :

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>

Toutes les méthodes implémentées pour demander des informations d’identification se trouvent sous le com.azure.identity.credential package. L’exemple ci-dessous montre comment utiliser une clé secrète client d’application Azure Active Directory (AAD) pour autoriser avec Azure Event Hubs.

Autorisation avec DefaultAzureCredential

L’autorisation est la plus simple à l’aide de DefaultAzureCredential. Il trouve les meilleures informations d’identification à utiliser dans son environnement en cours d’exécution. Pour plus d’informations sur l’utilisation de l’autorisation Azure Active Directory avec Event Hubs, consultez la documentation associée.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

Concepts clés

  • Un producteur Event Hub est une source de données de télémétrie, d’informations diagnostics, de journaux d’utilisation ou d’autres données de journal, dans le cadre d’une solution d’appareil incorporée, d’une application d’appareil mobile, d’un titre de jeu s’exécutant sur une console ou un autre appareil, d’une solution d’entreprise basée sur un client ou un serveur, ou d’un site web.

  • Un consommateur Event Hub récupère ces informations à partir d’Event Hub et les traite. Le traitement peut impliquer l’agrégation, le calcul complexe et le filtrage. ou encore distribution ou stockage des informations brutes ou transformées. Les consommateurs Event Hub correspondent souvent à des composants d’infrastructure de plateforme robustes à grande échelle intégrant des fonctionnalités d’analytique, par exemple Azure Stream Analytics, Apache Spark ou Apache Storm.

  • Une partition constitue une séquence ordonnée d’événements conservée dans un hub d’événements. Azure Event Hubs assure la diffusion de messages suivant un modèle de consommateur partitionné dans lequel chaque consommateur ne lit qu’un sous-ensemble spécifique, ou partition, du flux de message. Les événements les plus récents sont ajoutés à la fin de cette séquence. Le nombre de partitions est spécifié lors de la création du hub d’événements. Il n’est pas modifiable.

  • Un groupe de consommateurs constitue une vue de tout un hub d’événements. Les groupes de consommateurs permettent à plusieurs applications consommatrices de disposer chacune d’une vue distincte du flux d’événements, et de lire le flux séparément, à son propre rythme et à partir de sa propre position. Il peut y avoir au maximum cinq lecteurs simultanés sur une partition par groupe de consommateurs. Toutefois, il est recommandé de se limiter à un seul consommateur actif pour une association donnée entre une partition et un groupe de consommateurs. Chaque lecteur actif reçoit les événements de sa partition ; s’il y a plusieurs lecteurs sur la même partition, ils recevront des événements en double.

Pour plus de concepts et une discussion plus approfondie, consultez Fonctionnalités Event Hubs. En outre, les concepts d’AMQP sont bien documentés dans OASIS Advanced Messaging Queuing Protocol (AMQP) version 1.0.

Exemples

Publication d’événements sur un hub d’événements

Pour publier des événements, vous devez créer un élément asynchrone EventHubProducerAsyncClient ou synchrone EventHubProducerClient. Chaque producteur peut envoyer des événements à une partition spécifique ou autoriser le service Event Hubs à décider sur quels événements de partition doivent être publiés. Il est recommandé d’utiliser le routage automatique lorsque la publication d’événements doit être hautement disponible ou lorsque les données d’événement doivent être distribuées uniformément entre les partitions.

Créer un producteur Event Hub et publier des événements

Les développeurs peuvent créer un producteur en utilisant EventHubClientBuilder et en appelant buildProducer*Client(). CreateBatchOptions.setPartitionId(String) La spécification envoie des événements à une partition spécifique. Si partitionId n’est pas spécifié, les événements sont automatiquement routés vers une partition. CreateBatchOptions.setPartitionKey(String) La spécification indique au service Event Hubs de hacher les événements et de les envoyer à la même partition.

L’extrait de code ci-dessous crée un producteur synchrone et envoie des événements à n’importe quelle partition, ce qui permet au service Event Hubs d’acheminer l’événement vers une partition disponible.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));
EventDataBatch eventDataBatch = producer.createBatch();

for (EventData eventData : allEvents) {
    if (!eventDataBatch.tryAdd(eventData)) {
        producer.send(eventDataBatch);
        eventDataBatch = producer.createBatch();

        // Try to add that event that couldn't fit before.
        if (!eventDataBatch.tryAdd(eventData)) {
            throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                + eventDataBatch.getMaxSizeInBytes());
        }
    }
}

// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
    producer.send(eventDataBatch);
}

// Clients are expected to be long-lived objects.
// Dispose of the producer to close any underlying resources when we are finished with it.
producer.close();

Notez que EventDataBatch.tryAdd(EventData) n’est pas thread-safe. Veillez à synchroniser l’accès à la méthode lors de l’utilisation de plusieurs threads pour ajouter des événements.

Publier des événements à l’aide de l’identificateur de partition

De nombreuses opérations liées aux hubs d’événements ont lieu dans l’étendue d’une partition spécifique. N’importe quel client peut appeler getPartitionIds() ou getEventHubProperties() pour obtenir les ID de partition et les métadonnées dans son instance Event Hub.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

// Creating a batch with partitionId set will route all events in that batch to partition `0`.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);

// Add events to batch and when you want to send the batch, send it using the producer.
producer.send(batch);

Publier des événements à l’aide d’une clé de partition

Lorsqu’un ensemble d’événements n’est associé à aucune partition spécifique, il peut être souhaitable de demander que le service Event Hubs conserve différents événements ou lots d’événements sur la même partition. Pour ce faire, définissez un partition key lors de la publication des événements. Dans le scénario ci-dessous, tous les événements étant liés aux villes, ils sont envoyés avec la clé de partition définie sur « cities ».

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .buildProducerClient();

List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
    new EventData("New York"));

SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions);

Consommer des événements à partir d’une partition Event Hub

Pour consommer des événements, créez un EventHubConsumerAsyncClient ou EventHubConsumerClient pour un groupe de consommateurs spécifique. En outre, un consommateur doit spécifier où dans le flux d’événements pour commencer à recevoir les événements.

Consommer des événements avec EventHubConsumerAsyncClient

Dans l’extrait de code ci-dessous, nous créons un consommateur asynchrone qui reçoit des événements de partitionId et n’écoute que les événements les plus récents qui sont envoyés à la partition. Les développeurs peuvent commencer à recevoir des événements de plusieurs partitions en utilisant les mêmes EventHubConsumerAsyncClient en appelant receiveFromPartition(String, EventPosition) avec un autre ID de partition.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        new DefaultAzureCredentialBuilder().build())
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Obtain partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();

// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation.  If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
    .subscribe(partitionEvent -> {
        PartitionContext partitionContext = partitionEvent.getPartitionContext();
        EventData event = partitionEvent.getData();

        System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
        System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
    }, error -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.err.print("An error occurred:" + error);
    }, () -> {
        // This is a terminal signal.  No more events will be received from the same Flux object.
        System.out.print("Stream has ended.");
    });

Consommer des événements avec EventHubConsumerClient

Les développeurs peuvent créer un consommateur synchrone qui retourne des événements dans des lots à l’aide d’un EventHubConsumerClient. Dans l’extrait de code ci-dessous, un consommateur est créé qui commence à lire les événements à partir du début du flux d’événements de la partition.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
String partitionId = "0";

// Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
    startingPosition, Duration.ofSeconds(30));

Long lastSequenceNumber = -1L;
for (PartitionEvent partitionEvent : events) {
    // For each event, perform some sort of processing.
    System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber());
    lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
}

// Figure out what the next EventPosition to receive from is based on last event we processed in the stream.
// If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
// partition.
if (lastSequenceNumber != -1L) {
    EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

    // Gets the next set of events from partition '0' to consume and process.
    IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
        nextPosition, Duration.ofSeconds(30));
}

Consommer des événements à l’aide d’un EventProcessorClient

Pour utiliser des événements pour toutes les partitions d’un Event Hub, vous pouvez créer un EventProcessorClient pour un groupe de consommateurs spécifique.

délègue EventProcessorClient le traitement des événements à une fonction de rappel que vous fournissez, ce qui vous permet de vous concentrer sur la logique nécessaire pour fournir de la valeur tandis que le processeur est responsable de la gestion des opérations de consommation sous-jacentes.

Dans notre exemple, nous allons nous concentrer sur la création de EventProcessorClient, utiliser le SampleCheckpointStore disponible dans les exemples et une fonction de rappel qui traite les événements reçus à partir d’Event Hub et écrit dans la console. Pour les applications de production, il est recommandé d’utiliser un magasin durable comme Checkpoint Store avec des objets blob de stockage Azure.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .checkpointStore(new SampleCheckpointStore())
    .processEvent(eventContext -> {
        System.out.printf("Partition id = %s and sequence number of event = %s%n",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    })
    .buildEventProcessorClient();

Dépannage

Voir TROUBLESHOOTING.md.

Étapes suivantes

Au-delà de ceux abordés, la bibliothèque cliente Azure Event Hubs offre une prise en charge pour de nombreux autres scénarios afin de tirer parti de l’ensemble de fonctionnalités complet du service Azure Event Hubs. Pour explorer certains de ces scénarios, case activée les exemples README.

Contribution

Si vous souhaitez devenir un contributeur actif à ce projet, consultez nos Lignes directrices sur les contributions pour plus d’informations.

Impressions