Elaborazione di flussi con Analisi di flusso di Azure

Cosmos DB
Hub eventi
Monitoraggio
Analisi di flusso

Questa architettura di riferimento illustra una pipeline di elaborazione di flussi end-to-end. La pipeline inserisce dati da due origini, esegue la correlazione tra i record nei due flussi e calcola una media mobile in un intervallo di tempo. I risultati vengono archiviati per analisi aggiuntive.

GitHub logo Un'implementazione di riferimento per questa architettura è disponibile in GitHub.

Architettura di riferimento per la creazione di una pipeline di elaborazione di flussi con Analisi di flusso di Azure

Scenario: una società di taxi raccoglie dati su ogni corsa. Per questo scenario si presuppone che siano presenti due dispositivi diversi che inviano dati. Il taxi ha un contatore che invia informazioni su ogni corsa, ovvero la durata, la distanza e le posizioni di partenza e partenza. Un dispositivo separato accetta i pagamenti dai clienti e invia dati sui prezzi delle corse. La società di taxi vuole calcolare la mancia media per miglia guidate, in tempo reale, per individuare le tendenze.

Architettura

L'architettura è costituita dai componenti seguenti.

Origini dati. In questa architettura sono presenti due origini dati che generano flussi di dati in tempo reale. Il primo flusso contiene le informazioni sulla corsa e il secondo contiene le informazioni sui costi delle corse. L'architettura di riferimento include un generatore di dati simulato che legge dati da un set di file statici ed esegue il push dei dati in Hub eventi. In un'applicazione reale le origini dati corrisponderebbero a dispositivi installati nei taxi.

Hub eventi di Azure. Hub eventi è un servizio di inserimento di eventi. Questa architettura usa due istanze di Hub eventi, una per ogni origine dati. Ogni origine dati invia un flusso di dati all'istanza associata di Hub eventi.

Analisi di flusso di Azure. Analisi di flusso è un motore di elaborazione di eventi. Un processo di Analisi di flusso legge i flussi di dati dalle due istanze di Hub eventi ed esegue l'elaborazione dei flussi.

Cosmos database. L'output dal processo di Analisi di flusso è una serie di record, scritti sotto forma di documento JSON in un database di documenti di Cosmos DB.

Microsoft Power BI. Power BI è una suite di strumenti di analisi aziendale che consente di analizzare dati e condividere informazioni dettagliate. In questa architettura carica i dati da Cosmos DB. Ciò consente agli utenti di analizzare il set completo di dati cronologici raccolti. È anche possibile trasmettere i risultati direttamente da Analisi di flusso a Power BI per una visualizzazione in tempo reale dei dati. Per altre informazioni, vedere Streaming in tempo reale in Power BI.

Monitoraggio di Azure. Monitoraggio di Azure raccoglie le metriche relative alla prestazioni dei servizi di Azure distribuiti nella soluzione. La visualizzazione delle metriche in un dashboard consente di ottenere informazioni dettagliate sull'integrità della soluzione.

Inserimento dati

Per simulare un'origine dati, questa architettura di riferimento usa il set di dati New York City Taxi Data[1]. Questo set di dati contiene dati sulle corse dei taxi a New York city in un periodo di quattro anni (2010-2013). Contiene due tipi di record: dati di corsa e dati tariffario. I dati relativi alle corsa includono la durata della corsa, la distanza percorsa e le ubicazioni di salita e discesa del cliente. I dati relativi ai costi della corsa includono gli importi relativi a costo di base, imposte e mancia. I campi comuni in entrambi i tipi di record includono il numero di taxi, il numero di licenza e l'ID del fornitore. Questi tre campi identificano in modo univoco un taxi e un tassista. I dati vengono archiviati in formato CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Il generatore di dati è un'applicazione .NET Core che legge i record e li invia a Hub eventi di Azure. Il generatore invia i dati relativi alle corse in formato JSON e i dati relativi ai costi in formato CSV.

Hub eventi usa partizioni per segmentare i dati. Le partizioni consentono a un consumer di leggere ogni partizione in parallelo. Quando si inviano dati a Hub eventi, è possibile specificare in modo esplicito la chiave di partizione. In caso contrario, i record vengono assegnati alle partizioni in modalità round-robin.

In questo scenario specifico i dati relativi alle corse e i dati relativi ai costi devono avere lo stesso ID di partizione per un taxi specifico. Ciò consente ad Analisi di flusso di applicare un certo livello di parallelismo durante la correlazione dei due flussi. Un record nella partizione n dei dati relativi alle corse corrisponderà a un record nella partizione n dei dati relativi ai costi.

Diagramma di elaborazione dei flussi con Analisi di flusso di Azure e Hub eventi di Azure

Nel generatore di dati il modello di dati comune per entrambi i tipi di record ha una proprietà PartitionKey che corrisponde alla concatenazione di Medallion, HackLicense e VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Questa proprietà viene usata per fornire una chiave di partizione esplicita durante l'invio a Hub eventi:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Elaborazione dei flussi

Il processo di elaborazione del flusso viene definito tramite una query SQL con alcuni passaggi distinti. I primi due passaggi consentono semplicemente di selezionare record dai due flussi di input.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

Il passaggio successivo crea un join tra i due flussi di input per selezionare i record corrispondenti da ogni flusso.

Step3 AS (
  SELECT tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.PartitionId = tf.PartitionId
     AND tr.PickupTime = tf.PickupTime
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Questa query unisce i record in un set di campi che identificano in modo univoco i record corrispondenti ( PartitionId e PickupTime ).

Nota

Si vuole che TaxiRide i flussi e siano uniti dalla TaxiFare combinazione univoca di Medallion , e HackLicenseVendorIdPickupTime . In questo caso, vengono illustrati i campi e , ma questa operazione non deve essere PartitionIdMedallion eseguita come in HackLicenseVendorId genere.

In Analisi di flusso i join sono temporali, ovvero viene creato un join dei record entro un intervallo di tempo specifico. In caso contrario è possibile che il processo rimanga in attesa di una corrispondenza per un periodo illimitato. La funzione DATEDIFF specifica la distanza temporale massima consentita tra due record corrispondenti per permettere l'individuazione di una corrispondenza.

L'ultimo passaggio del processo calcola la mancia media per miglio, con un raggruppamento basato su una finestra di salto di 5 minuti.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Analisi di flusso offre diverse funzioni finestra. Una finestra di salto si sposta in avanti nel tempo in base a un periodo fisso, in questo caso 1 minute per hop. Il risultato consiste nel calcolare una media mobile negli ultimi 5 minuti.

Nell'architettura mostrata qui vengono salvati in Cosmos DB solo i risultati del processo di Analisi di flusso. Per uno scenario di Big Data, prendere in considerazione anche l'uso di Acquisizione di Hub eventi di Azure per salvare i dati non elaborati degli eventi nell'archivio BLOB di Azure. La conservazione dei dati non elaborati consentirà di eseguire query in batch sui dati cronologici in un momento successivo per ottenere nuove informazioni dettagliate dai dati.

Considerazioni sulla scalabilità

Hub eventi

La capacità di elaborazione di Hub eventi viene misurata in unità elaborate. È possibile ridimensionare automaticamente un hub eventi abilitando l'aumento automatico, che ridimensiona automaticamente le unità elaborate in base al traffico, fino a un limite massimo configurato.

Analisi di flusso

Per Analisi di flusso le risorse di calcolo allocate a un processo vengono misurate in unità di streaming. I processi di Analisi di flusso vengono ridimensionati in modo ottimale se è possibile eseguire il processo in parallelo. In questo modo Analisi di flusso può distribuire il processo tra più nodi di calcolo.

Per l'input di Hub eventi è possibile usare la parola chiave PARTITION BY per partizionare il processo di Analisi di flusso. I dati verranno suddivisi in subset in base alle partizioni di Hub eventi.

Le funzioni finestra e i join temporali richiedono unità di streaming aggiuntive. Quando possibile, usare PARTITION BY in modo che ogni partizione venga elaborata separatamente. Per altre informazioni, vedere Informazioni e modifica delle unità di streaming.

Se non è possibile eseguire in parallelo l'intero processo di Analisi di flusso, provare a suddividere il processo in più passaggi, a partire da uno o più passaggi paralleli. In questo modo è possibile eseguire in parallelo i primi passaggi. In questa architettura di riferimento, ad esempio:

  • I passaggi 1 e 2 sono semplici istruzioni SELECT che selezionano record entro una singola partizione.
  • Il passaggio 3 crea un join partizionato tra due flussi di input. Questo passaggio sfrutta il fatto che i record corrispondenti condividono la stessa chiave di partizione e quindi hanno sicuramente lo stesso ID partizione in ogni flusso di input.
  • Il passaggio 4 esegue l'aggregazione in tutte le partizioni. Questo passaggio non può essere eseguito in parallelo.

È possibile usare il diagramma del processo di Analisi di flusso per visualizzare il numero di partizioni assegnate a ogni passaggio nel processo. Il diagramma seguente mostra il diagramma del processo per questa architettura di riferimento:

Diagramma del processo

Cosmos DB

La capacità di elaborazione per Cosmos DB viene misurata in unità richiesta (UR). Per ridimensionare un contenitore Cosmos DB fino a un valore superiore a 10.000 UR, è necessario specificare una chiave di partizione quando si crea il contenitore e includere la chiave di partizione in ogni documento.

In questa architettura di riferimento vengono creati nuovi documenti solo una volta al minuto (intervallo della finestra di salto), quindi i requisiti per la velocità effettiva sono abbastanza bassi. Per questo motivo non è necessario una chiave di partizione in questo scenario.

Considerazioni sul monitoraggio

Con qualsiasi soluzione di elaborazione di flussi è importante monitorare le prestazioni e l'integrità del sistema. Monitoraggio di Azure raccoglie le metriche e i log di diagnostica per i servizi di Azure usati nell'architettura. Monitoraggio di Azure è incorporato nella piattaforma Azure e non richiede codice aggiuntivo nell'applicazione.

I segnali di avviso seguenti indicano che è necessario aumentare il numero di risorse di Azure rilevanti:

  • Hub eventi limita le richieste o è vicino alla quota giornaliera di messaggi.
  • Il processo di Analisi di flusso usa più dell'80% di unità di streaming allocate.
  • Cosmos DB inizia a limitare le richieste.

L'architettura di riferimento include un dashboard personalizzato, che viene distribuito nel portale di Azure. Dopo aver distribuito l'architettura, è possibile visualizzare il dashboard aprendo il portale di Azure e selezionandolo dall'elenco dei dashboard. Per altre informazioni sulla creazione e la distribuzione di dashboard personalizzati nel portale di Azure, vedere Creare dashboard di Azure a livello di codice.

L'immagine seguente mostra il dashboard dopo l'esecuzione di Analisi di flusso per circa un'ora.

Screenshot del dashboard relativo alle corse in taxi

Il pannello in basso a sinistra mostra l'incremento dell'utilizzo di unità di streaming per il processo di Analisi di flusso durante i primi 15 minuti e quindi la stabilizzazione del valore. Questo è uno schema tipico quando il processo raggiunge uno stato stabile.

Si noti che Hub eventi limita le richieste, come mostrato nel pannello in alto a destra. Una richiesta limitata occasionalmente non costituisce un problema, perché l'SDK del client di Hub eventi esegue automaticamente nuovi tentativi quando riceve un errore di limitazione. Se tuttavia vengono visualizzati errori di limitazione continui, Hub eventi necessita di altre unità elaborate. Il grafico seguente mostra un'esecuzione dei test che usa la funzionalità di aumento automatico di Hub eventi, che aumenta automaticamente le unità elaborate in base alla necessità.

Screenshot di Hub eventi durante il revisionamento automatico

L'aumento automatico è stato abilitato in corrispondenza del contrassegno 06:35. È possibile notare il calo nelle richieste limitate, perché Hub eventi ha eseguito il ridimensionamento automatico fino a 3 unità elaborate.

È interessante notare che ciò comporta l'effetto collaterale dell'incremento dell'utilizzo di unità di streaming nel processo di Analisi di flusso. La limitazione consente a Hub eventi di ridurre in modo artificiale la frequenza di inserimento per il processo di Analisi di flusso. La risoluzione di un collo di bottiglia delle prestazioni rivela spesso un altro collo di bottiglia. In questo caso l'allocazione di unità di streaming aggiuntive per il processo di Analisi di flusso ha consentito di risolvere il problema.

Considerazioni sul costo

Usare il calcolatore dei prezzi di Azure per stimare i costi. Ecco alcune considerazioni per i servizi usati in questa architettura di riferimento.

Analisi di flusso di Azure

Analisi di flusso di Azure il prezzo è in base al numero di unità di streaming ($0,11/ora) necessarie per elaborare i dati nel servizio.

Analisi di flusso può essere costoso se non si elaborano i dati in tempo reale o in piccole quantità di dati. Per questi casi d'uso, è consigliabile usare Funzioni di Azure o App per la logica per spostare i dati da Hub eventi di Azure a un archivio dati.

Hub eventi di Azure e Azure Cosmos DB

Per considerazioni sui costi relativi Hub eventi di Azure e Cosmos database, vedere Considerazioni sui costi vedere Elaborazione di flussi con Azure Databricks di riferimento.

Distribuire la soluzione

Per distribuire ed eseguire l'implementazione di riferimento, seguire la procedura illustrata nel file README in GitHub.

Considerazioni su DevOps

  • Creare gruppi di risorse separati per gli ambienti di produzione, sviluppo e test. L'uso di gruppi di risorse separati semplifica la gestione delle distribuzioni, l'eliminazione delle distribuzioni di test e l'assegnazione dei diritti di accesso.

  • Usare Azure Resource Manager modello per distribuire le risorse di Azure seguendo il processo di infrastruttura come codice (IaC). Con i modelli, l'automazione delle distribuzioni Azure DevOps Serviceso altre soluzioni CI/CD è più semplice.

  • Inserire ogni carico di lavoro in un modello di distribuzione separato e archiviare le risorse nei sistemi di controllo del codice sorgente. È possibile distribuire i modelli insieme o singolarmente come parte di un processo CI/CD, semplificando il processo di automazione.

    In questa architettura i Hub eventi di Azure, Log Analytics e Cosmos database vengono identificati come un singolo carico di lavoro. Queste risorse sono incluse in un singolo modello di Arm.

  • Prendere in considerazione la gestione temporanea dei carichi di lavoro. Eseguire la distribuzione in varie fasi ed eseguire i controlli di convalida in ogni fase prima di passare alla fase successiva. In questo modo è possibile eseguire il push degli aggiornamenti negli ambienti di produzione in modo altamente controllato e ridurre al minimo i problemi di distribuzione imprevisti.

  • È consigliabile Monitoraggio di Azure per analizzare le prestazioni della pipeline di elaborazione dei flussi. Per altre informazioni, vedere Monitoraggio Azure Databricks.

Per altre informazioni, vedere la sezione DevOps in Microsoft Azure Well-Architected Framework.

Può essere utile esaminare gli scenari di esempio di Azure seguenti, che illustrano soluzioni specifiche usando alcune delle stesse tecnologie: