Zpracování streamů s využitím Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Tato referenční architektura ukazuje kompletní kanál zpracování datových proudů. Tento typ kanálu má čtyři fáze: ingestování, zpracování, ukládání a analýzu a vytváření sestav. V této referenční architektuře kanál ingestuje data ze dvou zdrojů, provádí spojení souvisejících záznamů z každého datového proudu, rozšiřuje výsledek a vypočítá průměr v reálném čase. Výsledky se ukládají pro další analýzu.

GitHub logo Referenční implementace pro tuto architekturu je k dispozici na GitHubu.

Architektura

Diagram showing a reference architecture for stream processing with Azure Databricks.

Stáhněte si soubor Visia této architektury.

Workflow

Tato architektura se skládá z následujících součástí:

Zdroje dat. V této architektuře existují dva zdroje dat, které generují datové proudy v reálném čase. První proud obsahuje informace o jízdě a druhý obsahuje informace o jízdě. Referenční architektura zahrnuje simulovaný generátor dat, který čte ze sady statických souborů a odesílá data do služby Event Hubs. Zdroje dat v reálné aplikaci by byly zařízení nainstalovaná v taxislužbách.

Azure Event Hubs. Event Hubs je služba pro příjem událostí. Tato architektura používá dvě instance centra událostí, jednu pro každý zdroj dat. Každý zdroj dat odešle datový proud do přidruženého centra událostí.

Azure Databricks. Databricks je analytická platforma založená na Apache Sparku optimalizovaná pro platformu cloudových služeb Microsoft Azure. Databricks se používá ke korelaci jízdy taxíkem a dat jízdy jízdy a jízdy a také k obohacení korelovaných dat o sousední data uložená v systému souborů Databricks.

Azure Cosmos DB. Výstup úlohy Azure Databricks je řada záznamů, které se zapisují do služby Azure Cosmos DB pro Apache Cassandra. Azure Cosmos DB pro Apache Cassandra se používá, protože podporuje modelování dat časových řad.

Azure Log Analytics. Data protokolu aplikací shromážděná službou Azure Monitor se ukládají do pracovního prostoru služby Log Analytics. Dotazy Log Analytics je možné použít k analýze a vizualizaci metrik a kontrole zpráv protokolu za účelem identifikace problémů v aplikaci.

Alternativy

  • Synapse Link je upřednostňovaným řešením Microsoftu pro analýzu nad daty Azure Cosmos DB.

Podrobnosti scénáře

Scénář: Taxislužba shromažďuje data o každé jízdě taxíkem. V tomto scénáři předpokládáme, že existují dvě samostatná zařízení, která odesílají data. Taxi má měřič, který odesílá informace o každé jízdě – dobu trvání, vzdálenost a vyzvednutí a odkládací místa. Samostatné zařízení přijímá platby od zákazníků a odesílá data o jízdné. Aby bylo potřeba zjistit trendy ridershipu, chce taxislužba vypočítat průměrný tip na míle řízený v reálném čase pro každou čtvrť.

Potenciální případy použití

Toto řešení je optimalizované pro maloobchod.

Příjem dat

K simulaci zdroje dat tato referenční architektura používá datovou sadu dat taxislužby New Yorku[1].. Tato datová sada obsahuje data o jízdách taxíkem v New Yorku za čtyři roky (2010 – 2013). Obsahuje dva typy záznamů: data jízdy a údaje o jízdě. Data o jízdě zahrnují dobu jízdy, vzdálenost jízdy a vyzvednutí a odkládací místo. Údaje o jízdné zahrnují ceny jízdné, daně a tipové částky. Mezi běžná pole v obou typech záznamů patří číslo medailiónu, licence hacku a ID dodavatele. Společně tato tři pole jednoznačně identifikují taxi a řidiče. Data se ukládají ve formátu CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois ve společnosti Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Generátor dat je aplikace .NET Core, která čte záznamy a odesílá je do služby Azure Event Hubs. Generátor odesílá data jízdy ve formátu JSON a data jízdného ve formátu CSV.

Služba Event Hubs používá k segmentování dat oddíly . Oddíly umožňují příjemci číst každý oddíl paralelně. Když odesíláte data do služby Event Hubs, můžete klíč oddílu explicitně zadat. V opačném případě se záznamy přiřazují k oddílům způsobem kruhového dotazování.

V tomto scénáři by data jízdy a jízdné měly skončit se stejným ID oddílu pro danou taxislužba. Databricks tak může při korelaci těchto dvou datových proudů použít stupeň paralelismu. Záznam v oddílu n dat o jízdě bude odpovídat záznamu v oddílu n dat jízdy.

Diagram of stream processing with Azure Databricks and Event Hubs.

Stáhněte si soubor Visia této architektury.

V generátoru dat má společný datový model pro oba typy PartitionKey záznamů vlastnost, která je zřetězením Medallion, HackLicensea VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Tato vlastnost se používá k poskytnutí explicitního klíče oddílu při odesílání do služby Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Kapacita propustnosti služby Event Hubs se měří v jednotkách propustnosti. Centrum událostí můžete automaticky škálovat tak, že povolíte automatické nafouknutí, které automaticky škáluje jednotky propustnosti na základě provozu až na nakonfigurované maximum.

Zpracování datových proudů

V Azure Databricks provádí zpracování dat úloha. Úloha se přiřadí ke clusteru a spustí se v clusteru. Úloha může být buď vlastní kód napsaný v Javě, nebo poznámkový blok Sparku.

V této referenční architektuře je úloha archiv Java s třídami napsanými v Javě i Scala. Při zadávání archivu Java pro úlohu Databricks je třída určena ke spuštění clusterem Databricks. Zde hlavní metoda com.microsoft.pnp.TaxiCabReader třídy obsahuje logiku zpracování dat.

Čtení datového proudu ze dvou instancí centra událostí

Logika zpracování dat používá strukturované streamování Sparku ke čtení ze dvou instancí centra událostí Azure:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

Obohacení dat informacemi o sousedství

Data o jízdě zahrnují souřadnice zeměpisné šířky a délky vyzvednutí a odkládacích míst. I když jsou tyto souřadnice užitečné, nejsou snadno využity k analýze. Tato data jsou proto rozšířena o data sousedství, která se čtou ze souboru obrazce.

Formát shapefile je binární a není snadno parsován, ale knihovna GeoTools poskytuje nástroje pro geoprostorová data, která používají formát shapefile. Tato knihovna se používá ve třídě com.microsoft.pnp.GeoFinder k určení názvu sousedství na základě souřadnic vyzvednutí a vyřazení.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Připojení k datům jízdy a jízdy

Nejprve se transformují data jízdy a jízdy:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

A pak se data jízdy spojí s daty jízdného:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Zpracování dat a vložení do služby Azure Cosmos DB

Průměrná částka jízdného pro každou čtvrť se vypočítá pro daný časový interval:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Který se pak vloží do služby Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Důležité informace

Tyto aspekty implementují pilíře dobře architektuře Azure, což je sada hlavních principů, které je možné použít ke zlepšení kvality úlohy. Další informace naleznete v tématu Microsoft Azure Well-Architected Framework.

Zabezpečení

Zabezpečení poskytuje záruky proti záměrným útokům a zneužití cenných dat a systémů. Další informace najdete v tématu Přehled pilíře zabezpečení.

Přístup k pracovnímu prostoru Azure Databricks se řídí pomocí konzoly správce. Konzola správce obsahuje funkce pro přidání uživatelů, správu uživatelských oprávnění a nastavení jednotného přihlašování. Řízení přístupu pro pracovní prostory, clustery, úlohy a tabulky je také možné nastavit prostřednictvím konzoly správce.

Správa tajných kódů

Azure Databricks obsahuje úložiště tajných kódů, které slouží k ukládání tajných kódů, včetně připojovací řetězec, přístupových klíčů, uživatelských jmen a hesel. Tajné kódy v úložišti tajných kódů Azure Databricks jsou rozdělené podle oborů:

databricks secrets create-scope --scope "azure-databricks-job"

Tajné kódy se přidají na úrovni oboru:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Poznámka:

Místo nativního oboru Azure Databricks je možné použít obor založený na službě Azure Key Vault. Další informace najdete v oborech založených na službě Azure Key Vault.

V kódu jsou tajné kódy přístupné prostřednictvím nástrojů pro tajné kódy Azure Databricks.

Sledování

Služba Azure Databricks je založená na Apache Sparku a obě používají log4j jako standardní knihovnu pro protokolování. Kromě výchozího protokolování poskytovaného Apache Sparkem můžete implementovat protokolování do Azure Log Analytics podle článku Monitorování Azure Databricks.

Protože com.microsoft.pnp.TaxiCabReader třídy zpracovává jízdy a jízdné zprávy, je možné, že jeden z nich může být poškozený, a proto není platný. V produkčním prostředí je důležité analyzovat tyto poškozené zprávy a identifikovat problém se zdroji dat, aby bylo možné je rychle opravit, aby se zabránilo ztrátě dat. Třída com.microsoft.pnp.TaxiCabReader registruje Apache Spark Akumulátor, který sleduje počet poškozených jízdných záznamů a jízdy:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark používá k odesílání metrik knihovnu Dropwizard a některá nativní pole metrik Dropwizard nejsou kompatibilní s Azure Log Analytics. Tato referenční architektura proto zahrnuje vlastní jímku Dropwizard a reporter. Formátuje metriky ve formátu očekávaném službou Azure Log Analytics. Když Apache Spark hlásí metriky, posílají se také vlastní metriky pro špatně zformulovaná data jízdy a jízdného.

Následuje příklad dotazů, které můžete použít v pracovním prostoru služby Azure Log Analytics k monitorování provádění úlohy streamování. Argument ago(1d) v každém dotazu vrátí všechny záznamy vygenerované v posledním dni a dá se upravit tak, aby se zobrazilo jiné časové období.

Výjimky zaprotokolované během provádění dotazu streamu

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Akumulace poškozených jízdných dat a dat o jízdě

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Provádění úloh v průběhu času

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Další informace najdete v tématu Monitorování Azure Databricks.

DevOps

  • Vytvořte samostatné skupiny prostředků pro produkční, vývojové a testovací prostředí. Samostatné skupiny prostředků usnadňují správu nasazení, odstraňování testovacích nasazení a přiřazování přístupových práv.

  • Pomocí šablony Azure Resource Manageru nasaďte prostředky Azure po procesu infrastruktury jako kódu (IaC). Díky šablonám je snazší automatizovat nasazení pomocí Azure DevOps Services nebo jiných řešení CI/CD.

  • Každou úlohu umístěte do samostatné šablony nasazení a uložte prostředky do systémů správy zdrojového kódu. Šablony můžete nasadit společně nebo jednotlivě jako součást procesu CI/CD, což usnadňuje proces automatizace.

    V této architektuře jsou služby Azure Event Hubs, Log Analytics a Azure Cosmos DB označené jako jedna úloha. Tyto prostředky jsou součástí jedné šablony ARM.

  • Zvažte přípravu úloh. Nasaďte je do různých fází a před přechodem na další fázi spusťte kontroly ověřování v každé fázi. Díky tomu můžete odesílat aktualizace do produkčních prostředí vysoce kontrolovaným způsobem a minimalizovat neočekávané problémy s nasazením.

    V této architektuře existuje několik fází nasazení. Zvažte vytvoření kanálu Azure DevOps a přidání těchto fází. Tady je několik příkladů fází, které můžete automatizovat:

    • Spuštění clusteru Databricks
    • Konfigurace rozhraní příkazového řádku Databricks
    • Instalace nástrojů Scala
    • Přidání tajných kódů Databricks

    Zvažte také psaní automatizovaných integračních testů za účelem zlepšení kvality a spolehlivosti kódu Databricks a jeho životního cyklu.

  • Zvažte použití služby Azure Monitor k analýze výkonu kanálu zpracování datových proudů. Další informace najdete v tématu Monitorování Azure Databricks.

Další informace najdete v části DevOps v architektuře Microsoft Azure Well-Architected Framework.

Optimalizace nákladů

Optimalizace nákladů se zabývá způsoby, jak snížit zbytečné výdaje a zlepšit efektivitu provozu. Další informace najdete v tématu Přehled pilíře optimalizace nákladů.

K odhadu nákladů použijte cenovou kalkulačku Azure. Tady je několik důležitých informací o službách používaných v této referenční architektuře.

Event Hubs

Tato referenční architektura nasadí službu Event Hubs na úrovni Standard . Cenový model je založený na jednotkách propustnosti, událostech příchozího přenosu dat a zachytávání událostí. Událost příchozího přenosu dat je jednotka dat o velikosti 64 kB nebo menší. Větší zprávy se účtují v násobcích po 64 kB. Jednotky propustnosti zadáte prostřednictvím webu Azure Portal nebo rozhraní API pro správu služby Event Hubs.

Pokud potřebujete více dnů uchovávání, zvažte úroveň Dedicated . Tato úroveň nabízí nasazení s jedním tenantem s nejnáročnějšími požadavky. Tato nabídka vytvoří cluster založený na jednotkách kapacity (CU), které nejsou vázané jednotkami propustnosti.

Úroveň Standard se také účtuje na základě událostí příchozího přenosu dat a jednotek propustnosti.

Informace o cenách služby Event Hubs najdete v cenách služby Event Hubs.

Azure Databricks

Azure Databricks nabízí dvě úrovně Standard a Premium , které podporují tři úlohy. Tato referenční architektura nasadí pracovní prostor Azure Databricks na úrovni Premium .

Datoví technici úlohy Light a Datoví technici jsou určené datovým inženýrům k vytváření a spouštění úloh. Úloha Analýza dat je určená pro datové vědce, aby mohli interaktivně zkoumat, vizualizovat, manipulovat s nimi a sdílet data a přehledy.

Azure Databricks nabízí mnoho cenových modelů.

  • Plán průběžných plateb

    Na základě vybrané instance virtuálního počítače se vám účtují virtuální počítače zřízené v clusterech a jednotkách Databricks (DBU). DBU je jednotka výpočetního výkonu, účtovaná po sekundách. Spotřeba jednotek DBU závisí na velikosti a typu instance, na které běží Azure Databricks. Ceny budou záviset na vybrané úloze a úrovni.

  • Plán před nákupem

    Do jednotek Azure Databricks (DBU) se zavazujete jako jednotky potvrzení Databricks (DBCU) po dobu jednoho nebo tří let. Ve srovnání s modelem průběžných plateb můžete ušetřit až 37 %.

Další informace najdete v tématu Azure Databricks – ceny.

Azure Cosmos DB

V této architektuře se řada záznamů zapisuje do služby Azure Cosmos DB úlohou Azure Databricks. Za kapacitu, kterou si rezervujete, se účtuje v jednotkách žádostí za sekundu (RU/s), která se používá k provádění operací vložení. Jednotka pro fakturaci je 100 RU/s za hodinu. Například náklady na zápis 100 kB položek jsou 50 RU/s.

V případě operací zápisu zřiďte dostatečnou kapacitu pro podporu počtu zápisů potřebných za sekundu. Zřízenou propustnost můžete zvýšit pomocí portálu nebo Azure CLI před provedením operací zápisu a po dokončení těchto operací snížit propustnost. Propustnost pro období zápisu je minimální propustnost potřebná pro daná data a propustnost vyžadovaná pro operaci vložení za předpokladu, že neběží žádná jiná úloha.

Příklad analýzy nákladů

Předpokládejme, že pro kontejner nakonfigurujete hodnotu propustnosti 1 000 RU/s. Je nasazená po dobu 24 hodin po dobu 30 dnů, celkem 720 hodin.

Kontejner se účtuje za 10 jednotek 100 RU/s za hodinu za každou hodinu. 10 jednotek v hodnotě 0,008 USD (za 100 RU/s za hodinu) se účtují 0,08 USD za hodinu.

Za 720 hodin nebo 7 200 jednotek (100 RU) se vám účtuje 57,60 USD za měsíc.

Úložiště se také účtuje za každou GB použitou pro uložená data a index. Další informace najdete v cenovém modelu služby Azure Cosmos DB.

Pomocí kalkulačky kapacity Služby Azure Cosmos DB získáte rychlý odhad nákladů na úlohy.

Další informace najdete v části věnované nákladům v tématu Dobře navržená architektura Microsoft Azure.

Nasazení tohoto scénáře

Pokud chcete nasadit a spustit referenční implementaci, postupujte podle kroků v souboru readme GitHubu.

Další kroky