Szerkesztés

Share via


Delta-tó létrehozása alkalmi lekérdezések támogatására online szabadidős és utazási foglalás esetén

Azure Event Hubs
Azure Data Lake Storage
Azure Databricks
Azure Synapse Analytics

Ez az architektúra egy delta lake-példát kínál az utazásfoglaláshoz, ahol nagy mennyiségű nyers dokumentum jön létre nagy gyakorisággal.

Az Apache® és az Apache Spark™ az Apache Software Foundation bejegyzett védjegyei vagy védjegyei a Egyesült Államok és/vagy más országokban. Az Apache Software Foundation nem támogatja ezeket a jeleket.

Architektúra

A Delta Lake architektúrájának diagramja.

Töltse le az architektúra Visio-fájlját.

A szabadidős és utazási foglalási forgatókönyvek nagy mennyiségű nyers dokumentumot hozhatnak létre nagy gyakorisággal. Előfordulhat azonban, hogy nem kell a dokumentumok teljes tartalmát indexelnie. Előfordulhat például, hogy a felhasználóknak egy ismert tranzakcióazonosító vagy ügyfélnév alapján kell keresniük egy adott dátumon, hogy lekérjenek egy számukra érdekes dokumentumkészletet.

Adatfolyam

Az architektúra mögötti fogalom a nem használt adatokból való kereséshez hasznos metaadatok leválasztása:

  • Csak a metaadatok lesznek indexelve egy lekérdezhető szolgáltatásban (például a Sparkban), míg a tényleges adatok egy adattóban vannak tárolva.
  • A data lake-ben lévő nyers dokumentumok elérési útjuk alapján indexelt metaadatokhoz vannak társítva.
  • Dokumentumok lekérdezésekor a szolgáltatás megkeresi a dokumentumok metaadatait, és a tényleges dokumentumokat az elérési útjuk alapján kéri le a data lake-ből.

Ez a megoldás jelentősen csökkenti a költségeket és növeli a teljesítményt, mivel a metaadatok a teljes adattulajdon töredékét alkotják (például petabájtnyi nyers dokumentumot több tíz gigabájtnyi tömör metaadattal lehet leírni).

Emellett az ilyen típusú forgatókönyvek tipikus kihívása, hogy a korábbi mélységi és valós idejű követelmények egységes, könnyen karbantartható, nagy teljesítményű rendszerbe keveredjenek. A Delta Lake architektúrája választ ad erre a kihívásra.

Összetevők

Azure-alkalmazás szolgáltatás egy szolgáltatásként nyújtott platform (PaaS) az alkalmazások felügyelt virtuális gépeken való létrehozásához és üzemeltetéséhez. Az App Service kezeli a mögöttes számítási infrastruktúrát, amelyen az alkalmazások futnak, és az erőforrás-használati kvóták és alkalmazásmetrikák monitorozását, a diagnosztikai adatok naplózását és a metrikákon alapuló riasztásokat biztosít.

Az Azure Data Factory az Azure felhőbeli kinyerési, átalakító és betöltési (ETL) szolgáltatása a kiszolgáló nélküli adatintegráció és adatátalakítás vertikális felskálázásához. A szolgáltatás kódolás nélküli felhasználói felületet biztosít az intuitív tartalomkészítéshez, valamint az egy ablaktáblás monitorozáshoz és felügyelethez. A meglévő SQL Server Integration Services-csomagokat (SSIS) az Azure-ba is áthelyezheti, és teljes kompatibilitással futtathatja őket az Azure Data Factoryben.

Az Azure Data Lake Storage Gen2 az Azure Blob Storage-ra épülő big data-elemzési képességek készlete. A Data Lake Storage Gen2 az Azure Data Lake Storage Gen1 és az Azure Blob Storage képességeit konvergálja. A Data Lake Storage Gen2 például fájlrendszer-szemantikát, fájlszintű biztonságot és skálázást biztosít. Mivel ezek a képességek a Blob Storage-ra épülnek, alacsony költségű, rétegzett tárterületet is kap, magas rendelkezésre állási/vészhelyreállítási képességekkel.

Az Azure Event Hubs egy teljes mértékben felügyelt, valós idejű adatbetöltési szolgáltatás, amely egyszerű, megbízható és méretezhető. Segítségével bármely adatforrásból másodpercek alatt több millió eseményt streamelhet, amelyekből dinamikus adatfolyamatot alakíthat ki, hogy azonnal reagálhasson az üzleti kihívásokra.

Az Azure Databricks egy Apache Spark-alapú adatelemzési platform, amely a Microsoft Azure Cloud Serviceshez van optimalizálva. Az Azure Databricks három környezetet kínál az adatigényes alkalmazások fejlesztéséhez: Databricks SQL, Databricks Adattudomány > Engineering és Databricks Machine Tanulás.

Alternatívák

A metaadatok indexelésének alternatívájaként indexelheti a lekérdezési képességeket kínáló szolgáltatás összes nyers adatát, például az Azure Databrickset, az Azure Synapse Analyticset, az Azure Cognitive Searcht vagy az Azure Data Explorert. Ez a megközelítés azonnalibb, de figyeljen az adatméret, a teljesítménykövetelmények és a frissítés gyakoriságának együttes hatására, különösen költség szempontjából.

A Delta Lake használatával ellentétben a Lambda-architektúra a valós idejű adatokat az előzményadatoktól eltérő adattárban tárolja, és az ügyfél a logikát futtatva transzparenssé teszi a heterogén lekérdezéseket a felhasználó számára. Ennek a megoldásnak az előnye a használható szolgáltatások (például az Azure Stream Analytics és az Azure SQL Database) nagyobb készlete, de az architektúra összetettebbé válik, és a kódbázis költségesebbé válik.

A Spark az Azure Databricks, az Azure Synapse Analytics és az Azure HDInsight használatával van elosztva. Ezért ez az architektúra bármelyik Azure-adatszolgáltatással implementálható, lehetőleg a Delta Lake 0.8-at vagy 1.0-t támogató legújabb Spark-verzióval.

Forgatókönyv részletei

A nyers adatok láthatósága a szabadidős és utazási foglalási forgatókönyvekben több szereplő számára is fontos. A technikai támogatási csapatok felügyelik a valós idejű diagnosztikát, hogy folyamatosan monitorozzák a tranzakciók feldolgozását, és gyorsan reagáljanak a nem kívánt problémákra. Az adatmérnökök felügyelik az adatok exportálását az érdekelt felek áttekintéséhez és az elemzések valós idejű betöltéséhez. Az ügyféltámogatási csapatoknak előzmény- és legutóbbi adatokra van szükségük az ügyfelek kérdéseinek és panaszainak kezeléséhez. Végül a jogi csapatok biztosítják a megfelelőségi kötelezettségek betartását és a jogi lépéseket. Az ilyen típusú követelmények jellemzőek a külső szolgáltatókat összesítő és a felhasználói vásárlásokat kezelő piactereken. A szabadidős és utazási foglalási rendszerek például nem értik a felhasználókat és a szolgáltatókat a szolgáltatások keresésében, a szolgáltatóktól származó hasznos ajánlatok összesítésében és a felhasználói foglalások kezelésében.

Egy piactér diagramja szolgáltatókkal és B2B- és B2C-felhasználókkal.

Lehetséges használati esetek

Ez az architektúra ideális az utazási és vendéglátási iparágak számára. A következő forgatókönyvekre alkalmazható:

  • Gyorsan lekérte a valós idejű (például diagnosztikai) vagy a korábbi (megfelelőségi) nyers dokumentumokat az eredeti formátumban.
  • Petabájtnyi adat kezelése.
  • Másodperctartományos teljesítmény garantálása valós idejű diagnosztikához.
  • Egységes megközelítés elérése a valós idejű diagnosztikához, a korábbi lekérdezésekhez és a takarmányozási elemzésekhez.
  • Az alsóbb rétegbeli valós idejű elemzések betáplálása.
  • Költségek szabályozása.
  • Adatok beszerzése nyers dokumentumként (például json-, xml- vagy csv-fájlként).
  • Ha az adatok töredéke elegendő a lekérdezések leírásához.
  • Amikor a felhasználók teljes nyers dokumentumokat szeretnének lekérni.
  • Ha a teljes adatmérethez a rendszer a célár fölötti skálázásra lenne szükség.

Ez az architektúra nem feltétlenül megfelelő, ha:

  • Az adatok rekordhalmazokként lesznek insourcedva.
  • A felhasználóknak elemzéseket kell futtatniuk.
  • A felhasználók hajlandók saját csomagolt BI-eszközt használni.
  • Az adatok mérete költség szempontjából nem jelent kihívást.

A nyers dokumentumok nem feltétlenül szükségesek.

Megfontolások

Ezek a szempontok implementálják az Azure Well-Architected Framework alappilléreit, amely a számítási feladatok minőségének javítására használható vezérelvek halmaza. További információ: Microsoft Azure Well-Architected Framework.

Teljesítmény hatékonysága

A teljesítménybeli hatékonyság lehetővé teszi, hogy a számítási feladatok hatékonyan méretezhetők legyenek a felhasználók igényei szerint. További információ: Teljesítményhatékonysági pillér áttekintése.

A felhasználók dupla ugrást hajtanak végre az adatok eléréséhez. Először lekérdezik a metaadatokat, majd lekérik a kívánt dokumentumkészletet. Előfordulhat, hogy nehéz újra felhasználni a meglévő vagy csomagolt ügyféleszközöket.

Az Azure Data Lake Storage Gen2 három hozzáférési szintet biztosít: gyakori elérésű, ritka elérésű és archív. Olyan esetekben, amikor a dokumentumokat időnként lekérik, a ritka elérésű teljesítményszintnek a gyakori teljesítményszinthez hasonló teljesítményt kell biztosítania, de az alacsonyabb költségek előnyeivel. Olyan helyzetekben, ahol a lekérés valószínűsége nagyobb az újabb adatokkal, fontolja meg a ritka elérésű és a gyakori elérésű szintek keverését. Az archív szintű tároló használata alternatívát jelenthet a szigorú törlésre, valamint az adatok méretének csökkentésére azáltal, hogy csak hasznos információkat vagy több összesített adatot tárol.

A data lake valószínűleg petabájtnyi adatot fog kezelni, ezért az adatmegőrzési szabályzatok általában érvényesek. Adatszabályozási megoldásokat kell alkalmazni az adat életciklusának kezelésére, például a régi adatok gyakori és ritka elérésű tárolási szintek közötti áthelyezésére, a régi adatok törlésére vagy archiválására, valamint arra, hogy mikor kell adatokat összesíteni egy alsóbb rétegbeli elemzési megoldásban.

Gondolja át, hogyan működik ez a megközelítés a downstream elemzési forgatókönyvekkel. Bár ez a példa számítási feladat nem elemzésre szolgál, célszerű az alsóbb rétegbeli valós idejű elemzéseket táplálni, míg a kötegforgatókönyveket a data lake-ből lehet táplálni.

Méretezhetőség

Az Azure Event Hubs rendkívül sokoldalú egy olyan tranzakciós rendszer leválasztásakor, amely nyers dokumentumokat hoz létre egy diagnosztikai és megfelelőségi rendszerből; könnyen implementálható már meglévő architektúrákban; és végső soron könnyen használható. Előfordulhat azonban, hogy a tranzakciós rendszer már használja a streamelési mintát a bejövő dokumentumok feldolgozásához. Ebben az esetben valószínűleg integrálnia kell a diagnosztikát és a megfelelőséget kezelő logikát a streamelési alkalmazásba alstreamként.

DevOps

A használt szolgáltatások automatikus üzembe helyezéséhez ebben a példában a legjobb, ha folyamatos integrációs és folyamatos üzembe helyezési (CI/CD) folyamatokat használ. Fontolja meg egy olyan megoldás használatát, mint az Azure DevOps vagy a GitHub Actions.

Költségoptimalizálás

A költségoptimalizálás a szükségtelen kiadások csökkentésének és a működési hatékonyság javításának módjairól szól. További információ: A költségoptimalizálási pillér áttekintése.

A legtöbb esetben az Azure-díjkalkulátorral megbecsülheti költségeit. További szempontokat a Microsoft Azure Well-Architected Framework költség szakaszában talál.

A forgatókönyv üzembe helyezése

A következő példaarchitektúra feltételezi, hogy egy vagy több Azure Event Hubs-névtér strukturált nyers dokumentumokat (például json- vagy xml-fájlokat) fog tartalmazni. A dokumentumok és forrásszolgáltatások tényleges típusa és formátuma, valamint az integráció típusa azonban nagymértékben függ az adott forgatókönyvtől és architektúrától.

Streamelés

A Spark strukturált streamelése során a rendszer lekérte, tömöríti, elemzi és lefordítja a nyers adatokat táblázatos adatokká egy streamelési adatkeretben.

A rendszer a következő PySpark-kódrészletet használja streamelési DataFrame betöltésére az Event Hubsból:

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

A streamelt DataFrame feldolgozásához az alábbi kódrészlet szolgál. Először kibontja az Event Hubs-üzenetet, ha szükséges, majd táblázatos formátumban elemzi annak json-struktúráját. Ez a kód egy példa, és az adott forgatókönyvhöz kell igazítani:

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

A tényleges adatfeldolgozás két lépésből áll. Az első a metaadatok kinyerése a nyers dokumentumok feldolgozását követő kereséséhez. A tényleges metaadatok a használati esettől függenek, de általánosítható példák lehetnek a releváns dátumok és azonosítók, a dokumentumtípusok, a forrásszolgáltatás és bármilyen kategóriatípus:

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

A második feldolgozási lépés egy elérési út létrehozása az Azure Data Lake Storage Gen2-be, ahol nyers dokumentumokat fog tárolni:

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

Metaadatok betöltése delta-tóban

A metaadatok olyan delta táblába íródnak, amely valós idejű lekérdezési képességeket tesz lehetővé. Az írások egy pufferben vannak streamelve, és a táblába érkező lekérdezések egyesíthetik a puffer eredményeit a tábla előzményrészéből származókkal.

Az alábbi kódrészlet bemutatja, hogyan definiálhat deltatáblát a metaadattárban, és hogyan particionálhatja dátum szerint:

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

Vegye figyelembe, hogy a transactionId mező numerikus. Az elosztott rendszereket átadó tipikus üzenetek grafikus GUID-ket használhatnak a tranzakciók egyedi azonosítására. A numerikus adattípusok azonban nagyobb lekérdezési teljesítményt tesznek lehetővé a legtöbb adatplatformon.

Az egyedi tranzakcióazonosítók hozzárendelése a felhőbeli adatplatformok (például a Spark) elosztott jellege miatt nehézkes lehet. Hasznos módszer, ha egy ilyen tranzakcióazonosítót egy partícióazonosítóra (például az Event Hubs-partíciószámra) és egy partíción belüli növekményes számra alapozza. Erre a megközelítésre példa a monotonically_increasing_id() az Azure Databricksben.

Az alábbi kódrészlet bemutatja, hogyan fűzheti hozzá a streamet nyers dokumentumok metaadataival a deltatáblához:

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

Vegye figyelembe, hogy a particionálás a stream táblaséma szerinti írása közben történik.

Adatbetöltés egy adattóban

A tényleges nyers dokumentumok egy megfelelő tárolási teljesítményszintre vannak írva az Azure Data Lake Gen2-ben.

Az alábbi kódrészlet egy egyszerű függvényt mutat be, amely feltölt egy fájlt az Azure Data Lake Store Gen2-be; Az osztály foreachDataStreamWriter metódusának használatával feltöltheti a streamelt DataFrame minden rekordjában tárolt fájlt:

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

Ügyfél

Az ügyfél lehet egy egyéni webalkalmazás, amely metaadatokkal kéri le a dokumentumútvonalakat a delta táblából standard SQL-utasításokkal, és a tényleges dokumentumot a data lake-ből standard Azure Data Lake Storage Gen2 API-kkal.

Az alábbi kódrészlet például bemutatja, hogyan kérhető le egy adott tranzakció összes dokumentumának elérési útja:

select * from metadata where transactionId = '123456'

Következő lépések

Tekintse meg a kapcsolódó architekturális útmutatót:

Tekintse meg az alábbi kapcsolódó architektúrákat: