Delen via


Azure Event Hubs Checkpoint Store-bibliotheek voor Javascript met behulp van storage-blobs

Een op Azure Blob Storage gebaseerde oplossing voor het opslaan van controlepunten en om te helpen bij het verdelen van de taakverdeling bij gebruik EventHubConsumerClient vanuit de bibliotheek @azure/event-hubs

Broncode | Pakket (npm) | API-referentiedocumentatie | Monsters

Aan de slag

Het pakket installeren

De blobbibliotheek van Azure Event Hubs Checkpoint Store installeren met npm

npm install @azure/eventhubs-checkpointstore-blob

Vereisten: u moet een Azure-abonnement, een Event Hubs-naamruimte hebben om dit pakket te kunnen gebruiken en een opslagaccount

Als u dit pakket in een Node.js-toepassing gebruikt, gebruikt u Node.js 8.x of hoger.

Typescript configureren

TypeScript-gebruikers moeten knooppunttypedefinities hebben geïnstalleerd:

npm install @types/node

U moet ook inschakelen compilerOptions.allowSyntheticDefaultImports in uw tsconfig.json. Houd er rekening mee dat als u hebt ingeschakeld compilerOptions.esModuleInterop, allowSyntheticDefaultImports standaard is ingeschakeld. Zie het handboek voor compileropties van TypeScript voor meer informatie.

Belangrijkste concepten

  • Schaal: Maak meerdere consumenten, waarbij elke consument eigenaar wordt van het lezen van een paar Event Hubs-partities.

  • Taakverdeling: Toepassingen die ondersteuning bieden voor taakverdeling bestaan uit een of meer exemplaren EventHubConsumerClient waarvan is geconfigureerd om gebeurtenissen te gebruiken van dezelfde Event Hub en consumentengroep en dezelfde CheckpointStore. Ze verdelen de workload over verschillende exemplaren door de partities die moeten worden verwerkt onderling te verdelen.

  • Controlepunten: Het is een proces waarmee lezers hun positie binnen een partitie-gebeurtenisreeks markeren of doorvoeren. Het plaatsen van controlepunten is de verantwoordelijkheid van de consumer en vindt plaats per partitie binnen een consumergroep. Deze verantwoordelijkheid houdt in dat elke partitielezer voor elke consumergroep de huidige positie in de gebeurtenisstroom moet bijhouden en de service kan informeren wanneer de gegevensstroom is voltooid.

    Als een lezer van een partitie is losgekoppeld en er vervolgens weer verbinding wordt gemaakt, begint het lezen bij het controlepunt dat eerder is verzonden door de laatste lezer van de betreffende partitie in de consumergroep. Wanneer de lezer verbinding maakt, wordt de offset doorgegeven aan de Event Hub om de locatie op te geven waar moet worden gelezen. Op deze manier kunt u het plaatsen van controlepunten gebruiken om gebeurtenissen te markeren als 'voltooid' door downstream-toepassingen. Bovendien beschikt u met controlepunten over tolerantie bij een failover tussen lezers die op verschillende apparaten worden uitgevoerd. Het is mogelijk om terug te keren naar de oudere gegevens door een lagere offset van dit controlepuntproces op te geven. Via dit mechanisme zorgt het plaatsen van controlepunten voor failover-tolerantie en voor herhaling van gebeurtenisstromen.

    Een BlobCheckpointStore is een klasse die belangrijke methoden implementeert die nodig zijn voor de EventHubConsumerClient om taken te verdelen en controlepunten bij te werken.

Voorbeelden

CheckpointStore Een maken met behulp van Azure Blob Storage

Gebruik het onderstaande codefragment om een CheckpointStorete maken. U moet de verbindingsreeks opgeven voor uw opslagaccount.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Controlepunt gebeurtenissen met Behulp van Azure Blob Storage

Als u controlepuntgebeurtenissen wilt ontvangen met behulp van Azure Blob Storage, moet u een object doorgeven dat compatibel is met de interface SubscriptionEventHandlers, samen met code om de updateCheckpoint() methode aan te roepen.

In dit voorbeeld SubscriptionHandlers implementeert u SubscriptionEventHandlers en verwerkt u ook controlepunten.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Problemen oplossen

Logboeken inschakelen

U kunt de omgevingsvariabele AZURE_LOG_LEVEL instellen op een van de volgende waarden om logboekregistratie in te schakelen voor stderr:

  • verbose
  • Info
  • waarschuwing
  • fout

U kunt het logboekniveau ook programmatisch instellen door het pakket @azure/logger te importeren en de setLogLevel functie aan te roepen met een van de waarden op logboekniveau.

Bij het instellen van een logboekniveau via een programma of via de AZURE_LOG_LEVEL omgevingsvariabele, worden logboeken die zijn geschreven met een logboekniveau dat gelijk is aan of kleiner is dan het niveau dat u kiest, verzonden. Wanneer u bijvoorbeeld het logboekniveau instelt op info, worden de logboeken die zijn geschreven voor niveaus warning en error ook verzonden. Deze SDK volgt de Azure SDK voor TypeScript-richtlijnen bij het bepalen bij welk niveau moet worden aangemeld.

U kunt ook de DEBUG omgevingsvariabele instellen om logboeken op te halen wanneer u deze bibliotheek gebruikt. Dit kan handig zijn als u ook logboeken van de afhankelijkheden rhea-promiserhea en wilt verzenden.

Opmerking: AZURE_LOG_LEVEL, indien ingesteld, heeft voorrang op FOUTOPSPORING. Geef geen azure bibliotheken op via DEBUG wanneer u ook AZURE_LOG_LEVEL opgeeft of setLogLevel aanroept.

U kunt de volgende omgevingsvariabele instellen om de logboeken voor foutopsporing op te halen wanneer u deze bibliotheek gebruikt.

  • Alleen foutopsporingslogboeken op informatieniveau ophalen uit de Eventhubs Checkpointstore-blob.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Logboekregistratie bij een bestand

  • Schakel logboekregistratie in zoals hierboven wordt weergegeven en voer het testscript als volgt uit:

    • Logboekinstructies van uw testscript gaan naar out.log en logboekregistratieinstructies van de SDK gaan naar debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Logboekinstructies van uw testscript en de SDK gaan naar hetzelfde bestand out.log door stderr om te leiden naar stdout (&1) en vervolgens stdout omleiden naar een bestand:

      node your-test-script.js >out.log 2>&1
      
    • Logboekregistratie-instructies van uw testscript en de SDK gaan naar hetzelfde bestand out.log.

      node your-test-script.js &> out.log
      

Volgende stappen

Bekijk de map met voorbeelden voor een gedetailleerd voorbeeld.

Bijdragen

Als u een bijdrage wilt leveren aan deze bibliotheek, leest u de handleiding voor bijdragen voor meer informatie over het bouwen en testen van de code.

Weergaven