Een Java-toepassing maken die gebruikmaakt van Azure Cosmos DB voor NoSQL en de wijzigingenfeedprocessor

VAN TOEPASSING OP: NoSQL

Azure Cosmos DB is een volledig beheerde NoSQL-databaseservice van Microsoft. Hiermee kunt u eenvoudig wereldwijd gedistribueerde en zeer schaalbare toepassingen bouwen. Deze handleiding begeleidt u bij het maken van een Java-toepassing die gebruikmaakt van de Azure Cosmos DB for NoSQL-database en de wijzigingenfeedprocessor implementeert voor realtime gegevensverwerking. De Java-toepassing communiceert met De Azure Cosmos DB voor NoSQL met behulp van Azure Cosmos DB Java SDK v4.

Belangrijk

Deze zelfstudie geldt alleen voor Azure Cosmos DB Java SDK v4. Bekijk de opmerkingen bij de release van Azure Cosmos DB Java SDK v4, de Maven-opslagplaats, de wijzigingenfeedprocessor in Azure Cosmos DB en de handleiding voor probleemoplossing van Azure Cosmos DB Java SDK v4 voor meer informatie. Als u momenteel een oudere versie dan v4 gebruikt, raadpleegt u de gids Migreren naar Azure Cosmos DB Java SDK v4 voor hulp om te upgraden naar v4.

Vereisten

  • Azure Cosmos DB-account: u kunt het maken op basis van de Azure Portal of u kunt ook Azure Cosmos DB Emulator gebruiken.

  • Java Development Environment: Zorg ervoor dat u Java Development Kit (JDK) hebt geïnstalleerd op uw computer met ten minste acht versies.

  • Azure Cosmos DB Java SDK V4: biedt de benodigde functies voor interactie met Azure Cosmos DB.

Achtergrond

De Azure Cosmos DB-wijzigingenfeed biedt een gebeurtenisgestuurde interface om acties te activeren als reactie op het invoegen van documenten die veel worden gebruikt.

Gebeurtenissen in de wijzigingenfeed bijhouden wordt grotendeels gedaan door de bibliotheek voor wijzigingenfeedverwerking die in de SDK is ingebouwd. Deze bibliotheek is krachtig genoeg om gebeurtenissen in de wijzigingenfeed tussen meerdere werkrollen te verdelen, indien dat is gewenst. U hoeft alleen een callback op te geven in de bibliotheek van de wijzigingenfeed.

In dit eenvoudige voorbeeld van een Java-toepassing wordt de realtime gegevensverwerking met Azure Cosmos DB en de wijzigingenfeedprocessor gedemonstreerd. De toepassing voegt voorbeelddocumenten in een 'feedcontainer' in om een gegevensstroom te simuleren. De wijzigingenfeedverwerker, gebonden aan de feedcontainer, verwerkt binnenkomende wijzigingen en registreert de inhoud van het document. De processor beheert automatisch leases voor parallelle verwerking.

Broncode

U kunt de SDK-voorbeeldopslagplaats klonen en dit voorbeeld vinden in SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Walkthrough

  1. Configureer de ChangeFeedProcessorOptions in een Java-toepassing met behulp van Azure Cosmos DB en Azure Cosmos DB Java SDK V4. De ChangeFeedProcessorOptions biedt essentiële instellingen voor het beheren van het gedrag van de wijzigingenfeedverwerker tijdens de gegevensverwerking.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    
  2. Initialiseer ChangeFeedProcessor met relevante configuraties, waaronder de hostnaam, feedcontainer, leasecontainer en logica voor gegevensverwerking. De methode start() initieert de gegevensverwerking, waardoor gelijktijdige en realtime verwerking van binnenkomende gegevenswijzigingen uit de feedcontainer mogelijk is.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Geef op dat de gemachtigde binnenkomende gegevenswijzigingen verwerkt met behulp van de handleChanges() methode . De methode verwerkt de ontvangen JsonNode-documenten van de wijzigingenfeed. Als ontwikkelaar hebt u twee opties voor het verwerken van het JsonNode-document dat u hebt aangeboden door de wijzigingenfeed. Een optie is om het document te bewerken in de vorm van een JsonNode. Dit is vooral handig als u niet één uniform gegevensmodel voor alle documenten hebt. De tweede optie: transformeer de JsonNode naar een POJO met dezelfde structuur als de JsonNode. Dan kunt u werken op de POJO.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Bouw de Java-toepassing en voer deze uit. De toepassing start de wijzigingenfeedverwerker, voegt voorbeelddocumenten in de feedcontainer in en verwerkt de binnenkomende wijzigingen.

Conclusie

In deze handleiding hebt u geleerd hoe u een Java-toepassing maakt met behulp van Azure Cosmos DB Java SDK V4 die gebruikmaakt van de Azure Cosmos DB voor NoSQL-database en de wijzigingenfeedprocessor gebruikt voor realtime gegevensverwerking. U kunt deze toepassing uitbreiden om complexere use cases af te handelen en robuuste, schaalbare en wereldwijd gedistribueerde toepassingen te bouwen met behulp van Azure Cosmos DB.

Aanvullende resources

Volgende stappen

U kunt nu verdergaan met meer informatie over de wijzigingsfeedschatter in de volgende artikelen: