Použití Javy k odesílání nebo příjmu událostí z Azure Event Hubs

V tomto rychlém startu se dozvíte, jak odesílat události do centra událostí a přijímat je z centra událostí pomocí balíčku Java azure-messaging-eventhubs .

Tip

Pokud pracujete s prostředky Azure Event Hubs v aplikaci Spring, doporučujeme jako alternativu zvážit Spring Cloud Azure . Spring Cloud Azure je opensourcový projekt, který poskytuje bezproblémovou integraci Spring se službami Azure. Další informace o Spring Cloud Azure a ukázku použití služby Event Hubs najdete v tématu Spring Cloud Stream se službou Azure Event Hubs.

Požadavky

Pokud s Azure Event Hubs teprve začínáte, podívejte se na přehled služby Event Hubs před tímto rychlým startem.

K dokončení tohoto rychlého startu potřebujete následující požadavky:

  • Předplatné Microsoft Azure. Pokud chcete používat služby Azure, včetně služby Azure Event Hubs, potřebujete předplatné. Pokud nemáte existující účet Azure, můžete si zaregistrovat bezplatnou zkušební verzi nebo využít výhody předplatitele MSDN při vytváření účtu.
  • Vývojové prostředí Java. V tomto rychlém startu se používá Eclipse. Vyžaduje se sada Java Development Kit (JDK) s verzí 8 nebo vyšší.
  • Vytvořte obor názvů služby Event Hubs a centrum událostí. Prvním krokem je použití webu Azure Portal k vytvoření oboru názvů typu Event Hubs a získání přihlašovacích údajů pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku. Potom podle pokynů v článku získejte připojovací řetězec pro obor názvů služby Event Hubs: Získejte připojovací řetězec. Později v tomto rychlém startu použijete připojovací řetězec.

Odesílání událostí

V této části se dozvíte, jak vytvořit aplikaci Java pro odesílání událostí do centra událostí.

Přidání odkazu na knihovnu Azure Event Hubs

Nejprve vytvořte nový projekt Maven pro konzolovou/shellovou aplikaci ve svém oblíbeném vývojovém prostředí Java. Aktualizujte pom.xml soubor následujícím způsobem. Klientská knihovna Java pro službu Event Hubs je k dispozici v centrálním úložišti Maven.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Poznámka:

Aktualizujte verzi na nejnovější verzi publikovanou v úložišti Maven.

Ověření aplikace v Azure

V tomto rychlém startu se dozvíte dva způsoby připojení ke službě Azure Event Hubs: bez hesla a připojovací řetězec. První možnost ukazuje, jak se pomocí objektu zabezpečení v Microsoft Entra ID a řízení přístupu na základě role (RBAC) připojit k oboru názvů služby Event Hubs. Nemusíte se starat o pevně zakódované připojovací řetězec v kódu nebo v konfiguračním souboru nebo v zabezpečeném úložišti, jako je Azure Key Vault. Druhá možnost ukazuje, jak se pomocí připojovací řetězec připojit k oboru názvů služby Event Hubs. Pokud s Azure začínáte, můžete najít připojovací řetězec možnost, která se snadněji sleduje. Doporučujeme používat možnost bez hesla v reálných aplikacích a produkčních prostředích. Další informace najdete v tématu Ověřování a autorizace. Další informace o ověřování bez hesla najdete na stránce přehledu.

Přiřazení rolí uživateli Microsoft Entra

Při místním vývoji se ujistěte, že uživatelský účet, který se připojuje ke službě Azure Event Hubs, má správná oprávnění. K odesílání a přijímání zpráv budete potřebovat roli vlastníka dat služby Azure Event Hubs. K přiřazení této role budete potřebovat roli Uživatelský přístup Správa istrator nebo jinou roli, která tuto akci zahrnujeMicrosoft.Authorization/roleAssignments/write. Role Azure RBAC můžete uživateli přiřadit pomocí webu Azure Portal, Azure CLI nebo Azure PowerShellu. Další informace o dostupných oborech pro přiřazení rolí najdete na stránce přehledu oboru.

Následující příklad přiřadí roli k vašemu uživatelskému Azure Event Hubs Data Owner účtu, který poskytuje úplný přístup k prostředkům služby Azure Event Hubs. V reálném scénáři postupujte podle principu nejnižšího oprávnění , aby uživatelům poskytla pouze minimální oprávnění potřebná pro bezpečnější produkční prostředí.

Předdefinované role Azure pro Azure Event Hubs

Pro Službu Azure Event Hubs je správa oborů názvů a všech souvisejících prostředků prostřednictvím webu Azure Portal a rozhraní API pro správu prostředků Azure již chráněno pomocí modelu Azure RBAC. Azure poskytuje následující předdefinované role Azure pro autorizaci přístupu k oboru názvů služby Event Hubs:

  • Vlastník dat služby Azure Event Hubs: Umožňuje přístup k datům k oboru názvů služby Event Hubs a jeho entitám (fronty, témata, odběry a filtry).
  • Odesílatel dat služby Azure Event Hubs: Pomocí této role dáte odesílateli přístup k oboru názvů služby Event Hubs a jeho entitám.
  • Příjemce dat služby Azure Event Hubs: Pomocí této role dejte příjemci přístup k oboru názvů služby Event Hubs a jeho entitám.

Pokud chcete vytvořit vlastní roli, přečtěte si téma Práva potřebná pro operace služby Event Hubs.

Důležité

Ve většině případů bude trvat minutu nebo dvě, než se přiřazení role rozšíří v Azure. Ve výjimečných případech může trvat až osm minut. Pokud při prvním spuštění kódu dojde k chybám ověřování, chvíli počkejte a zkuste to znovu.

  1. Na webu Azure Portal vyhledejte obor názvů služby Event Hubs pomocí hlavního panelu hledání nebo levé navigace.

  2. Na stránce přehledu vyberte v nabídce vlevo řízení přístupu (IAM ).

  3. Na stránce Řízení přístupu (IAM) vyberte kartu Přiřazení rolí.

  4. V horní nabídce vyberte + Přidat a potom přidejte přiřazení role z výsledné rozevírací nabídky.

    A screenshot showing how to assign a role.

  5. Pomocí vyhledávacího pole vyfiltrujte výsledky podle požadované role. V tomto příkladu vyhledejte Azure Event Hubs Data Owner a vyberte odpovídající výsledek. Pak zvolte Další.

  6. V části Přiřadit přístup vyberte Uživatel, skupina nebo instanční objekt a pak zvolte + Vybrat členy.

  7. V dialogovém okně vyhledejte své uživatelské jméno Microsoft Entra (obvykle vaše user@domain e-mailová adresa) a pak v dolní části dialogového okna zvolte Vybrat .

  8. Vyberte Zkontrolovat a přiřadit přejděte na poslední stránku a pak proces dokončete opětovnou kontrolou a přiřazením .

Napsání kódu pro odesílání zpráv do centra událostí

Přidejte třídu s názvem Sendera přidejte do třídy následující kód:

Důležité

  • Aktualizujte <NAMESPACE NAME> ho názvem oboru názvů služby Event Hubs.
  • Aktualizujte <EVENT HUB NAME> ho názvem centra událostí.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Sestavte program a ujistěte se, že nedošlo k žádným chybám. Tento program spustíte po spuštění programu příjemce.

Příjem událostí

Kód v tomto kurzu je založený na ukázce EventProcessorClient na GitHubu, kterou můžete prozkoumat, abyste viděli úplnou funkční aplikaci.

Při používání služby Azure Blob Storage jako úložiště kontrolních bodů postupujte podle těchto doporučení:

  • Pro každou skupinu příjemců použijte samostatný kontejner. Můžete použít stejný účet úložiště, ale pro každou skupinu použít jeden kontejner.
  • Nepoužívejte kontejner pro nic jiného a nepoužívejte účet úložiště pro nic jiného.
  • Účet úložiště by měl být ve stejné oblasti jako nasazená aplikace. Pokud je aplikace místní, zkuste zvolit nejbližší možnou oblast.

Na stránce účtu úložiště na webu Azure Portal v části Blob Service se ujistěte, že jsou zakázaná následující nastavení.

  • Hierarchický obor názvů
  • Obnovitelné odstranění objektu blob
  • Vytváření verzí

Vytvoření úložiště Azure a kontejneru objektů blob

V tomto rychlém startu použijete Azure Storage (konkrétně Blob Storage) jako úložiště kontrolních bodů. Vytváření kontrolních bodů je proces, pomocí kterého procesor událostí označí nebo potvrdí pozici poslední úspěšně zpracovávané události v rámci oddílu. Označení kontrolního bodu se obvykle provádí v rámci funkce, která zpracovává události. Další informace o vytváření kontrolních bodů najdete v tématu Procesor událostí.

Podle těchto kroků vytvořte účet služby Azure Storage.

  1. Vytvoření účtu služby Azure Storage
  2. Vytvoření kontejneru objektů blob
  3. Ověření v kontejneru objektů blob

Při místním vývoji se ujistěte, že uživatelský účet, který přistupuje k datům objektů blob, má správná oprávnění. K čtení a zápisu dat objektů blob budete potřebovat Přispěvatel dat objektů blob služby Storage. Abyste mohli tuto roli přiřadit sami sobě, musíte mít přiřazenou roli User Access Správa istrator nebo jinou roli, která zahrnuje akci Microsoft.Authorization/roleAssignments/write. Role Azure RBAC můžete uživateli přiřadit pomocí webu Azure Portal, Azure CLI nebo Azure PowerShellu. Další informace o dostupných oborech pro přiřazení rolí najdete na stránce přehledu oboru.

V tomto scénáři přiřadíte oprávnění k vašemu uživatelskému účtu s vymezeným oborem účtu úložiště, abyste postupovali podle zásady nejnižších oprávnění. Tento postup poskytuje uživatelům jenom minimální potřebná oprávnění a vytváří bezpečnější produkční prostředí.

Následující příklad přiřadí roli Přispěvatel dat v objektech blob služby Storage k vašemu uživatelskému účtu, který poskytuje přístup ke čtení i zápisu k datům objektů blob v účtu úložiště.

Důležité

Ve většině případů bude trvat minutu nebo dvě, než se přiřazení role rozšíří v Azure, ale ve výjimečných případech může trvat až osm minut. Pokud při prvním spuštění kódu dojde k chybám ověřování, chvíli počkejte a zkuste to znovu.

  1. Na webu Azure Portal vyhledejte svůj účet úložiště pomocí hlavního panelu hledání nebo levé navigace.

  2. Na stránce přehledu účtu úložiště v nabídce vlevo vyberte Řízení přístupu (IAM ).

  3. Na stránce Řízení přístupu (IAM) vyberte kartu Přiřazení rolí.

  4. V horní nabídce vyberte + Přidat a potom přidejte přiřazení role z výsledné rozevírací nabídky.

    A screenshot showing how to assign a storage account role.

  5. Pomocí vyhledávacího pole vyfiltrujte výsledky podle požadované role. V tomto příkladu vyhledejte Přispěvatel dat objektů blob služby Storage a vyberte odpovídající výsledek a pak zvolte Další.

  6. V části Přiřadit přístup vyberte Uživatel, skupina nebo instanční objekt a pak zvolte + Vybrat členy.

  7. V dialogovém okně vyhledejte své uživatelské jméno Microsoft Entra (obvykle vaše user@domain e-mailová adresa) a pak v dolní části dialogového okna zvolte Vybrat .

  8. Vyberte Zkontrolovat a přiřadit přejděte na poslední stránku a pak proces dokončete opětovnou kontrolou a přiřazením .

Přidání knihoven Event Hubs do projektu Java

Do souboru pom.xml přidejte následující závislosti.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Na začátek souboru Java přidejte následující import příkazy.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Vytvořte třídu s názvem Receivera přidejte do třídy následující řetězcové proměnné. Zástupné symboly nahraďte správnými hodnotami.

    Důležité

    Zástupné symboly nahraďte správnými hodnotami.

    • <NAMESPACE NAME> nahraďte názvem vašeho oboru názvů Event Hubs.
    • <EVENT HUB NAME> s názvem vašeho centra událostí v oboru názvů.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Do třídy přidejte následující main metodu.

    Důležité

    Zástupné symboly nahraďte správnými hodnotami.

    • <STORAGE ACCOUNT NAME> s názvem vašeho účtu Azure Storage.
    • <CONTAINER NAME> s názvem kontejneru objektů blob v účtu úložiště
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Přidejte dvě pomocné metody (PARTITION_PROCESSOR a ERROR_HANDLER) zpracovávají události a chyby do Receiver třídy.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Sestavte program a ujistěte se, že nedošlo k žádným chybám.

Spuštění aplikací

  1. Nejprve spusťte aplikaci Receiver.

  2. Potom spusťte aplikaci Sender .

  3. V okně aplikace Příjemce potvrďte, že se zobrazí události publikované aplikací Sender.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Stisknutím klávesy ENTER v okně aplikace příjemce aplikaci aplikaci zastavte.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Další kroky

Projděte si následující ukázky na GitHubu: