Värd för händelsebearbetning

Anteckning

Den här artikeln gäller för den gamla versionen Azure Event Hubs SDK. Aktuell version av SDK finns i Balansera partitionsbelastning över flera instanser av ditt program. Information om hur du migrerar din kod till den nyare versionen av SDK finns i dessa migreringsguider.

Azure Event Hubs är en kraftfull tjänst för inmatning av telemetri som kan användas för att strömma miljontals händelser till låg kostnad. I den här artikeln beskrivs hur du använder in matade händelser med hjälp av Event Processor Host (EPH). en intelligent konsumentagent som förenklar hanteringen av kontrollpunkter, leasing och parallella händelseläsare.

Nyckeln till skalning för Event Hubs är idén med partitionerade konsumenter. Till skillnad från mönstret för konkurrerande konsumenter möjliggör det partitionerade konsumentmönstret hög skalning genom att ta bort flaskhalsen för konkurrens och underlätta parallellitet från slut till slut.

Scenario för hemsäkerhet

Ett exempelscenario är ett hemsäkerhetsföretag som övervakar 100 000 hem. Varje minut hämtar den data från olika sensorer, till exempel en rörelsedetektor, en öppen sensor för dörren/fönstret, en detektor för glassbrytning osv. som installeras i varje hem. Företaget tillhandahåller en webbplats för besökare som kan övervaka aktiviteten i deras hem nästan i realtid.

Varje sensor push-erar data till en händelsehubb. Händelsehubben har konfigurerats med 16 partitioner. I slutänden behöver du en mekanism som kan läsa dessa händelser, konsolidera dem (filtrera, aggregera osv.) och dumpa aggregeringen till en lagringsblob som sedan projiceras till en användarvänlig webbsida.

Skriva konsumentprogrammet

När du utformar konsumenten i en distribuerad miljö måste scenariot hantera följande krav:

  1. Skala: Skapa flera konsumenter, där varje konsument äger ägarskapet för läsning från några Event Hubs partitioner.
  2. Belastningsutjämning: Öka eller minska konsumenterna dynamiskt. När till exempel en ny sensortyp (till exempel en kolmonoxiddetektor) läggs till i varje hem ökar antalet händelser. I så fall ökar operatorn (en människa) antalet konsumentinstanser. Sedan kan poolen med konsumenter balansera om antalet partitioner som de äger för att dela belastningen med de nyligen tillagda konsumenterna.
  3. Sömlöst återuppta vid fel: Om en konsument (konsument A) misslyckas (till exempel om den virtuella dator som är värd för konsumenten plötsligt kraschar) måste andra konsumenter kunna hämta partitionerna som ägs av konsument A och fortsätta. Dessutom bör fortsättningspunkten, som kallas en kontrollpunkt eller offset, vara vid den exakta punkt där konsument A misslyckades, eller något före det.
  4. Förbruka händelser: De föregående tre punkterna handlar om hanteringen av konsumenten, men det måste finnas kod för att använda händelserna och göra något användbart med den. du kan till exempel aggregera den och ladda upp den till Blob Storage.

I stället för att skapa en egen lösning Event Hubs du den här funktionen via gränssnittet IEventProcessor och klassen EventProcessorHost.

IEventProcessor-gränssnitt

För det första implementerar konsumerande program IEventProcessor-gränssnittet, som har fyra metoder: OpenAsync, CloseAsync, ProcessErrorAsync och ProcessEventsAsync. Det här gränssnittet innehåller den faktiska koden för att använda de händelser som Event Hubs skickar. Följande kod visar en enkel implementering:

public class SimpleEventProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
       Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
       return Task.CompletedTask;
    }

    public Task OpenAsync(PartitionContext context)
    {
       Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
       return Task.CompletedTask;
     }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
       Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
       return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
       foreach (var eventData in messages)
       {
          var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
             Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
       }
       return context.CheckpointAsync();
    }
}

Skapa sedan en instans av en EventProcessorHost-instans. Beroende på överlagringen används följande parametrar när du skapar EventProcessorHost-instansen i konstruktorn:

  • hostName: namnet på varje konsumentinstans. Varje instans av EventProcessorHost måste ha ett unikt värde för den här variabeln i en konsumentgrupp, så hårdkoda inte det här värdet.
  • eventHubPath: Namnet på händelsehubben.
  • consumerGroupName: Event Hubs använder $Default som namn på standardkonsumentgruppen, men det är en bra idé att skapa en konsumentgrupp för din specifika aspekt av bearbetningen.
  • eventHubConnectionString: Anslutningssträngen till händelsehubben, som kan hämtas från Azure Portal. Den här anslutningssträngen ska ha behörighet att lyssna på händelsehubben.
  • storageConnectionString: Lagringskontot som används för intern resurshantering.

Viktigt

  • Aktivera inte funktionen för mjuk borttagning på lagringskontot som används som kontrollpunktslager.
  • Använd inte hierarkisk lagring (Azure Data Lake Storage Gen 2) som ett kontrollpunktslager.

Slutligen registrerar konsumenter EventProcessorHost-instansen med Event Hubs tjänsten. Registrering av en händelseprocessorklass med en instans av EventProcessorHost startar händelsebearbetning. Registreringen instruerar Event Hubs-tjänsten att förvänta sig att konsumentappen använder händelser från vissa av sina partitioner och anropar IEventProcessor-implementeringskoden när den pushar händelser för att använda.

Anteckning

consumerGroupName är fallkänsligt. Ändringar i consumerGroupName kan resultera i läsning av alla partitioner från början av dataströmmen.

Exempel

Anta till exempel att det finns 5 virtuella datorer (VM) som är dedikerade för att konsumera händelser och ett enkelt konsolprogram på varje virtuell dator, vilket gör att den faktiska förbrukningen fungerar. Varje konsolprogram skapar sedan en EventProcessorHost-instans och registrerar den med Event Hubs tjänsten.

I det här exempelscenariot kan vi säga att 16 partitioner allokeras till de 5 EventProcessorHost-instanserna. Vissa EventProcessorHost-instanser kan äga några fler partitioner än andra. För varje partition som en EventProcessorHost-instans äger skapas en instans av SimpleEventProcessor klassen . Därför finns det 16 instanser av SimpleEventProcessor totalt, med en tilldelad till varje partition.

I följande lista sammanfattas det här exemplet:

  • 16 Event Hubs partitioner.
  • 5 virtuella datorer, 1 konsumentapp (till exempel Consumer.exe) på varje virtuell dator.
  • 5 EPH-instanser registrerade, 1 i varje virtuell dator Consumer.exe.
  • 16 SimpleEventProcessor objekt som skapats av de 5 EPH-instanserna.
  • 1 Consumer.exe kan innehålla 4 SimpleEventProcessor objekt, eftersom den 1 EPH-instansen kan äga 4 partitioner.

Spårning av partitionsägarskap

Ägarskapet för en partition till en EPH-instans (eller en konsument) spåras via det Azure Storage konto som tillhandahålls för spårning. Du kan visualisera spårningen som en enkel tabell på följande sätt. Du kan se den faktiska implementeringen genom att undersöka blobarna under Storage angivna kontot:

Konsumentgruppens namn Partitions-ID Värdnamn (ägare) Uppköpt tid för lån (eller ägarskap) Förskjutning i partition (kontrollpunkt)
$Default 0 Virtuell _ konsument-VM3 2018-04-15T01:23:45 156
$Default 1 Virtuell _ konsument-DATOR4 2018-04-15T01:22:13 734
$Default 2 Virtuell _ konsument-VM0 2018-04-15T01:22:56 122
:
:
$Default 15 Virtuell _ konsument-VM3 2018-04-15T01:22:56 976

Här får varje värd ägarskap för en partition under en viss tid (lånets varaktighet). Om en värd slutar fungera (den virtuella datorn stängs av) går lånet ut. Andra värdar försöker få ägarskap för partitionen, och en av värdarna lyckas. Den här processen återställer lånet på partitionen med en ny ägare. På så sätt kan endast en läsare i taget läsa från en viss partition inom en konsumentgrupp.

Ta emot meddelanden

Varje anrop till ProcessEventsAsync levererar en samling händelser. Det är ditt ansvar att hantera dessa händelser. Om du vill se till att processorvärden bearbetar varje meddelande minst en gång måste du skriva din egen fortsätt att försöka kod igen. Men var försiktig när det gäller meddelandededed.

Vi rekommenderar att du gör saker relativt snabbt. det vill säga göra så lite bearbetning som möjligt. Använd i stället konsumentgrupper. Om du behöver skriva till lagring och göra viss routning är det bättre att använda två konsumentgrupper och ha två IEventProcessor-implementeringar som körs separat.

Någon gång under bearbetningen kanske du vill hålla reda på vad du har läst och slutfört. Det är viktigt att hålla reda på om du måste starta om läsningen, så att du inte återgår till början av dataströmmen. EventProcessorHost förenklar spårningen med hjälp av kontrollpunkter. En kontrollpunkt är en plats, eller offset, för en viss partition i en viss konsumentgrupp, där du är nöjd med att du har bearbetat meddelandena. Du markerar en kontrollpunkt i EventProcessorHost genom att anropa metoden CheckpointAsync i PartitionContext-objektet. Den här åtgärden utförs i metoden ProcessEventsAsync, men kan även göras i CloseAsync.

Kontrollpunkter

Metoden CheckpointAsync har två överlagringar: den första, utan parametrar, kontrollpunkter till den högsta händelseförskjutningen i samlingen som returneras av ProcessEventsAsync. Den här förskjutningen är ett "vattenmärke". Den förutsätter att du har bearbetat alla de senaste händelserna när du anropar den. Om du använder den här metoden på det här sättet bör du vara medveten om att du förväntas anropa den när din andra händelsebearbetningskod har returnerats. Med den andra överlagringen kan du ange en EventData-instans till kontrollpunkten. Med den här metoden kan du använda en annan typ av vattenstämpel för kontrollpunkten. Med den här vattenstämpeln kan du implementera ett "lågt vattenmärke": den lägsta sekvenserade händelsen som du är säker på har bearbetats. Den här överlagringen tillhandahålls för att möjliggöra flexibilitet vid förskjutningshantering.

När kontrollpunkten utförs skrivs en JSON-fil med partitionsspecifik information (mer specifikt förskjutningen) till det lagringskonto som anges i konstruktorn till EventProcessorHost. Den här filen uppdateras kontinuerligt. Det är viktigt att tänka på kontrollpunkter i kontexten – det skulle vara ovisst att markera varje meddelande. Lagringskontot som används för kontrollpunkter skulle förmodligen inte hantera den här belastningen, men ännu viktigare är att kontrollpunkter för varje enskild händelse är en indikation på ett meddelandemönster i kö där en Service Bus-kö kan vara ett bättre alternativ än en händelsehubb. Tanken med Event Hubs är att du får leverans "minst en gång" i stor skala. Genom att göra dina underordnade system idempotenta är det enkelt att återställa från fel eller omstarter som resulterar i att samma händelser tas emot flera gånger.

Trådsäkerhet och processorinstanser

Som standard är EventProcessorHost trådsäker och fungerar synkront med avseende på instansen av IEventProcessor. När händelser tas emot för en partition anropas ProcessEventsAsyncIEventProcessor-instansen för den partitionen och blockerar ytterligare anrop till ProcessEventsAsync för partitionen. Efterföljande meddelanden och anrop till ProcessEventsAsync-kön i bakgrunden när meddelandepumpen fortsätter att köras i bakgrunden på andra trådar. Den här trådsäkerheten tar bort behovet av trådsäkra samlingar och ökar prestanda avsevärt.

Stäng av på ett smidigt sätt

Slutligen aktiverar EventProcessorHost.UnregisterEventProcessorAsync en ren avstängning av alla partitionsläsare och bör alltid anropas när du stänger av en instans av EventProcessorHost. Om du inte gör det kan det orsaka fördröjningar när du startar andra instanser av EventProcessorHost på grund av förfallotid och epokkonflikter. Epokhantering beskrivs i detalj i avsnittet Epok i artikeln.

Lånehantering

Registrering av en händelseprocessorklass med en instans av EventProcessorHost startar händelsebearbetning. Värdinstansen får lån på vissa partitioner av händelsehubben, som eventuellt hämtar några från andra värdinstanser på ett sätt som konvergerar på en jämn fördelning av partitioner över alla värdinstanser. För varje hyrd partition skapar värdinstansen en instans av den angivna händelseprocessorklassen, tar emot händelser från partitionen och skickar dem till händelseprocessorinstansen. Allt eftersom fler instanser läggs till och fler lån köps, balanserar EventProcessorHost så småningom belastningen mellan alla konsumenter.

Som tidigare nämnts förenklar spårningstabellen den automatiska skalningen av EventProcessorHost.UnregisterEventProcessorAsync. När en instans av EventProcessorHost startar hämtar den så många lån som möjligt och börjar läsa händelser. När lånen snart upphör att gälla försöker EventProcessorHost förnya dem genom att göra en reservation. Om lånet är tillgängligt för förnyelse fortsätter processorn att läsa, men om det inte är det stängs läsaren och CloseAsync anropas. CloseAsync är ett bra tillfälle att utföra en slutlig rensning för den partitionen.

EventProcessorHost innehåller egenskapen PartitionManagerOptions. Den här egenskapen möjliggör kontroll över lånehantering. Ange dessa alternativ innan du registrerar implementeringen av IEventProcessor.

Kontrollera värdalternativ för händelseprocessor

Dessutom tar en överlagring av RegisterEventProcessorAsync ett EventProcessorOptions-objekt som en parameter. Använd den här parametern för att styra beteendet för själva EventProcessorHost.UnregisterEventProcessorAsync. EventProcessorOptions definierar fyra egenskaper och en händelse:

  • MaxBatchSize:Den maximala storleken för den samling som du vill ta emot i ett anrop av ProcessEventsAsync. Den här storleken är inte den minsta, bara den maximala storleken. Om det finns färre meddelanden som ska tas emot körs ProcessEventsAsync med så många som var tillgängliga.
  • PrefetchCount:Ett värde som används av den underliggande AMQP-kanalen för att fastställa den övre gränsen för hur många meddelanden klienten ska ta emot. Det här värdet ska vara större än eller lika med MaxBatchSize.
  • InvokeProcessorAfterReceiveTimeout:Om den här parametern är sann anropas ProcessEventsAsync när det underliggande anropet för att ta emot händelser på en partitions tidsgräns. Den här metoden är användbar för att ta tidsbaserade åtgärder under perioder av inaktivitet på partitionen.
  • InitialOffsetProvider:Gör att en funktionspekare eller ett lambda-uttryck kan anges, som anropas för att ange den inledande förskjutningen när en läsare börjar läsa en partition. Utan att ange den här förskjutningen startar läsaren vid den äldsta händelsen, såvida inte en JSON-fil med en offset redan har sparats i det lagringskonto som angetts till EventProcessorHost-konstruktorn. Den här metoden är användbar när du vill ändra beteendet för läsarens start. När den här metoden anropas innehåller objektparametern partitions-ID:t som läsaren startas för.
  • ExceptionReceivedEventArgs:Gör att du kan ta emot meddelanden om eventuella underliggande undantag som inträffar i EventProcessorHost. Om allt inte fungerar som förväntat är den här händelsen ett bra ställe att börja titta på.

Epok

Så här fungerar epoker för mottagning:

Med epok

Epok är en unik identifierare (epokvärde) som tjänsten använder för att framtvinga partitions-/låneägarskap. Du skapar en Epoch-baserad mottagare med hjälp av metoden CreateEpochReceiver. Den här metoden skapar en Epoch-baserad mottagare. Mottagaren skapas för en specifik händelsehubbpartition från den angivna konsumentgruppen.

Epokfunktionen ger användarna möjlighet att se till att det bara finns en mottagare i en konsumentgrupp vid en tidpunkt, med följande regler:

  • Om det inte finns någon befintlig mottagare i en konsumentgrupp kan användaren skapa en mottagare med val annat epokvärde.
  • Om det finns en mottagare med ett epokvärde e1 och en ny mottagare skapas med ett epokvärde e2 där e1 <= e2, kopplas mottagaren med e1 från automatiskt, och mottagare med e2 skapas.
  • Om det finns en mottagare med ett epokvärde e1 och en ny mottagare skapas med ett epokvärde e2 där e1 > e2, så går det inte att skapa e2 med felet: Det finns redan en mottagare med eoch e1.

Ingen epok

Du skapar en icke-Epoch-baserad mottagare med hjälp av metoden CreateReceiver.

Det finns vissa scenarier i strömbearbetning där användare vill skapa flera mottagare i en enda konsumentgrupp. För att stödja sådana scenarier har vi möjlighet att skapa en mottagare utan epok och i det här fallet tillåter vi upp till 5 samtidiga mottagare i konsumentgruppen.

Blandat läge

Vi rekommenderar inte programanvändning där du skapar en mottagare med epok och sedan växlar till no-epoch eller vice versa i samma konsumentgrupp. När detta inträffar hanterar tjänsten den dock med hjälp av följande regler:

  • Om det redan finns en mottagare som redan har skapats med epok e1 och aktivt tar emot händelser och en ny mottagare skapas utan epok, kommer skapandet av en ny mottagare att misslyckas. Epokmottagare har alltid företräde i systemet.
  • Om en mottagare redan har skapats med epok e1 och kopplades från, och en ny mottagare skapas utan epok på en ny MessagingFactory, kommer skapandet av en ny mottagare att lyckas. Det finns en varning här att vårt system kommer att identifiera "mottagare frånkoppling" efter cirka 10 minuter.
  • Om det finns en eller flera mottagare som skapats utan epok och en ny mottagare skapas med e1, kopplas alla gamla mottagare från.

Anteckning

Vi rekommenderar att du använder olika konsumentgrupper för program som använder epoker och för dem som inte använder epoker för att undvika fel.

Nästa steg

Nu när du är bekant med värden för händelseprocessorer kan du läsa följande artiklar för att lära dig mer om Event Hubs: