Ricevere eventi da Hub eventi di Azure usando JavaReceive events from Azure Event Hubs using Java

IntroduzioneIntroduction

Hub eventi è un sistema di inserimento a scalabilità elevata, in grado di inserire milioni di eventi al secondo, che permette a un'applicazione di elaborare e analizzare le elevate quantità di dati prodotti dalle applicazioni e dai dispositivi connessi.Event Hubs is a highly scalable ingestion system that can ingest millions of events per second, enabling an application to process and analyze the massive amounts of data produced by your connected devices and applications. Dopo la raccolta nell'hub eventi, i dati possono essere trasformati e archiviati tramite qualsiasi provider di analisi in tempo reale o qualsiasi cluster di archiviazione.Once collected into Event Hubs, you can transform and store data using any real-time analytics provider or storage cluster.

Per altre informazioni, vedere Panoramica di Hub eventi.For more information, see the Event Hubs overview.

Questa esercitazione illustra come ricevere eventi in un Hub eventi usando un'applicazione console scritta in Java.This tutorial shows how to receive events into an event hub using a console application written in Java.

PrerequisitiPrerequisites

Per completare questa esercitazione è necessario soddisfare i prerequisiti seguenti:In order to complete this tutorial, you need the following prerequisites:

  • Ambiente di sviluppo in Java.A Java development environment. Per questa esercitazione si presuppone l'uso di Eclipse.For this tutorial, we assume Eclipse.
  • Un account Azure attivo.An active Azure account.
    Se non si ha un account, è possibile creare un account gratuito in pochi minuti.If you don't have an account, you can create a free account in just a couple of minutes. Per informazioni dettagliate, vedere la pagina relativa alla versione di valutazione gratuita di Azure.For details, see Azure Free Trial.

Ricevere messaggi con EventProcessorHost in JavaReceive messages with EventProcessorHost in Java

EventProcessorHost è una classe Java che semplifica la ricezione di eventi da Hub eventi tramite la gestione di checkpoint persistenti e ricezioni parallele da tali hub.EventProcessorHost is a Java class that simplifies receiving events from Event Hubs by managing persistent checkpoints and parallel receives from those Event Hubs. Usando EventProcessorHost è possibile suddividere gli eventi tra più ricevitori, anche se ospitati in nodi diversi.Using EventProcessorHost, you can split events across multiple receivers, even when hosted in different nodes. Questo esempio illustra come usare EventProcessorHost per un ricevitore singolo.This example shows how to use EventProcessorHost for a single receiver.

Creare un account di archiviazioneCreate a storage account

Per usare EventProcessorHost, è necessario un account di archiviazione di Azure:To use EventProcessorHost, you must have an Azure Storage account:

  1. Accedere al portale di Azure e fare clic su + Nuovo nella parte sinistra della schermata.Log on to the Azure portal, and click + New on the left-hand side of the screen.
  2. Fare clic su Archiviazione e quindi su Account di archiviazione.Click Storage, then click Storage account. Nel pannello Crea account di archiviazione digitare un nome per l'account di archiviazione.In the Create storage account blade, type a name for the storage account. Completare il resto dei campi, selezionare l'area geografica desiderata e quindi fare clic su Crea.Complete the rest of the fields, select your desired region, and then click Create.

  3. Fare clic sull'account di archiviazione appena creato e quindi su Gestisci chiavi di accesso:Click the newly created storage account, and then click Manage Access Keys:

    Copiare la chiave di accesso primaria in una posizione temporanea per usarla più avanti in questa esercitazione.Copy the primary access key to a temporary location, to use later in this tutorial.

Creare un progetto Java usando EventProcessorHostCreate a Java project using the EventProcessor Host

La libreria client Java per Hub eventi è disponibile per l'uso nei progetti Maven dal Repository centrale di Mavened è possibile farvi riferimento usando la seguente dichiarazione di dipendenza all'interno del file di progetto Maven:The Java client library for Event Hubs is available for use in Maven projects from the Maven Central Repository, and can be referenced using the following dependency declaration inside your Maven project file:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>{VERSION}</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-eph</artifactId>
    <version>{VERSION}</version>
</dependency>
<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-eventhubs-eph</artifactId>
  <version>0.14.0</version>
</dependency>

Per diversi tipi di ambienti di compilazione, è possibile ottenere in modo esplicito i file JAR rilasciati più recenti dal repository centrale Maven o dal punto di distribuzione rilascio in GitHub.For different types of build environments, you can explicitly obtain the latest released JAR files from the Maven Central Repository or from the release distribution point on GitHub.

  1. Per l'esempio seguente, creare prima un nuovo progetto Maven per un'applicazione console/shell nell'ambiente di sviluppo Java preferito.For the following sample, first create a new Maven project for a console/shell application in your favorite Java development environment. La classe è denominata ErrorNotificationHandler.The class is called ErrorNotificationHandler.

    import java.util.function.Consumer;
    import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
    
    public class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }
    
  2. Usare il codice seguente per creare una nuova classe denominata EventProcessor.Use the following code to create a new class called EventProcessor.

    import com.microsoft.azure.eventhubs.EventData;
    import com.microsoft.azure.eventprocessorhost.CloseReason;
    import com.microsoft.azure.eventprocessorhost.IEventProcessor;
    import com.microsoft.azure.eventprocessorhost.PartitionContext;
    
    public class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;
    
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }
    
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }
    
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }
    
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> messages) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got message batch");
            int messageCount = 0;
            for (EventData data : messages)
            {
                System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                        data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBody(), "UTF8"));
                messageCount++;
    
                this.checkpointBatchingCount++;
                if ((checkpointBatchingCount % 5) == 0)
                {
                    System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                        data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                    context.checkpoint(data);
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + messageCount + " for host " + context.getOwner());
        }
    }
    
  3. Creare un'altra classe denominata EventProcessorSample usando il codice seguente.Create one more class called EventProcessorSample, using the following code.

    import com.microsoft.azure.eventprocessorhost.*;
    import com.microsoft.azure.servicebus.ConnectionStringBuilder;
    import com.microsoft.azure.eventhubs.EventData;
    
    public class EventProcessorSample
    {
        public static void main(String args[])
        {
            final String consumerGroupName = "$Default";
            final String namespaceName = "----ServiceBusNamespaceName-----";
            final String eventHubName = "----EventHubName-----";
            final String sasKeyName = "-----SharedAccessSignatureKeyName-----";
            final String sasKey = "---SharedAccessSignatureKey----";
    
            final String storageAccountName = "---StorageAccountName----";
            final String storageAccountKey = "---StorageAccountKey----";
            final String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=" + storageAccountName + ";AccountKey=" + storageAccountKey;
    
            ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder(namespaceName, eventHubName, sasKeyName, sasKey);
    
            EventProcessorHost host = new EventProcessorHost(eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString);
    
            System.out.println("Registering host named " + host.getHostName());
            EventProcessorOptions options = new EventProcessorOptions();
            options.setExceptionNotification(new ErrorNotificationHandler());
            try
            {
                host.registerEventProcessor(EventProcessor.class, options).get();
            }
            catch (Exception e)
            {
                System.out.print("Failure while registering: ");
                if (e instanceof ExecutionException)
                {
                    Throwable inner = e.getCause();
                    System.out.println(inner.toString());
                }
                else
                {
                    System.out.println(e.toString());
                }
            }
    
            System.out.println("Press enter to stop");
            try
            {
                System.in.read();
                host.unregisterEventProcessor();
    
                System.out.println("Calling forceExecutorShutdown");
                EventProcessorHost.forceExecutorShutdown(120);
            }
            catch(Exception e)
            {
                System.out.println(e.toString());
                e.printStackTrace();
            }
    
            System.out.println("End of sample");
        }
    }
    
  4. Sostituire i campi seguenti con i valori usati durante la creazione dell'Hub eventi e dell'account di archiviazione.Replace the following fields with the values used when you created the event hub and storage account.

    final String namespaceName = "----ServiceBusNamespaceName-----";
    final String eventHubName = "----EventHubName-----";
    
    final String sasKeyName = "-----SharedAccessSignatureKeyName-----";
    final String sasKey = "---SharedAccessSignatureKey----";
    
    final String storageAccountName = "---StorageAccountName----"
    final String storageAccountKey = "---StorageAccountKey----";
    

Nota

Questa esercitazione usa una singola istanza di EventProcessorHost.This tutorial uses a single instance of EventProcessorHost. Per aumentare la velocità effettiva è consigliabile eseguire più istanze di EventProcessorHost, preferibilmente in computer separati.To increase throughput, it is recommended that you run multiple instances of EventProcessorHost, preferably on separate machines. In questo modo si ottiene anche la ridondanza.This provides redundancy as well. In questi casi, le varie istanze si coordinano automaticamente tra loro per ottenere il bilanciamento del carico relativo agli eventi ricevuti.In those cases, the various instances automatically coordinate with each other in order to load balance the received events. Se si vuole che ognuno dei vari ricevitori elabori tutti gli eventi, è necessario usare il concetto ConsumerGroup .If you want multiple receivers to each process all the events, you must use the ConsumerGroup concept. Quando si ricevono eventi da più macchine, potrebbe risultare utile specificare nomi per le istanze di EventProcessorHost in base alle macchine (o ai ruoli) in cui sono distribuite.When receiving events from different machines, it might be useful to specify names for EventProcessorHost instances based on the machines (or roles) in which they are deployed.

Passaggi successiviNext steps

Per ulteriori informazioni su Hub eventi visitare i collegamenti seguenti:You can learn more about Event Hubs by visiting the following links: