Odbieranie zdarzeń przy użyciu dostarczania ściągnięcia za pomocą języka Java

Ten artykuł zawiera szybki przewodnik krok po kroku dotyczący odbierania rozwiązania CloudEvents przy użyciu dostarczania ściągania usługi Event Grid. Udostępnia przykładowy kod do odbierania, potwierdzania (usuwania zdarzeń z usługi Event Grid).

Wymagania wstępne

Przed kontynuowaniem należy spełnić wymagania wstępne:

  • Dowiedz się, czym jest dostarczanie ściągnięcia. Aby uzyskać więcej informacji, zobacz pojęcia dotyczące dostarczania ściągnięcia i omówienie dostarczania ściągnięcia.

  • Przestrzeń nazw, temat i subskrypcja zdarzeń.

  • Najnowszy pakiet zestawu SDK w wersji beta . Jeśli używasz narzędzia Maven, możesz zapoznać się z centralnym repozytorium maven.

    Ważne

    Obsługa zestawu SDK płaszczyzny danych ściągania jest dostępna w pakietach beta. W projekcie należy użyć najnowszego pakietu beta.

  • Środowisko IDE, które obsługuje środowisko Java, takie jak IntelliJ IDEA, Eclipse IDE lub Visual Studio Code.

  • Środowisko Java JRE z uruchomionym językiem Java 8.

  • Zdarzenia powinny być dostępne w temacie. Zobacz tematy dotyczące publikowania zdarzeń w przestrzeni nazw.

Przykładowy kod

Przykładowy kod używany w tym artykule znajduje się w tej lokalizacji:

    https://github.com/jfggdl/event-grid-pull-delivery-quickstart

Odbieranie zdarzeń przy użyciu dostarczania ściągania

Zdarzenia z usługi Event Grid można odczytywać, określając temat przestrzeni nazw i subskrypcję zdarzeń kolejki z operacją odbierania. Subskrypcja zdarzeń to zasób, który skutecznie definiuje kolekcję cloudEvents, którą klient odbiorcy może odczytać. Ten przykładowy kod używa uwierzytelniania opartego na kluczach, ponieważ zapewnia szybkie i proste podejście do uwierzytelniania. W przypadku scenariuszy produkcyjnych należy użyć uwierzytelniania identyfikatora wpisu firmy Microsoft, ponieważ zapewnia znacznie bardziej niezawodny mechanizm uwierzytelniania.

package com.azure.messaging.eventgrid.samples;

import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.models.CloudEvent;
import com.azure.messaging.eventgrid.EventGridClient;
import com.azure.messaging.eventgrid.EventGridClientBuilder;
import com.azure.messaging.eventgrid.EventGridMessagingServiceVersion;
import com.azure.messaging.eventgrid.models.*;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * <p>Simple demo consumer app of CloudEvents from queue event subscriptions created for namespace topics.
 * This code samples should use Java 1.8 level or above to avoid compilation errors.
 * You should consult the resources below to use the client SDK and set up your project using maven.
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventgrid/azure-messaging-eventgrid">Event Grid data plane client SDK documentation</a>
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/boms/azure-sdk-bom/README.md">Azure BOM for client libraries</a>
 * @see <a href="https://aka.ms/spring/versions">Spring Version Mapping</a> if you are using Spring.
 * @see <a href="https://aka.ms/azsdk">Tool with links to control plane and data plane SDKs across all languages supported</a>.
 *</p>
 */
public class NamespaceTopicConsumer {
    private static final String TOPIC_NAME = "<yourNamespaceTopicName>";
    public static final String EVENT_SUBSCRIPTION_NAME = "<yourEventSusbcriptionName>";
    public static final String ENDPOINT =  "<yourFullHttpsUrlToTheNamespaceEndpoint>";
    public static final int MAX_NUMBER_OF_EVENTS_TO_RECEIVE = 10;
    public static final Duration MAX_WAIT_TIME_FOR_EVENTS = Duration.ofSeconds(10);

    private static EventGridClient eventGridClient;
    private static List<String> receivedCloudEventLockTokens = new ArrayList<>();
    private static List<CloudEvent> receivedCloudEvents = new ArrayList<>();

    //TODO  Do NOT include keys in source code. This code's objective is to give you a succinct sample about using Event Grid, not to provide an authoritative example for handling secrets in applications.
    /**
     * For security concerns, you should not have keys or any other secret in any part of the application code.
     * You should use services like Azure Key Vault for managing your keys.
     */
    public static final AzureKeyCredential CREDENTIAL = new AzureKeyCredential("<namespace key>");
    public static void main(String[] args) {
        //TODO Update Event Grid version number to your desired version. You can find more information on data plane APIs here:
        //https://learn.microsoft.com/en-us/rest/api/eventgrid/.
        eventGridClient = new EventGridClientBuilder()
                .httpClient(HttpClient.createDefault())  // Requires Java 1.8 level
                .endpoint(ENDPOINT)
                .serviceVersion(EventGridMessagingServiceVersion.V2023_06_01_PREVIEW)
                .credential(CREDENTIAL).buildClient();   // you may want to use .buildAsyncClient() for an asynchronous (project reactor) client.

        System.out.println("Waiting " +  MAX_WAIT_TIME_FOR_EVENTS.toSecondsPart() + " seconds for events to be read...");
        List<ReceiveDetails> receiveDetails = eventGridClient.receiveCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME,
                MAX_NUMBER_OF_EVENTS_TO_RECEIVE, MAX_WAIT_TIME_FOR_EVENTS).getValue();

        for (ReceiveDetails detail : receiveDetails) {
            // Add order message received to a tracking list
            CloudEvent orderCloudEvent = detail.getEvent();
            receivedCloudEvents.add(orderCloudEvent);
            // Add lock token to a tracking list. Lock token functions like an identifier to a cloudEvent
            BrokerProperties metadataForCloudEventReceived = detail.getBrokerProperties();
            String lockToken = metadataForCloudEventReceived.getLockToken();
            receivedCloudEventLockTokens.add(lockToken);
        }
        System.out.println("<-- Number of events received: " + receivedCloudEvents.size());

Potwierdzanie zdarzeń

Aby potwierdzić zdarzenia, użyj tego samego kodu używanego do odbierania zdarzeń i dodaj następujące wiersze, aby wywołać metodę prywatną potwierdzenia:

        // Acknowledge (i.e. delete from Event Grid the) events
        acknowledge(receivedCloudEventLockTokens);

Przykładowa implementacja metody potwierdzania wraz z metodą narzędzia do drukowania informacji o nieudanych tokenach blokady jest następująca:

    private static void acknowledge(List<String> lockTokens) {
        AcknowledgeResult acknowledgeResult = eventGridClient.acknowledgeCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new AcknowledgeOptions(lockTokens));
        List<String> succeededLockTokens = acknowledgeResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("@@@ " + succeededLockTokens.size() + " events were successfully acknowledged:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Acknowledged event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not acknowledged (deleted from Event Grid)");
            writeFailedLockTokens(acknowledgeResult.getFailedLockTokens());
        }
    }

    private static void writeFailedLockTokens(List<FailedLockToken> failedLockTokens) {
        for (FailedLockToken failedLockToken : failedLockTokens) {
            System.out.println("    Failed lock token: " + failedLockToken.getLockToken());
            System.out.println("    Error code: " + failedLockToken.getErrorCode());
            System.out.println("    Error description: " + failedLockToken.getErrorDescription());
        }
    }

Zdarzenia wydania

Zdarzenia wydania, aby udostępnić je do ponownego dostarczenia. Podobnie jak w przypadku potwierdzania zdarzeń, możesz dodać następującą metodę statyczną i wiersz, aby wywołać go w celu zwolnienia zdarzeń zidentyfikowanych przez tokeny blokady przekazane jako argument. Aby skompilować tę metodę, potrzebna jest writeFailedLockTokens metoda .

   private static void release(List<String> lockTokens) {
        ReleaseResult releaseResult = eventGridClient.releaseCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new ReleaseOptions(lockTokens));
        List<String> succeededLockTokens = releaseResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("^^^ " + succeededLockTokens.size() + " events were successfully released:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Released event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not released back to Event Grid.");
            writeFailedLockTokens(releaseResult.getFailedLockTokens());
        }
    }

Odrzucanie zdarzeń

Odrzuć zdarzenia, których aplikacja konsumenta nie może przetworzyć. Warunki, dla których odrzucasz zdarzenie, obejmują źle sformułowane zdarzenie, którego nie można przeanalizować ani problemy z aplikacją przetwarzającą zdarzenia.

    private static void reject(List<String> lockTokens) {
        RejectResult rejectResult = eventGridClient.rejectCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new RejectOptions(lockTokens));
        List<String> succeededLockTokens = rejectResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("--- " + succeededLockTokens.size() + " events were successfully rejected:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Rejected event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not rejected.");
            writeFailedLockTokens(rejectResult.getFailedLockTokens());
        }
    }

Następne kroki

  • Zobacz Dokumentacja interfejsu API języka Java.
  • Aby dowiedzieć się więcej na temat modelu dostarczania ściągnięcia, zobacz Omówienie dostarczania ściągnięcia.