Vzory úloh replikace událostí

Přehled federace a přehled funkcí replikátoru vysvětlují důvody a základní prvky úloh replikace a doporučuje se, abyste se s nimi seznámili, než budete pokračovat v tomto článku.

V tomto článku podrobně popisujeme pokyny k implementaci pro několik vzorů zvýrazněných v části přehledu.

Replikace

Model replikace kopíruje události z jednoho centra událostí do dalšího nebo z centra událostí do jiného cíle, jako je fronta služby Service Bus. Události se přeposílají bez jakýchkoli úprav datové části události.

Implementaci tohoto modelu pokrývají replikace událostí mezi službou Event Hubs a replikace událostí mezi službami Event Hubs a ukázky služby Service Bus a kurz Použití Apache Kafka MirrorMakeru se službou Event Hubs pro konkrétní případ replikace dat ze zprostředkovatele Apache Kafka do služby Event Hubs.

Datové proudy a zachování objednávek

Replikace prostřednictvím Azure Functions nebo Azure Stream Analytics nemá za cíl zajistit vytvoření přesně 1:1 klonů zdrojového centra událostí do cílového centra událostí, ale zaměřuje se na zachování relativního pořadí událostí tam, kde to aplikace vyžaduje. Aplikace to komunikuje seskupením souvisejících událostí se stejným klíčem oddílu a služba Event Hubs uspořádá zprávy se stejným klíčem oddílu postupně ve stejném oddílu.

Důležité

Informace o posunu jsou jedinečné pro každé centrum událostí a posuny pro stejné události se budou v jednotlivých instancích centra událostí lišit. Pokud chcete najít pozici ve kopírovacím streamu událostí, použijte posuny na základě času a odkazujte na rozšířená metadata přiřazená službou.

Posuny založené na čase začínají příjemce v určitém okamžiku:

  • EventPosition.FromStart() – znovu přečte všechna zachovaná data.
  • EventPosition.FromEnd() – přečte všechna nová data z doby připojení.
  • EventPosition.FromEnqueuedTime(dateTime) – všechna data od daného data a času.

V EventProcessoru nastavíte pozici prostřednictvím InitialOffsetProvider na EventProcessorOptions. U ostatních rozhraní API přijímače se pozice předává konstruktorem.

Předdefinované pomocné rutiny funkcí replikace poskytované jako ukázky, které se používají v pokynech založených na Azure Functions, zajišťují, aby se streamy událostí se stejným klíčem oddílu načteným ze zdrojového oddílu odesílaly do cílového centra událostí jako dávka v původním streamu a se stejným klíčem oddílu.

Pokud je počet oddílů zdrojového a cílového centra událostí stejný, všechny streamy v cíli se namapují na stejné oddíly jako ve zdroji. Pokud se počet oddílů liší, což je důležité v některých dalších vzorech popsaných v následujícím článku, mapování se bude lišit, ale streamy se vždy uchovávají pohromadě a v pořadí.

Relativní pořadí událostí patřících do různých datových proudů nebo nezávislých událostí bez klíče oddílu v cílovém oddílu se může vždy lišit od zdrojového oddílu.

Metadata přiřazená službou

Metadata události přiřazená službou získaná ze zdrojového centra událostí, čas původní fronty, pořadové číslo a posun jsou nahrazena novými hodnotami přiřazenými službou v cílovém centru událostí, ale s pomocnými funkcemi, úlohami replikace, které jsou k dispozici v našich ukázkách, se původní hodnoty zachovají ve vlastnostech uživatele: repl-enqueue-time (řetězec ISO8601), repl-sequence, repl-offset.

Tyto vlastnosti jsou typu řetězec a obsahují řetězcovou hodnotu odpovídajících původních vlastností. Pokud se událost předá vícekrát, metadata přiřazená službou bezprostředního zdroje se připojí k již existujícím vlastnostem s hodnotami oddělenými středníky.

Převzetí služeb při selhání

Pokud používáte replikaci pro účely zotavení po havárii, kvůli ochraně před událostmi regionální dostupnosti ve službě Event Hubs nebo před přerušením sítě bude každý takový scénář selhání vyžadovat převzetí služeb při selhání z jednoho centra událostí do druhého a inkasovat producenty nebo příjemce, aby používali sekundární koncový bod.

U všech scénářů převzetí služeb při selhání se předpokládá, že požadované prvky oborů názvů jsou strukturálně identické, což znamená, že centra událostí a skupiny příjemců mají stejný název a že pravidla sdíleného přístupového podpisu nebo pravidla řízení přístupu na základě role jsou nastavená stejným způsobem. Sekundární obor názvů můžete vytvořit (a aktualizovat) podle pokynů k přesunu oborů názvů a vynechání kroku vyčištění.

Pokud chcete vynutit přepnutí producentů a příjemců, musíte zpřístupnit informace o tom, který obor názvů použít pro vyhledávání v umístění, které je snadno dostupné a aktualizované. Pokud se producenti nebo spotřebitelé setkávají s častými nebo trvalými chybami, měli by se na toto umístění podívat a upravit konfiguraci. Existuje mnoho způsobů, jak sdílet konfiguraci, ale v následujícím příkladu ukazujeme na dva způsoby: DNS a sdílené složky.

Konfigurace převzetí služeb při selhání na základě DNS

Jedním z možných přístupů je uchovávat informace v záznamech DNS SRV v DNS, který řídíte, a odkazovat na příslušné koncové body centra událostí.

Důležité

Mějte na paměti, že služba Event Hubs neumožňuje přímé aliasování koncových bodů se záznamy CNAME, což znamená, že dns použijete jako odolný vyhledávací mechanismus pro adresy koncových bodů a nebudete přímo překládat informace o IP adresách.

Předpokládejme, že vlastníte doménu example.com a pro vaši aplikaci zónu test.example.com. Pro dvě alternativní služby Event Hubs teď vytvoříte dvě další vnořené zóny a v každé z nich záznam SRV.

Záznamy SRV mají podle běžných konvencí předponu _azure_eventhubs._amqp a uchovávají dva záznamy koncového bodu: jeden pro AMQP-over-TLS na portu 5671 a druhý pro AMQP-over-WebSockets na portu 443, přičemž oba odkazují na koncový bod Event Hubs oboru názvů odpovídající zóně.

Zóna Záznam SRV
eh1.test.example.com _azure_servicebus._amqp.eh1.test.example.com
1 1 5671 eh1-test-example-com.servicebus.windows.net
2 2 443 eh1-test-example-com.servicebus.windows.net
eh2.test.example.com _azure_servicebus._amqp.eh2.test.example.com
1 1 5671 eh2-test-example-com.servicebus.windows.net
2 2 443 eh2-test-example-com.servicebus.windows.net

V zóně aplikace pak vytvoříte záznam CNAME, který odkazuje na podřízenou zónu odpovídající primárnímu centru událostí:

Záznam CNAME Alias
eventhub.test.example.com eh1.test.example.com

Pomocí klienta DNS, který umožňuje explicitní dotazování záznamů CNAME a SRV (předdefinovaní klienti Javy a .NET umožňují pouze jednoduché překlady názvů na IP adresy), pak můžete přeložit požadovaný koncový bod. U DnsClient.NET například vyhledávací funkce:

static string GetEventHubName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_eventhub._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

Funkce vrátí název cílového hostitele zaregistrovaný pro port 5671 zóny aktuálně aliasované pomocí CNAME, jak je znázorněno výše.

Provedení převzetí služeb při selhání vyžaduje úpravu záznamu CNAME a jeho nasměrování na alternativní zónu.

Výhodou použití DNS a konkrétně Azure DNS je, že se informace Azure DNS globálně replikují, a proto jsou odolné proti výpadkům v jedné oblasti.

Tento postup je podobný tomu, jak funguje geografické zotavení po havárii služby Event Hubs , ale plně pod vaší vlastní kontrolou a funguje také s aktivními a aktivními scénáři.

Konfigurace převzetí služeb při selhání na základě sdílené složky

Nejjednodušší alternativou k použití DNS ke sdílení informací o koncových bodech je vložení názvu primárního koncového bodu do souboru ve formátu prostého textu a obsluhování souboru z infrastruktury, která je robustní proti výpadkům a stále umožňuje aktualizace.

Pokud už používáte infrastrukturu webu s vysokou dostupností a replikací obsahu, přidejte tam takový soubor a v případě potřeby přepnutí soubor znovu publikujte.

Upozornění

Tímto způsobem byste měli publikovat jenom název koncového bodu, ne úplný připojovací řetězec včetně tajných kódů.

Další aspekty týkající se převzetí služeb při selhání příjemců

Další aspekty strategie převzetí služeb při selhání pro příjemce centra událostí závisí na potřebách procesoru událostí.

Pokud dojde k havárii, která vyžaduje opětovné sestavení systému, včetně databází, ze zálohovaných dat a databáze se předávají přímo nebo prostřednictvím zprostředkujícího zpracování z událostí v centru událostí, obnovíte zálohu a pak chcete do systému začít přehrávat události od okamžiku vytvoření zálohy databáze, a ne od okamžiku, kdy byl původní systém zničen.

Pokud selhání ovlivní jenom část systému nebo pouze jedno centrum událostí, které se stalo nedostupným, budete pravděpodobně chtít pokračovat ve zpracování událostí přibližně ze stejné pozice, kde bylo zpracování přerušeno.

K realizaci obou scénářů a použití procesoru událostí příslušné sady Azure SDK vytvoříte nové úložiště kontrolních bodů a zadáte počáteční pozici oddílu na základě časového razítka , ze kterého chcete pokračovat ve zpracování.

Pokud stále máte přístup k úložišti kontrolních bodů centra událostí, ze kterého přecházíte, rozšířená metadata , která jsou popsána výše, vám pomůžou přeskočit události, které už byly zpracovány, a pokračovat přesně od místa, kde jste naposledy skončili.

Sloučit

Model sloučení má jednu nebo více úloh replikace, které odkazují na jeden cíl, případně souběžně s běžnými producenty také odesílají události do stejného cíle.

Varianty těchto patter jsou:

  • Dvě nebo více funkcí replikace současně získávají události z různých zdrojů a odesílají je do stejného cíle.
  • Jedna další replikační funkce, která získává události ze zdroje, zatímco cíl je také používán přímo producenty.
  • Předchozí vzor, ale zrcadlený mezi dvěma nebo více službami Event Hubs, což vede k tomu, že služba Event Hubs obsahuje stejné streamy bez ohledu na to, kde se události vytvářejí.

První dvě varianty vzorů jsou triviální a neliší se od úloh prosté replikace.

Poslední scénář vyžaduje vyloučení již replikovaných událostí z opětovné replikace. Tato technika je demonstrována a vysvětlena v ukázce EventHubToEventHubMerge .

Editor

Vzor editoru vychází ze vzoru replikace , ale zprávy se před předáním upraví.

Příklady takových úprav:

  • Překódování – pokud obsah události (označovaný také jako "tělo" nebo "datová část") přichází ze zdroje zakódovaného pomocí formátu Apache Avro nebo nějakého proprietárního formátu serializace, ale systém, který cíl vlastní, očekává, že obsah bude kódován JSON , úloha replikace překódování nejprve deserializuje datovou část z Apache Avro do grafu objektů v paměti a pak tento graf serializuje do formátu JSON . formát události, která se přeposílala. Překódování zahrnuje také úlohy komprese a dekomprese obsahu.
  • Transformace – události, které obsahují strukturovaná data, můžou vyžadovat změnu tvaru těchto dat, aby bylo snazší je spotřebovat podřízenými uživateli. Může se jednat například o zploštění vnořených struktur, vyřazení nadbytečných datových prvků nebo přetváření datové části tak, aby přesně odpovídala danému schématu.
  • Dávkování – události můžou být přijímány v dávkách (více událostí v jednom přenosu) ze zdroje, ale musí se předávat jednotlivě do cíle nebo naopak. Úkol proto může předávat více událostí na základě přenosu jedné vstupní události nebo agregovat sadu událostí, které se pak přenesou společně.
  • Ověření – Před předáním dat událostí z externích zdrojů je často potřeba zkontrolovat, jestli jsou v souladu se sadou pravidel. Pravidla mohou být vyjádřena pomocí schémat nebo kódu. Události, u které se zjistí, že nedodržují předpisy, mohou být odstraněny, s problémem, který je zaznamenán v protokolech, nebo mohou být předány do speciálního cílového cíle, aby je bylo možné dále zpracovat.
  • Rozšiřování – data událostí pocházející z některých zdrojů můžou vyžadovat rozšíření o další kontext, aby byla použitelná v cílových systémech. To může zahrnovat vyhledání referenčních dat a vložení těchto dat s událostí nebo přidání informací o zdroji, který je známý pro úlohu replikace, ale není obsažen v událostech.
  • Filtrování – některé události přicházející ze zdroje může být potřeba z cíle zadržet na základě nějakého pravidla. Filtr otestuje událost proti pravidlu, a pokud událost neodpovídá pravidlu, událost zahodí. Filtrování duplicitních událostí dodržováním určitých kritérií a odstraňování následných událostí se stejnými hodnotami je formou filtrování.
  • Kryptografie – Úloha replikace může mít za úkol dešifrovat obsah přicházející ze zdroje a/nebo šifrovat obsah předávaný dál do cíle a/nebo může být nutné ověřit integritu obsahu a metadat vzhledem k podpisu přenášeného v události nebo takový podpis připojit.
  • Ověření identity – úloha replikace může připojit metadata, potenciálně chráněná digitálním podpisem, k události, která potvrzuje, že událost byla přijata prostřednictvím konkrétního kanálu nebo v konkrétní čas.
  • Řetězení – Úloha replikace může použít podpisy na streamy událostí tak, aby byla chráněna integrita datového proudu a bylo možné detekovat chybějící události.

Vzory transformace, dávkování a rozšiřování se obecně nejlépe implementují s úlohami Azure Stream Analytics .

Všechny tyto vzory je možné implementovat pomocí Azure Functions pomocí triggeru služby Event Hubs pro získávání událostí a výstupní vazby centra událostí pro jejich doručování.

Směrování

Model směrování vychází ze vzoru replikace , ale místo jednoho zdroje a jednoho cíle má úloha replikace několik cílů, jak je znázorněno tady v jazyce C#:

[FunctionName("EH2EH")]
public static async Task Run(
    [EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
    [EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
    [EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
    ILogger log)
{
    foreach (EventData eventData in events)
    {
        // send to output1 and/or output2 based on criteria
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
        });
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
        });
    }
}

Funkce směrování vezme v úvahu metadata zprávy nebo datovou část zprávy a pak vybere jeden z dostupných cílů, do které se má odeslat.

Ve službě Azure Stream Analytics toho můžete dosáhnout definováním několika výstupů a následným spuštěním dotazu na výstup.

select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2

Projekce protokolu

Model projekce protokolu zploštěluje stream událostí s indexovanou databází, přičemž události se stávají záznamy v databázi. Události se obvykle přidávají do stejné kolekce nebo tabulky a klíč oddílu centra událostí se stane součástí primárního klíče, aby byl záznam jedinečný.

Projekce protokolu může vytvořit historii dat události v časových řadách nebo komprimované zobrazení, přičemž pro každý klíč oddílu se zachovají jenom nejnovější události. Tvar cílové databáze je nakonec na vás a na potřebách vaší aplikace. Tento model se také označuje jako "event sourcing".

Tip

Ve službě Azure Stream Analytics můžete snadno vytvářet projekce protokolů do Azure SQL Database a Azure Cosmos DB a měli byste tuto možnost preferovat.

Následující funkce Azure Functions promítá obsah centra událostí zkomprimovaný do kolekce Azure Cosmos DB.

[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
    [EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
    [CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
    ILogger log)
{
    foreach (var ev in input)
    {
        if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
        {
            var record = new
            {
                id = ev.SystemProperties.PartitionKey,
                data = JsonDocument.Parse(ev.Body),
                properties = ev.Properties
            };
            await output.AddAsync(record);
        }
    }
}

Další kroky