Modelli di attività di replica di eventi

La panoramica della federazione e la panoramica delle funzioni del replicatore illustrano la logica per e gli elementi di base delle attività di replica ed è consigliabile acquisire familiarità con essi prima di continuare con questo articolo.

In questo articolo vengono illustrate in dettaglio le linee guida per l'implementazione per diversi modelli evidenziati nella sezione panoramica.

Replica

Il modello di replica copia gli eventi da un hub eventi al successivo o da un hub eventi a un'altra destinazione, ad esempio una coda del bus di servizio. Gli eventi vengono inoltrati senza apportare modifiche al payload dell'evento.

L'implementazione di questo modello è coperta dalla replica di eventi tra Hub eventi e la replica di eventi tra Hub eventi e esempi del bus di servizio e l'esercitazione Usare Apache Kafka MirrorMaker con Hub eventi per il caso specifico di replica dei dati da un broker Apache Kafka in Hub eventi.

Flussi e conservazione degli ordini

La replica, tramite Funzioni di Azure o Analisi di flusso di Azure, non mira a garantire la creazione di cloni esatti 1:1 di un hub eventi di origine in un hub eventi di destinazione, ma è incentrata sul mantenimento dell'ordine relativo degli eventi in cui l'applicazione lo richiede. L'applicazione comunica questo raggruppamento di eventi correlati con la stessa chiave di partizione e Hub eventi dispone i messaggi con la stessa chiave di partizione in sequenza nella stessa partizione.

Importante

Le informazioni di "offset" sono univoche per ogni hub eventi e gli offset per gli stessi eventi variano a seconda delle istanze di Hub eventi. Per individuare una posizione in un flusso di eventi copiato, usare gli offset basati sul tempo e fare riferimento ai metadati assegnati dal servizio propagati.

Gli offset basati sul tempo avviano il ricevitore in un momento specifico:

  • EventPosition.FromStart(): legge di nuovo tutti i dati conservati.
  • EventPosition.FromEnd(): legge tutti i nuovi dati dal momento della connessione.
  • EventPosition.FromEnqueuedTime(dateTime): tutti i dati a partire da una data e un'ora specificati.

In EventProcessor impostare la posizione tramite InitialOffsetProvider in EventProcessorOptions. Con le altre API del ricevitore, la posizione viene passata attraverso il costruttore.

Gli helper della funzione di replica predefiniti forniti come esempi usati nelle linee guida basate su Funzioni di Azure assicurano che i flussi di eventi con la stessa chiave di partizione recuperata da una partizione di origine vengano inviati all'hub eventi di destinazione come batch nel flusso originale e con la stessa chiave di partizione.

Se il numero di partizioni dell'hub eventi di origine e di destinazione è identico, tutti i flussi nella destinazione verranno mappati alle stesse partizioni dell'origine. Se il numero di partizioni è diverso, che è importante in alcuni dei modelli descritti di seguito, il mapping sarà diverso, ma i flussi vengono sempre mantenuti insieme e in ordine.

L'ordine relativo degli eventi appartenenti a flussi diversi o di eventi indipendenti senza una chiave di partizione in una partizione di destinazione può essere sempre diverso dalla partizione di origine.

Metadati assegnati dal servizio

I metadati assegnati dal servizio di un evento ottenuto dall'hub eventi di origine, l'ora di accodamento originale, il numero di sequenza e l'offset vengono sostituiti da nuovi valori assegnati dal servizio nell'hub eventi di destinazione, ma con le funzioni helper, le attività di replica fornite negli esempi, i valori originali vengono mantenuti nelle proprietà utente: repl-enqueue-time (stringa ISO8601), repl-sequence, repl-offset.

Queste proprietà sono di tipo string e contengono il valore stringato delle rispettive proprietà originali. Se l'evento viene inoltrato più volte, i metadati assegnati dal servizio dell'origine immediata vengono aggiunti alle proprietà già esistenti, con valori separati da punto e virgola.

Failover

Se si usa la replica a scopo di ripristino di emergenza, per proteggersi da eventi di disponibilità a livello di area nel servizio Hub eventi o da interruzioni di rete, qualsiasi scenario di errore di questo tipo richiederà l'esecuzione di un failover da un hub eventi al successivo, indicando ai producer e/o ai consumer di usare l'endpoint secondario.

Per tutti gli scenari di failover, si presuppone che gli elementi necessari degli spazi dei nomi siano strutturalmente identici, vale a dire che Hub eventi e gruppi di consumer sono denominati in modo identico e che le regole di firma di accesso condiviso e/o le regole di controllo degli accessi in base al ruolo vengono configurate nello stesso modo. È possibile creare (e aggiornare) uno spazio dei nomi secondario seguendo le indicazioni per lo spostamento degli spazi dei nomi e omettendo il passaggio di pulizia.

Per forzare il passaggio da parte di producer e consumer, è necessario rendere disponibili le informazioni sullo spazio dei nomi da usare per la ricerca in una posizione facile da raggiungere e aggiornare. Se i produttori o i consumer riscontrano errori frequenti o persistenti, devono consultare tale posizione e modificare la configurazione. Esistono diversi modi per condividere tale configurazione, ma vengono indicati due nei modi seguenti: DNS e condivisioni file.

Configurazione del failover basato su DNS

Un approccio candidato consiste nel contenere le informazioni nei record DNS SRV in un DNS controllato e puntare ai rispettivi endpoint dell'hub eventi.

Importante

Tenere presente che Hub eventi non consente agli endpoint di essere direttamente con alias con record CNAME, il che significa che si userà DNS come meccanismo di ricerca resiliente per gli indirizzi endpoint e non risolvere direttamente le informazioni sugli indirizzi IP.

Si supponga di essere proprietari del dominio example.com e, per l'applicazione, una zona test.example.com. Per due hub eventi alternativi, si creeranno ora due zone annidate e un record SRV in ognuno di essi.

I record SRV sono, seguendo la convenzione comune, preceduti _azure_eventhubs._amqp da e contengono due record endpoint: uno per AMQP-over-TLS sulla porta 5671 e uno per AMQP-over-WebSocket sulla porta 443, entrambi puntando all'endpoint di Hub eventi dello spazio dei nomi corrispondente alla zona.

Zona Record SRV
eh1.test.example.com _azure_servicebus._amqp.eh1.test.example.com
1 1 5671 eh1-test-example-com.servicebus.windows.net
2 2 443 eh1-test-example-com.servicebus.windows.net
eh2.test.example.com _azure_servicebus._amqp.eh2.test.example.com
1 1 5671 eh2-test-example-com.servicebus.windows.net
2 2 443 eh2-test-example-com.servicebus.windows.net

Nella zona dell'applicazione si creerà quindi una voce CNAME che punta alla zona subordinata corrispondente all'hub eventi primario:

Record CNAME Alias
eventhub.test.example.com eh1.test.example.com

Usando un client DNS che consente di eseguire query sui record CNAME e SRV in modo esplicito (i client predefiniti di Java e .NET consentono solo la risoluzione semplice dei nomi agli indirizzi IP), è quindi possibile risolvere l'endpoint desiderato. Con DnsClient.NET, ad esempio, la funzione di ricerca è:

static string GetEventHubName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_eventhub._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

La funzione restituisce il nome host di destinazione registrato per la porta 5671 della zona attualmente con alias con CNAME, come illustrato in precedenza.

Per eseguire un failover è necessario modificare il record CNAME e puntare alla zona alternativa.

Il vantaggio dell'uso di DNS, e in particolare dns di Azure, è che le informazioni DNS di Azure vengono replicate a livello globale e quindi resilienti in caso di interruzioni a singola area.

Questa procedura è simile al funzionamento del ripristino di emergenza geografico di Hub eventi , ma completamente sotto il proprio controllo e funziona anche con scenari attivi/attivi.

Configurazione del failover basato su condivisione file

L'alternativa più semplice all'uso di DNS per la condivisione delle informazioni sugli endpoint consiste nell'inserire il nome dell'endpoint primario in un file di testo normale e gestire il file da un'infrastruttura affidabile contro le interruzioni e consente comunque gli aggiornamenti.

Se si esegue già un'infrastruttura del sito Web a disponibilità elevata con disponibilità globale e replica del contenuto, aggiungere tale file e ripubblicare il file se è necessario un'opzione.

Attenzione

È consigliabile pubblicare il nome dell'endpoint solo in questo modo, non una stringa di connessione completa, inclusi i segreti.

Considerazioni aggiuntive per il failover dei consumer

Per i consumer di Hub eventi, le considerazioni aggiuntive per la strategia di failover dipendono dalle esigenze del processore di eventi.

Se si verifica un'emergenza che richiede la ricompilazione di un sistema, inclusi i database, dai dati di backup e i database vengono inseriti direttamente o tramite l'elaborazione intermedia degli eventi contenuti nell'hub eventi, si ripristini il backup e si vuole avviare la riproduzione degli eventi nel sistema dal momento in cui è stato creato il backup del database e non dal momento in cui il sistema originale è stato eliminato definitivamente.

Se un errore interessa solo una sezione di un sistema o solo un singolo hub eventi, che è diventato non raggiungibile, è probabile che si voglia continuare a elaborare gli eventi dalla stessa posizione in cui l'elaborazione è stata interrotta.

Per realizzare uno scenario e usando il processore di eventi del rispettivo Azure SDK, si creerà un nuovo archivio checkpoint e si fornirà una posizione iniziale della partizione, in base al timestamp da cui si vuole riprendere l'elaborazione.

Se si ha ancora accesso all'archivio checkpoint dell'hub eventi da cui si passa, i metadati propagati descritti in precedenza consentono di ignorare gli eventi già gestiti e riprendere esattamente da dove è stata interrotta l'ultima volta.

Unione

Il modello di merge ha una o più attività di replica che puntano a una destinazione, possibilmente contemporaneamente ai producer regolari che inviano eventi alla stessa destinazione.

Le varianti di questi patter sono:

  • Due o più funzioni di replica acquisiscono simultaneamente eventi da origini separate e li inviano alla stessa destinazione.
  • Un'altra funzione di replica che acquisisce eventi da un'origine mentre la destinazione viene usata direttamente dai producer.
  • Il modello precedente, ma con mirroring tra due o più Hub eventi, con conseguente presenza di hub eventi contenenti gli stessi flussi, indipendentemente dalla posizione in cui vengono generati gli eventi.

Le prime due varianti di pattern sono semplici e non differiscono dalle attività di replica normale.

L'ultimo scenario richiede l'esclusione di eventi già replicati dalla replica. La tecnica viene illustrata e illustrata nell'esempio EventHubToEventHubMerge .

Editor

Il modello di editor si basa sul modello di replica , ma i messaggi vengono modificati prima che vengano inoltrati.

Di seguito sono riportati alcuni esempi di modifiche:

  • Transcodifica : se il contenuto dell'evento (detto anche "corpo" o "payload") arriva dall'origine codificato usando il formato Apache Avro o un formato di serializzazione proprietario, ma l'aspettativa del sistema proprietario della destinazione è che il contenuto venga codificato in FORMATO JSON , un'attività di replica di transcodifica deserializzerà prima il payload da Apache Avro in un oggetto grafico in memoria e quindi serializzerà tale grafico in JSON format per l'evento che viene inoltrato. La transcodifica include anche attività di compressione e decompressione del contenuto .
  • Trasformazione : gli eventi che contengono dati strutturati possono richiedere la rielaborazione dei dati per semplificare l'utilizzo da parte dei consumer downstream. Ciò può comportare operazioni come l'appiattimento di strutture annidate, l'eliminazione di elementi di dati estranei o la rielaborazione del payload per adattarsi esattamente a uno schema specificato.
  • Invio in batch: gli eventi possono essere ricevuti in batch (più eventi in un singolo trasferimento) da un'origine, ma devono essere inoltrati singolarmente a una destinazione o viceversa. Un'attività può quindi inoltrare più eventi in base a un singolo trasferimento di eventi di input o aggregare un set di eventi che vengono quindi trasferiti insieme.
  • Convalida : i dati degli eventi provenienti da origini esterne spesso devono essere verificati se sono conformi a un set di regole prima che vengano inoltrati. Le regole possono essere espresse usando schemi o codice. Gli eventi che non devono essere conformi possono essere eliminati, con il problema annotato nei log o possono essere inoltrati a una destinazione speciale per gestirli ulteriormente.
  • Arricchimento : i dati degli eventi provenienti da alcune origini possono richiedere l'arricchimento con ulteriore contesto perché possano essere utilizzabili nei sistemi di destinazione. Ciò può comportare la ricerca di dati di riferimento e l'incorporamento di tali dati con l'evento o l'aggiunta di informazioni sull'origine nota all'attività di replica, ma non contenute negli eventi.
  • Filtro: alcuni eventi in arrivo da un'origine potrebbero dover essere trattenuti dalla destinazione in base a una regola. Un filtro testa l'evento in base a una regola e elimina l'evento se l'evento non corrisponde alla regola. Filtrare gli eventi duplicati osservando determinati criteri ed eliminando gli eventi successivi con gli stessi valori è una forma di filtro.
  • Crittografia : un'attività di replica potrebbe dover decrittografare il contenuto in arrivo dall'origine e/o crittografare il contenuto inoltrato in avanti a una destinazione e/o deve verificare l'integrità del contenuto e dei metadati rispetto a una firma inserita nell'evento o allegare tale firma.
  • Attestazione : un'attività di replica può allegare metadati, potenzialmente protetti da una firma digitale, a un evento che attesta che l'evento è stato ricevuto tramite un canale specifico o in un momento specifico.
  • Concatenamento : un'attività di replica può applicare firme a flussi di eventi in modo che l'integrità del flusso sia protetta e che gli eventi mancanti possano essere rilevati.

I modelli di trasformazione, invio in batch e arricchimento sono in genere implementati meglio con i processi di Analisi di flusso di Azure .

Tutti questi modelli possono essere implementati usando Funzioni di Azure, usando il trigger di Hub eventi per l'acquisizione di eventi e l'associazione di output dell'hub eventi per la distribuzione.

Routing

Il modello di routing si basa sul modello di replica , ma invece di avere un'origine e una destinazione, l'attività di replica ha più destinazioni, illustrate qui in C#:

[FunctionName("EH2EH")]
public static async Task Run(
    [EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
    [EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
    [EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
    ILogger log)
{
    foreach (EventData eventData in events)
    {
        // send to output1 and/or output2 based on criteria
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
        });
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
        });
    }
}

La funzione di routing considererà i metadati del messaggio e/o il payload del messaggio e quindi scegli una delle destinazioni disponibili a cui inviare.

In Analisi di flusso di Azure è possibile ottenere lo stesso risultato con la definizione di più output e quindi l'esecuzione di una query per ogni output.

select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2

Proiezione log

Il modello di proiezione del log appiattisce il flusso di eventi in un database indicizzato, con eventi che diventano record nel database. In genere, gli eventi vengono aggiunti alla stessa raccolta o tabella e la chiave di partizione dell'hub eventi diventa parte della chiave primaria che cerca di rendere univoco il record.

La proiezione del log può produrre uno storico della serie temporale dei dati dell'evento o una visualizzazione compattata, in cui solo l'evento più recente viene mantenuto per ogni chiave di partizione. La forma del database di destinazione è in definitiva fino alle esigenze dell'applicazione. Questo modello viene anche definito "origine eventi".

Suggerimento

È possibile creare facilmente proiezioni di log in database Azure SQL e Azure Cosmos DB in Analisi di flusso di Azure e si preferisce tale opzione.

La funzione di Azure seguente proietta il contenuto di un hub eventi compattato in un insieme Azure Cosmos DB.

[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
    [EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
    [CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
    ILogger log)
{
    foreach (var ev in input)
    {
        if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
        {
            var record = new
            {
                id = ev.SystemProperties.PartitionKey,
                data = JsonDocument.Parse(ev.Body),
                properties = ev.Properties
            };
            await output.AddAsync(record);
        }
    }
}

Passaggi successivi