Receive events from Azure Event Hubs using Java

Event Hubs is a highly scalable ingestion system that can ingest millions of events per second, enabling an application to process and analyze the massive amounts of data produced by your connected devices and applications. Once data is collected into Event Hubs, you can transform and store it using any real-time analytics provider or storage cluster.

For more information, see the Event Hubs overview.

This tutorial shows how to receive events from an event hub using a console application written in Java.

Prerequisites

To complete this tutorial, you need the following prerequisites:

  • A Java development environment. This tutorial assumes Eclipse.
  • An active Azure account. If you do not have an Azure subscription, create a free account before you begin.

The code in this tutorial is based on the EventProcessorSample code on GitHub, which you can examine to see the full working application.

Receive messages with EventProcessorHost in Java

EventProcessorHost is a Java class that simplifies receiving events from Event Hubs by managing persistent checkpoints and parallel receives from those event hubs. Using EventProcessorHost, you can split events across multiple receivers, even when they are hosted on different nodes. This example shows how to use EventProcessorHost for a single receiver.

Create a storage account

To use EventProcessorHost, you must have an Azure Storage account:

  1. Log on to the Azure portal, and click + Create a resource on the left-hand side of the screen.
  2. Click Storage, then click Storage account. In the Create storage account window, type a name for the storage account. Complete the rest of the fields, select your desired region, and then click Create.

  3. Click the newly created storage account, and then click Access Keys:

    Copy the key1 value to a temporary location, to use later in this tutorial.
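The sample later in this tutorial expects a full storage connection string rather than the bare key. As a minimal sketch, the helper below assembles one from an account name and the key1 value in the usual Azure Storage key/value format. The class name, the method name, and the placeholder values are hypothetical, and the endpoint suffix assumes the public Azure cloud; the portal's Access Keys page also shows a ready-made connection string that you can copy instead.

```java
import java.util.Objects;

final class StorageConnectionStringSketch {
    // Assembles a connection string in the standard Azure Storage key/value format.
    // Assumption: the account lives in the public Azure cloud (core.windows.net).
    static String build(String accountName, String accountKey) {
        Objects.requireNonNull(accountName, "accountName");
        Objects.requireNonNull(accountKey, "accountKey");
        return "DefaultEndpointsProtocol=https"
                + ";AccountName=" + accountName
                + ";AccountKey=" + accountKey
                + ";EndpointSuffix=core.windows.net";
    }

    public static void main(String[] args) {
        // Hypothetical placeholders; substitute your own account name and key1 value.
        System.out.println(build("mystorageaccount", "<key1-value>"));
    }
}
```

The resulting string is what the storageConnectionString placeholder in the sample is meant to hold.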

Create a Java project using EventProcessorHost

The Java client library for Event Hubs is available for use in Maven projects from the Maven Central Repository, and can be referenced using the following dependency declaration inside your Maven project file. The current version is 1.0.0:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-eph</artifactId>
    <version>1.0.0</version>
</dependency>

For different types of build environments, you can explicitly obtain the latest released JAR files from the Maven Central Repository.

  1. For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. Then add a class called ErrorNotificationHandler, which receives general error notifications from the host:

    import java.util.function.Consumer;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    
    public class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }
    
  2. Use the following code to create a new class called EventProcessorSample. Replace the placeholders with the values used when you created the event hub and storage account:

    package com.microsoft.azure.eventhubs.samples.eventprocessorsample;
    
    import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
    import com.microsoft.azure.eventhubs.EventData;
    import com.microsoft.azure.eventprocessorhost.CloseReason;
    import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
    import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    import com.microsoft.azure.eventprocessorhost.IEventProcessor;
    import com.microsoft.azure.eventprocessorhost.PartitionContext;
    
    import java.util.concurrent.ExecutionException;
    import java.util.function.Consumer;
    
    public class EventProcessorSample
    {
        public static void main(String args[]) throws InterruptedException, ExecutionException
        {
            String consumerGroupName = "$Default";
            String namespaceName = "----NamespaceName----";
            String eventHubName = "----EventHubName----";
            String sasKeyName = "----SharedAccessSignatureKeyName----";
            String sasKey = "----SharedAccessSignatureKey----";
            String storageConnectionString = "----AzureStorageConnectionString----";
            String storageContainerName = "----StorageContainerName----";
            String hostNamePrefix = "----HostNamePrefix----";
    
            ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
                 .setNamespaceName(namespaceName)
                 .setEventHubName(eventHubName)
                 .setSasKeyName(sasKeyName)
                 .setSasKey(sasKey);
    
            EventProcessorHost host = new EventProcessorHost(
                 EventProcessorHost.createHostName(hostNamePrefix),
                 eventHubName,
                 consumerGroupName,
                 eventHubConnectionString.toString(),
                 storageConnectionString,
                 storageContainerName);
    
            System.out.println("Registering host named " + host.getHostName());
            EventProcessorOptions options = new EventProcessorOptions();
            options.setExceptionNotification(new ErrorNotificationHandler());
    
            host.registerEventProcessor(EventProcessor.class, options)
            .whenComplete((unused, e) ->
            {
                if (e != null)
                {
                    System.out.println("Failure while registering: " + e.toString());
                    if (e.getCause() != null)
                    {
                        System.out.println("Inner exception: " + e.getCause().toString());
                    }
                }
            })
            .thenAccept((unused) ->
            {
                System.out.println("Press enter to stop.");
                try 
                {
                    System.in.read();
                }
                catch (Exception e)
                {
                    System.out.println("Keyboard read failed: " + e.toString());
                }
            })
            .thenCompose((unused) ->
            {
                return host.unregisterEventProcessor();
            })
            .exceptionally((e) ->
            {
                System.out.println("Failure while unregistering: " + e.toString());
                if (e.getCause() != null)
                {
                    System.out.println("Inner exception: " + e.getCause().toString());
                }
                return null;
            })
            .get(); // Wait for everything to finish before exiting main!
    
            System.out.println("End of sample");
        }
    
  3. Create one more class called EventProcessor, using the following code. Because it is declared static, it must be nested inside the EventProcessorSample class from the previous step:

    public static class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;
    
        // OnOpen is called when a new event processor instance is created by the host. In a real implementation, this
        // is the place to do initialization so that events can be processed when they arrive, such as opening a database
        // connection.
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }
    
        // OnClose is called when an event processor instance is being shut down. The reason argument indicates whether the shut down
        // is because another host has stolen the lease for this partition or due to error or host shutdown. In a real implementation,
        // this is the place to do cleanup for resources that were opened in onOpen.
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }
    
        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        // It is NOT called for exceptions thrown out of onOpen/onClose/onEvents. EventProcessorHost is responsible for recovering from
        // the error, if possible, or shutting the event processor down if not, in which case there will be a call to onClose. The
        // notification provided to onError is primarily informational.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }
    
        // onEvents is called when events are received on this partition of the Event Hub. The maximum number of events in a batch
        // can be controlled via EventProcessorOptions. Also, if the "invoke processor after receive timeout" option is set to true,
        // this method will be called with null when a receive timeout occurs.
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
            int eventCount = 0;
            for (EventData data : events)
            {
                // It is important to have a try-catch around the processing of each event. Throwing out of onEvents deprives
                // you of the chance to process any remaining events in the batch. 
                try
                {
                    System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                            data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
                    eventCount++;
    
                    // Checkpointing persists the current position in the event stream for this partition and means that the next
                    // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                    // receiving at the event after this one. Checkpointing is usually not a fast operation, so there is a tradeoff
                    // between checkpointing frequently (to minimize the number of events that will be reprocessed after a crash, or
                    // if the partition lease is stolen) and checkpointing infrequently (to reduce the impact on event processing
                    // performance). Checkpointing every five events is an arbitrary choice for this sample.
                    this.checkpointBatchingCount++;
                    if ((checkpointBatchingCount % 5) == 0)
                    {
                        System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                            data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                        // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
                        // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
                        context.checkpoint(data).get();
                    }
                }
                catch (Exception e)
                {
                    System.out.println("Processing failed for an event: " + e.toString());
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
        }
    }
    // Closing brace of the enclosing EventProcessorSample class, which step 2 leaves open.
    }
    
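The comments in onEvents describe a tradeoff between checkpointing often and checkpointing rarely. The sketch below models only the counting logic, with no Event Hubs dependency, to show how many events would be received again after a restart. The class and method names are hypothetical, and sequence numbers are simplified to consecutive integers.

```java
final class CheckpointBatchingSketch {
    private final int checkpointEvery;
    private int processedCount = 0;
    private long lastCheckpointedSequence = -1; // -1 means no checkpoint has been written yet

    CheckpointBatchingSketch(int checkpointEvery) {
        if (checkpointEvery <= 0) {
            throw new IllegalArgumentException("checkpointEvery must be positive");
        }
        this.checkpointEvery = checkpointEvery;
    }

    // Processes one event and records a checkpoint on every Nth event,
    // mirroring the (checkpointBatchingCount % 5) == 0 test in the sample.
    void onEvent(long sequenceNumber) {
        processedCount++;
        if (processedCount % checkpointEvery == 0) {
            lastCheckpointedSequence = sequenceNumber;
        }
    }

    long lastCheckpointedSequence() {
        return lastCheckpointedSequence;
    }

    // Events after the last checkpoint, up to and including lastProcessedSequence,
    // would be delivered again if the processor restarted at this point.
    long eventsReplayedAfterRestart(long lastProcessedSequence) {
        return lastProcessedSequence - lastCheckpointedSequence;
    }

    public static void main(String[] args) {
        CheckpointBatchingSketch sketch = new CheckpointBatchingSketch(5);
        for (long seq = 0; seq < 12; seq++) {
            sketch.onEvent(seq);
        }
        System.out.println("Last checkpoint at sequence " + sketch.lastCheckpointedSequence());
        System.out.println("Events replayed after a restart: " + sketch.eventsReplayedAfterRestart(11));
    }
}
```

With a batch size of five and twelve events processed, the last checkpoint falls on the tenth event, so the two events after it would be reprocessed. A larger batch size reduces checkpoint calls at the cost of more replayed events.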

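The chain of CompletableFuture stages in EventProcessorSample.main can be hard to follow, in particular which stages run when registration fails. The sketch below reproduces the same shape using only java.util.concurrent and records which stages execute. The class name and the log messages are hypothetical, and the register argument stands in for the future returned by registerEventProcessor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class FutureChainSketch {
    // Runs a chain shaped like the one in main and returns the stages that executed.
    static List<String> run(CompletableFuture<Void> register) {
        List<String> log = new ArrayList<>();
        register
            .whenComplete((unused, e) -> {
                // Always runs; e is non-null only when registration failed.
                if (e != null) {
                    log.add("register failed");
                }
            })
            // Skipped when the previous stage completed exceptionally.
            .thenAccept(unused -> log.add("waiting for stop"))
            // Also skipped on failure; stands in for unregisterEventProcessor().
            .thenCompose(unused -> {
                log.add("unregistering");
                return CompletableFuture.<Void>completedFuture(null);
            })
            // Runs for a failure from any earlier stage, not only from unregistering.
            .exceptionally(e -> {
                log.add("exceptionally");
                return null;
            })
            .join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(CompletableFuture.completedFuture(null)));

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated registration failure"));
        System.out.println(run(failed));
    }
}
```

One consequence of this shape is that a registration failure also reaches the final exceptionally handler, so the sample's "Failure while unregistering" message can appear even though unregistering never started. Dependent stages normally see the original exception wrapped in a CompletionException, which is why the sample prints e.getCause() as well.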
Note

This tutorial uses a single instance of EventProcessorHost. To increase throughput, it is recommended that you run multiple instances of EventProcessorHost, preferably on separate machines. This provides redundancy as well. In those cases, the various instances automatically coordinate with each other to load balance the received events. If you want multiple receivers to each process all the events, you must use the ConsumerGroup concept, placing each receiver in its own consumer group. When receiving events from different machines, it might be useful to name EventProcessorHost instances after the machines (or roles) in which they are deployed.
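As a rough illustration of naming hosts after the machine they run on, the sketch below derives a prefix from a role name and the local machine name. The class, both method names, and the "receiver" role are hypothetical; the result is meant to be used as the hostNamePrefix value in the sample above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class HostNamePrefixSketch {
    // Combines a role name and a machine name into a readable host name prefix.
    static String prefixFor(String role, String machineName) {
        return role + "-" + machineName;
    }

    // Looks up the local machine name, falling back to a fixed label if the lookup fails.
    static String localMachineName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown-machine";
        }
    }

    public static void main(String[] args) {
        // "receiver" is an arbitrary role name chosen for this sketch.
        System.out.println(prefixFor("receiver", localMachineName()));
    }
}
```

Log lines that include the host name then show directly which machine handled a given partition.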

Next steps

You can learn more about Event Hubs by visiting the following links: