Gebeurtenisprocessorhost

Notitie

Dit artikel is van toepassing op de oude versie Azure Event Hubs SDK. Zie Partitiebelasting verdelen over meerdere exemplaren van uw toepassing voor de huidige versie van de SDK. Zie deze migratiehandleidingen voor meer informatie over het migreren van uw code naar de nieuwere versie van de SDK.

Azure Event Hubs is een krachtige service voor telemetrie-opname die kan worden gebruikt om miljoenen gebeurtenissen tegen lage kosten te streamen. In dit artikel wordt beschreven hoe u opgenomen gebeurtenissen gebruikt met behulp van de Event Processor Host (EPH); een intelligente consumentenagent die het beheer van controlepunten, leasen en parallelle gebeurtenislezers vereenvoudigt.

De sleutel om te schalen voor Event Hubs is het idee van gepartities. In tegenstelling tot het patroon van concurrerende consumenten maakt het gepart partitioneerde consumentenpatroon een grote schaal mogelijk door het knelpunt van de strijd te verwijderen en end-to-end-parallellisme te vergemakkelijken.

Scenario voor thuisbeveiliging

Als voorbeeldscenario kunt u een bedrijf voor thuisbeveiliging overwegen dat 100.000 huizen bewaakt. Elke minuut worden er gegevens van verschillende sensoren verzameld, zoals een bewegingsdetector, een sensor die door/venster wordt geopend, een sensor voor het breken van een ruit, enzovoort, die in elk huis zijn geïnstalleerd. Het bedrijf biedt een website waar inwoners de activiteit van hun huis in bijna realtime kunnen bewaken.

Elke sensor pusht gegevens naar een Event Hub. De Event Hub is geconfigureerd met 16 partities. Aan de verbruikte kant hebt u een mechanisme nodig dat deze gebeurtenissen kan lezen, samenvoegen (filter, aggregatie, enzovoort) en de aggregatie kan dumpen naar een opslagblob, die vervolgens naar een gebruiksvriendelijke webpagina wordt geprojecteerd.

De consumententoepassing schrijven

Bij het ontwerpen van de consument in een gedistribueerde omgeving moet in het scenario aan de volgende vereisten worden voldaan:

  1. Schaal: Maak meerdere consumenten, waarbij elke consument eigenaar is van het lezen van een Event Hubs partities.
  2. Load balancer: Het aantal consumenten dynamisch vergroten of verkleinen. Wanneer er bijvoorbeeld een nieuw sensortype (bijvoorbeeld een koolstofdetector) aan elke woning wordt toegevoegd, neemt het aantal gebeurtenissen toe. In dat geval verhoogt de operator (een mens) het aantal exemplaren van consumenten. Vervolgens kan de groep consumenten het aantal partities dat ze in hun bezit hebben opnieuw verdelen om de belasting te delen met de zojuist toegevoegde consumenten.
  3. Probleemloos hervatten bij storingen: Als een consument (consument A) uitvalt (bijvoorbeeld als de virtuele machine die de consument host plotseling vastvalt), moeten andere consumenten de partities kunnen ophalen die eigendom zijn van consument A en doorgaan. Bovendien moet het vervolgpunt, een controlepunt of offset genoemd, zich op het exacte punt staan waarop consumer A is mislukt, of iets eerder.
  4. Gebeurtenissen gebruiken: Hoewel de vorige drie punten te maken hebben met het beheer van de consument, moet er code zijn om de gebeurtenissen te gebruiken en er iets nuttigs mee te doen; u kunt deze bijvoorbeeld aggregeren en uploaden naar blobopslag.

In plaats van hiervoor uw eigen oplossing te bouwen, Event Hubs deze functionaliteit via de IEventProcessor-interface en de EventProcessorHost-klasse.

Interface IEventProcessor

Ten eerste implementeren verbruikende toepassingen de IEventProcessor-interface, die vier methoden heeft: OpenAsync, CloseAsync, ProcessErrorAsync en ProcessEventsAsync. Deze interface bevat de werkelijke code voor het verbruiken van de gebeurtenissen die Event Hubs verzendt. De volgende code toont een eenvoudige implementatie:

public class SimpleEventProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
       Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
       return Task.CompletedTask;
    }

    public Task OpenAsync(PartitionContext context)
    {
       Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
       return Task.CompletedTask;
     }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
       Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
       return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
       foreach (var eventData in messages)
       {
          var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
             Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
       }
       return context.CheckpointAsync();
    }
}

Instantieer vervolgens een EventProcessorHost-exemplaar. Afhankelijk van de overbelasting worden bij het maken van het EventProcessorHost-exemplaar in de constructor de volgende parameters gebruikt:

  • hostName: de naam van elk exemplaar van de consument. Elk exemplaar van EventProcessorHost moet een unieke waarde hebben voor deze variabele binnen een consumentengroep, dus codeer deze waarde niet in code.
  • eventHubPath: De naam van de Event Hub.
  • consumerGroupName: Event Hubs gebruikt $Default als de naam van de standaard consumentengroep, maar het is een goed idee om een consumentengroep te maken voor uw specifieke aspect van verwerking.
  • eventHubConnectionString: De connection string naar de Event Hub, die kan worden opgehaald uit de Azure Portal. Deze connection string moet de machtiging Luisteren hebben voor de Event Hub.
  • storageConnectionString: Het opslagaccount dat wordt gebruikt voor intern resourcebeheer.

Belangrijk

Schakel de functie voor zacht verwijderen niet in voor het opslagaccount dat wordt gebruikt als controlepuntopslag.

Ten slotte registreren consumenten het EventProcessorHost-exemplaar bij de Event Hubs service. Als u een gebeurtenisprocessorklasse registreert bij een exemplaar van EventProcessorHost, wordt de gebeurtenisverwerking gestart. Door te registreren kan de Event Hubs-service ervan uit gaan dat de consumenten-app gebeurtenissen uit sommige partities verbruikt en de implementatiecode van de IEventProcessor aanroept wanneer deze gebeurtenissen pusht om te verbruiken.

Notitie

De consumerGroupName is casegevoelig. Wijzigingen in de consumerGroupName kunnen ertoe leiden dat alle partities vanaf het begin van de stream worden gelezen.

Voorbeeld

Stel bijvoorbeeld dat er vijf virtuele machines (VM's) zijn die zijn toegewezen aan het verbruik van gebeurtenissen, en een eenvoudige consoletoepassing op elke VM, die het werkelijke verbruikswerk doet. Elke consoletoepassing maakt vervolgens één EventProcessorHost-exemplaar en registreert dit bij de Event Hubs service.

In dit voorbeeldscenario worden 16 partities toegewezen aan de 5 EventProcessorHost-exemplaren. Sommige EventProcessorHost-exemplaren zijn mogelijk eigenaar van een aantal partities dan andere. Voor elke partitie die een EventProcessorHost-exemplaar bezit, wordt er een exemplaar van de klasse SimpleEventProcessor gemaakt. Daarom zijn er in totaal 16 SimpleEventProcessor exemplaren, met één toegewezen aan elke partitie.

In de volgende lijst wordt dit voorbeeld samengevat:

  • 16 Event Hubs partities.
  • 5 VM's, 1 consumenten-app (bijvoorbeeld Consumer.exe) in elke VM.
  • 5 geregistreerde EPH-exemplaren, 1 in elke VM per Consumer.exe.
  • 16 SimpleEventProcessor objecten die zijn gemaakt door de 5 EPH-exemplaren.
  • 1 Consumer.exe app kan 4 objecten bevatten, omdat het SimpleEventProcessor 1 EPH-exemplaar 4 partities kan hebben.

Bijhouden van het eigendom van partities

Het eigendom van een partitie aan een EPH-exemplaar (of een consument) wordt bij te houden via het Azure Storage account dat wordt opgegeven voor het bijhouden van gegevens. U kunt het bijhouden als volgt visualiseren als een eenvoudige tabel. U kunt de daadwerkelijke implementatie bekijken door de blobs te bekijken onder Storage opgegeven account:

Naam van consumentengroep Partitie-id Hostnaam (eigenaar) Verkregen tijd voor lease (of eigendom) Offset in partitie (controlepunt)
$Default 0 _Consumenten-VM3 2018-04-15T01:23:45 156
$Default 1 _Consumenten-VM4 2018-04-15T01:22:13 734
$Default 2 _Consumenten-VM0 2018-04-15T01:22:56 122
:
:
$Default 15 _Consumenten-VM3 2018-04-15T01:22:56 976

Hier krijgt elke host het eigendom van een partitie voor een bepaalde duur (de duur van de lease). Als een host uitvalt (VM wordt afgesloten), verloopt de lease. Andere hosts proberen het eigendom van de partitie te krijgen en een van de hosts slaagt. Met dit proces wordt de lease op de partitie opnieuw ingesteld met een nieuwe eigenaar. Op deze manier kan slechts één lezer tegelijk lezen uit een bepaalde partitie binnen een consumentengroep.

Berichten ontvangen

Elke aanroep van ProcessEventsAsync levert een verzameling gebeurtenissen. Het is uw verantwoordelijkheid om deze gebeurtenissen te verwerken. Als u er zeker van wilt zijn dat de processorhost elk bericht ten minste één keer verwerkt, moet u uw eigen code schrijven om het opnieuw te proberen. Maar wees voorzichtig met verttreinigde berichten.

Het wordt aanbevolen dat u dingen relatief snel doet; Dat wil zeggen dat u zo weinig mogelijk verwerking kunt doen. Gebruik in plaats daarvan consumentengroepen. Als u naar de opslag moet schrijven en wat routering wilt uitvoeren, is het beter om twee consumentengroepen te gebruiken en twee IEventProcessor-implementaties te hebben die afzonderlijk worden uitgevoerd.

Op een bepaald moment wilt u tijdens de verwerking bijhouden wat u hebt gelezen en voltooid. Bijhouden is essentieel als u het lezen opnieuw moet starten, zodat u niet terugkeert naar het begin van de stroom. EventProcessorHost vereenvoudigt dit bijhouden met behulp van controlepunten. Een controlepunt is een locatie, of offset, voor een bepaalde partitie binnen een bepaalde consumentengroep, waarop u tevreden bent dat u de berichten hebt verwerkt. Het markeren van een controlepunt in EventProcessorHost wordt bereikt door de methode CheckpointAsync aan te roepen op het Object PartitionContext. Deze bewerking wordt uitgevoerd binnen de methode ProcessEventsAsync, maar kan ook worden uitgevoerd in CloseAsync.

Controlepunten maken

De methode CheckpointAsync heeft twee overloads: de eerste, zonder parameters, controlepunten naar de hoogste gebeurtenis offset binnen de verzameling die wordt geretourneerd door ProcessEventsAsync. Deze offset is een 'boven water'-markering; Er wordt ervan uitgenomen dat u alle recente gebeurtenissen hebt verwerkt wanneer u deze aanroept. Als u deze methode op deze manier gebruikt, moet u er rekening mee houden dat u deze moet aanroepen nadat uw andere code voor gebeurtenisverwerking is geretourneerd. Met de tweede overbelasting kunt u een EventData-exemplaar naar een controlepunt opgeven. Met deze methode kunt u een ander type watermerk gebruiken om een controlepunt te maken. Met dit watermerk kunt u een markering 'laag water' implementeren: de laagst gesequentieeerde gebeurtenis die zeker is verwerkt. Deze overbelasting wordt geboden om flexibiliteit in offsetbeheer mogelijk te maken.

Wanneer het controlepunt wordt uitgevoerd, wordt een JSON-bestand met partitiespecifieke informatie (met name de offset) naar het opslagaccount geschreven dat in de constructor is opgegeven voor EventProcessorHost. Dit bestand wordt voortdurend bijgewerkt. Het is essentieel om controlepunten in context te overwegen. Het is niet verstandig om elk bericht te plaatsen. Het opslagaccount dat wordt gebruikt voor het maken van controlepunten zou deze belasting waarschijnlijk niet verwerken, maar belangrijker is dat het maken van controlepunten voor elke gebeurtenis wijst op een berichtenpatroon in de wachtrij waarvoor een Service Bus-wachtrij mogelijk een betere optie is dan een Event Hub. Het idee achter Event Hubs is dat u 'ten minste één keer' op grote schaal wordt geleverd. Door uw downstreamsystemen idempotent te maken, is het eenvoudig om te herstellen van fouten of herstarts die ertoe leiden dat dezelfde gebeurtenissen meerdere keren worden ontvangen.

Thread safety and processor instances (Veiligheid van threads en processoren)

EventProcessorHost is standaard thread-veilig en gedraagt zich synchroon met betrekking tot het exemplaar van IEventProcessor. Wanneer gebeurtenissen voor een partitie binnenkomen, wordt ProcessEventsAsync aangeroepen op het IEventProcessor-exemplaar voor die partitie en worden verdere aanroepen naar ProcessEventsAsync voor de partitie geblokkeerd. Volgende berichten en aanroepen naar ProcessEventsAsync-wachtrij achter de schermen wanneer de berichtpomp op de achtergrond blijft worden uitgevoerd op andere threads. Deze thread-veiligheid maakt thread-veilige verzamelingen niet meer nodig en verhoogt de prestaties aanzienlijk.

Op de goede manier afsluiten

Ten slotte maakt EventProcessorHost.UnregisterEventProcessorAsync een schone afsluiting van alle partitielezers mogelijk en moet deze altijd worden aangeroepen bij het afsluiten van een exemplaar van EventProcessorHost. Als u dit niet doet, kan dit vertragingen veroorzaken bij het starten van andere exemplaren van EventProcessorHost vanwege leaseverloop en epoche-conflicten. Epoche-beheer wordt uitgebreid besproken in de sectie Epoch van het artikel.

Leasebeheer

Als u een gebeurtenisprocessorklasse registreert bij een exemplaar van EventProcessorHost, wordt de gebeurtenisverwerking gestart. Het host-exemplaar verkrijgt leases op sommige partities van de Event Hub, mogelijk door sommige van andere host-exemplaren te pakken, op een manier die convergeert op een gelijkmatige verdeling van partities over alle host-exemplaren. Voor elke geleasde partitie maakt het host-exemplaar een exemplaar van de opgegeven gebeurtenisprocessorklasse, ontvangt vervolgens gebeurtenissen van die partitie en geeft deze door aan het exemplaar van de gebeurtenisprocessor. Naarmate er meer exemplaren worden toegevoegd en er meer leases worden binnengegrepen, wordt de belasting uiteindelijk door EventProcessorHost verdeeld over alle consumenten.

Zoals eerder is uitgelegd, vereenvoudigt de traceringstabel de aard van automatisch schalen van EventProcessorHost.UnregisterEventProcessorAsync aanzienlijk. Als een exemplaar van EventProcessorHost wordt gestart, verkrijgt het zo veel mogelijk leases en begint het met het lezen van gebeurtenissen. Wanneer de leases bijna verlopen, probeert EventProcessorHost deze te vernieuwen door een reservering te plaatsen. Als de lease beschikbaar is voor verlenging, blijft de processor lezen, maar als dat niet het is, wordt de lezer gesloten en wordt CloseAsync aangeroepen. CloseAsync is een goed moment om een definitieve opschoonactie voor die partitie uit te voeren.

EventProcessorHost bevat een eigenschap PartitionManagerOptions. Deze eigenschap maakt controle over leasebeheer mogelijk. Stel deze opties in voordat u uw IEventProcessor-implementatie registreert.

Opties voor de host van de gebeurtenisprocessor bepalen

Daarnaast gebruikt één overload van RegisterEventProcessorAsync een EventProcessorOptions-object als parameter. Gebruik deze parameter om het gedrag van EventProcessorHost.UnregisterEventProcessorAsync zelf te bepalen. EventProcessorOptions definieert vier eigenschappen en één gebeurtenis:

  • MaxBatchSize:de maximale grootte van de verzameling die u wilt ontvangen bij het aanroepen van ProcessEventsAsync. Deze grootte is niet het minimum, alleen de maximale grootte. Als er minder berichten moeten worden ontvangen, wordt ProcessEventsAsync uitgevoerd met zoveel als er beschikbaar waren.
  • PrefetchCount:een waarde die wordt gebruikt door het onderliggende AMQP-kanaal om de bovengrens te bepalen van het aantal berichten dat de client moet ontvangen. Deze waarde moet groter zijn dan of gelijk zijn aan MaxBatchSize.
  • InvokeProcessorAfterReceiveTimeout:als deze parameter true is, wordt ProcessEventsAsync aangeroepen wanneer er een times-out is voor de onderliggende aanroep voor het ontvangen van gebeurtenissen op een partitie. Deze methode is handig voor het uitvoeren van op tijd gebaseerde acties tijdens perioden van inactiviteit op de partitie.
  • InitialOffsetProvider:hiermee kunt u een functie-aanwijzer of lambda-expressie instellen, die wordt aangeroepen om de eerste offset op te geven wanneer een lezer begint met het lezen van een partitie. Zonder deze offset op te geven, begint de lezer bij de oudste gebeurtenis, tenzij een JSON-bestand met een offset al is opgeslagen in het opslagaccount dat is opgegeven voor de EventProcessorHost-constructor. Deze methode is handig als u het gedrag van het opstarten van de lezer wilt wijzigen. Wanneer deze methode wordt aangeroepen, bevat de objectparameter de partitie-id waarvoor de lezer wordt gestart.
  • ExceptionReceivedEventArgs:Hiermee kunt u meldingen ontvangen van onderliggende uitzonderingen die optreden in EventProcessorHost. Als alles niet werkt zoals verwacht, is deze gebeurtenis een goede plek om te beginnen met zoeken.

Epoch

Dit is de manier waarop het ontvangen epoche werkt:

Met Epoche

Epoch is een unieke id (epoche-waarde) die de service gebruikt om het eigendom van partities/leases af te dwingen. U maakt een ontvanger op basis van Epoch met behulp van de methode CreateEpochReceiver. Met deze methode maakt u een ontvanger op basis van Epoch. De ontvanger wordt gemaakt voor een specifieke Event Hub-partitie van de opgegeven consumentengroep.

De epoche-functie biedt gebruikers de mogelijkheid om ervoor te zorgen dat er op elk moment slechts één ontvanger in een consumentengroep is, met de volgende regels:

  • Als er geen bestaande ontvanger in een consumentengroep is, kan de gebruiker een ontvanger met een tijdvakwaarde maken.
  • Als er een ontvanger is met een epoche-waarde e1 en er een nieuwe ontvanger wordt gemaakt met een epoche-waarde e2 waarbij e1 <= e2, wordt de verbinding van de ontvanger met e1 automatisch verbroken, wordt de ontvanger met e2 gemaakt.
  • Als er een ontvanger is met een epochewaarde e1 en er een nieuwe ontvanger wordt gemaakt met een epoche-waarde e2 waarbij e1 > e2, mislukt het maken van e2 met de fout: Er bestaat al een ontvanger met epoche e1.

Geen epoche

U maakt een niet-Epoch-ontvanger met behulp van de methode CreateReceiver.

Er zijn enkele scenario's in stroomverwerking waarbij gebruikers meerdere ontvangers in één consumentengroep willen maken. Ter ondersteuning van dergelijke scenario's hebben we de mogelijkheid om een ontvanger te maken zonder epoche en in dit geval staan we maximaal vijf gelijktijdige ontvangers toe voor de consumentengroep.

Gemengde modus

Het gebruik van toepassingen wordt niet aangeraden wanneer u een ontvanger met epoche maakt en vervolgens overschakelt naar no-epoch of vice versa in dezelfde consumentengroep. Als dit gedrag echter optreedt, verwerkt de service dit met behulp van de volgende regels:

  • Als er al een ontvanger is gemaakt met epoche e1 en actief gebeurtenissen ontvangt en er een nieuwe ontvanger wordt gemaakt zonder epoche, mislukt het maken van een nieuwe ontvanger. Epoche-ontvangers hebben altijd voorrang in het systeem.
  • Als er al een ontvanger is gemaakt met epoche e1 en de verbinding is verbroken en er een nieuwe ontvanger wordt gemaakt zonder epoche op een nieuwe MessagingFactory, slaagt het maken van een nieuwe ontvanger. Er is hier een nadeel dat ons systeem na ongeveer 10 minuten de 'verbinding met de ontvanger' detecteert.
  • Als er een of meer ontvangers zijn gemaakt zonder epoche en er een nieuwe ontvanger wordt gemaakt met epoche e1, wordt de verbinding met alle oude ontvangers verbroken.

Notitie

We raden u aan verschillende consumentengroepen te gebruiken voor toepassingen die gebruikmaken van tijdvakken en voor toepassingen die geen tijdvakken gebruiken om fouten te voorkomen.

Volgende stappen

Nu u bekend bent met de Event Processor Host, kunt u de volgende artikelen lezen voor meer informatie over Event Hubs: