Az Azure Cosmos DB használata az Apache Spark 2 használatával a Azure Synapse Linkben

Megjegyzés

Ha az Azure Cosmos DB-hez készült Azure Synapse Linket a Spark 3-at használja, tekintse meg ezt a cikket Azure Synapse Hivatkozás az Azure Cosmos DB-hez a Spark 3-on

Ebből a cikkből megtudhatja, hogyan használhatja az Azure Cosmos DB-t a Synapse Apache Spark 2 használatával. A Scala, a Python, a SparkSQL és a C# teljes körű támogatásával a Synapse Apache Spark központi szerepet kap az elemzési, adatfeldolgozási, adatelemzési és adatfeltárási forgatókönyvekben az Azure Cosmos DB-hez készült Azure Synapse Linkben.

Az Azure Cosmos DB használata során a következő képességek támogatottak:

  • A Synapse Apache Spark lehetővé teszi, hogy közel valós időben elemezze a Azure Synapse Linkkel engedélyezett Azure Cosmos DB-tárolók adatait anélkül, hogy ez hatással lenne a tranzakciós számítási feladatok teljesítményére. Az Azure Cosmos DB elemzési tárolójának Lekérdezése a Sparkból a következő két lehetőség közül választhat:
    • Betöltés a Spark DataFrame-be
    • Spark-tábla létrehozása
  • A Synapse Apache Spark lehetővé teszi az adatok Azure Cosmos DB-be való betöltését is. Fontos megjegyezni, hogy az adatok mindig az Azure Cosmos DB-tárolókba kerülnek a tranzakciós tárolón keresztül. Ha Synapse Link engedélyezve van, a rendszer automatikusan szinkronizálja az új beszúrásokat, frissítéseket és törléseket az elemzési tárba.
  • A Synapse Apache Spark a Spark strukturált streamelését is támogatja az Azure Cosmos DB-vel forrásként és fogadóként.

Az alábbi szakaszok végigvezetik a fenti képességek szintaxisán. Az Azure Cosmos DB és az Apache Spark Azure Synapse Analyticshez való lekérdezéséről szóló Learn modult is megtekintheti. A Azure Synapse Analytics-munkaterület kézmozdulatai megkönnyítik az első lépéseket. A kézmozdulatok akkor jelennek meg, ha a jobb gombbal egy Azure Cosmos DB-tárolóra kattint a Synapse-munkaterület Adatok lapján. A kézmozdulatokkal gyorsan hozhat létre kódot, és testre szabhatja azt az igényei szerint. A kézmozdulatok az adatok egy kattintással való felderítéséhez is ideálisan használhatók.

Fontos

Tisztában kell lennie az elemzési séma néhány olyan korlátozásával, amelyek az adatbetöltési műveletek váratlan viselkedéséhez vezethetnek. Például a tranzakciós sémából csak az első 1000 tulajdonság érhető el az elemzési sémában, a szóközökkel rendelkező tulajdonságok nem érhetők el stb. Ha váratlan eredményeket tapasztal, további részletekért tekintse meg az elemzési tár sémakorlátozásait .

Azure Cosmos DB elemzési tár lekérdezése

Mielőtt megismerkedhet az Azure Cosmos DB elemzési tár lekérdezésének, a Spark DataFrame-be való betöltésének és a Spark-tábla létrehozásának két lehetséges lehetőségével, érdemes áttekinteni a felhasználói élményben tapasztalható különbségeket, hogy kiválaszthatja az igényeinek megfelelő lehetőséget.

A tapasztalatbeli különbség az, hogy az Azure Cosmos DB-tároló mögöttes adatváltozásainak automatikusan tükröződniük kell-e a Sparkban végzett elemzésben. Ha egy Spark DataFrame regisztrálva van, vagy létrehoz egy Spark-táblát egy tároló elemzési tárolóján, a rendszer lekéri az elemzési tárolóban lévő adatok aktuális pillanatképe körüli metaadatokat a Sparkba a későbbi elemzések hatékony leküldése érdekében. Fontos megjegyezni, hogy mivel a Spark egy lusta kiértékelési szabályzatot követ, kivéve, ha a Spark DataFrame-en egy műveletet hív meg, vagy egy SparkSQL-lekérdezést hajt végre a Spark-táblán, a rendszer nem kéri le a tényleges adatokat a mögöttes tároló elemzési tárolójából.

A Spark DataFrame-be való betöltéskor a beolvasott metaadatok a Spark-munkamenet teljes élettartama alatt gyorsítótárazva maradnak, így a DataFrame-en meghívott további műveletek kiértékelése a DataFrame létrehozásakor az elemzési tárba került pillanatkép alapján történik.

Ezzel szemben a Spark-táblák létrehozásakor a rendszer nem gyorsítótárazza a Sparkban az elemzési tár állapotának metaadatait, hanem újra betölti őket a Spark-táblán végrehajtott összes SparkSQL-lekérdezés végrehajtásakor.

Ezért választhat a Spark DataFrame betöltése és a Spark-táblázat létrehozása között aszerint, hogy a Spark-elemzést az elemzési tár rögzített pillanatképével vagy az elemzési tár legújabb pillanatképével összehasonlítva szeretné elvégezni.

Ha az elemzési lekérdezések gyakran használnak szűrőket, lehetősége van ezen mezők alapján particionálásra a jobb lekérdezési teljesítmény érdekében. Rendszeres időközönként végrehajthat particionálási feladatot egy Azure Synapse Spark-jegyzetfüzetből, hogy particionálást aktiváljon az elemzési tárban. Ez a particionált tároló a Azure Synapse munkaterülethez társított elsődleges ADLS Gen2-tárfiókra mutat. További információ: Az egyéni particionálás bemutatása és az egyéni particionálási cikkek konfigurálása .

Megjegyzés

A MongoDB-fiókokhoz készült Azure Cosmos DB-fiókok lekérdezéséhez tudjon meg többet az elemzési tár teljes hűségséma-ábrázolásáról és a használni kívánt kiterjesztett tulajdonságnevekről.

Megjegyzés

Vegye figyelembe, hogy az alábbi parancsok mindegyike options megkülönbözteti a kis- és nagybetűket. Például a while függvényt kell használnia Gatewaygateway , és hibaüzenetet ad vissza.

Betöltés a Spark DataFrame-be

Ebben a példában létrehoz egy Spark DataFrame-et, amely az Azure Cosmos DB elemzési tárára mutat. Ezután további elemzéseket végezhet, ha Spark-műveleteket invoktál a DataFrame-hez. Ez a művelet nem befolyásolja a tranzakciós tárolót.

A Python szintaxisa a következő:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Spark-tábla létrehozása

Ebben a példában létrehoz egy Spark-táblát, amely az Azure Cosmos DB elemzési tárára mutat. Ezután további elemzéseket végezhet, ha SparkSQL-lekérdezéseket invoktál a táblára. Ez a művelet nem érinti a tranzakciós tárolót, és nem jár adatáthelyezési művelettel. Ha úgy dönt, hogy törli ezt a Spark-táblát, a mögöttes Azure Cosmos DB-tároló és a megfelelő elemzési tár nem lesz hatással.

Ez a forgatókönyv kényelmesen használható a Spark-táblák külső eszközökkel történő újrafelhasználásához, valamint a futtatáshoz szükséges mögöttes adatokhoz való hozzáférés biztosításához.

A Spark-tábla létrehozásához szükséges szintaxis a következő:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Megjegyzés

Ha vannak olyan forgatókönyvek, amelyekben a mögöttes Azure Cosmos DB-tároló sémája idővel megváltozik; És ha azt szeretné, hogy a frissített séma automatikusan tükrözze a Spark-tábla lekérdezéseiben, ezt úgy érheti el, hogy true a spark.cosmos.autoSchemaMerge Spark-tábla beállításai között a beállításra van állítva.

Spark DataFrame írása Azure Cosmos DB-tárolóba

Ebben a példában egy Spark DataFrame-et fog írni egy Azure Cosmos DB-tárolóba. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy a megosztott adatbázisban kiépített kérelemegységeket.

A Python szintaxisa a következő:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Streamelt DataFrame betöltése tárolóból

Ebben a kézmozdulatban a Spark Streaming funkcióval tölt be adatokat egy tárolóból egy adatkeretbe. Az adatok a munkaterülethez csatlakoztatott elsődleges Data Lake-fiókban (és fájlrendszerben) lesznek tárolva.

Megjegyzés

Ha külső kódtárakra szeretne hivatkozni a Synapse Apache Sparkban, itt talál további információt. Ha például Spark DataFrame-et szeretne a MongoDB-hez készült Azure Cosmos DB tárolóba betöltésre, használhatja a SparkHoz készült MongoDB-összekötőt.

Streamelési DataFrame betöltése az Azure Cosmos DB-tárolóból

Ebben a példában a Spark strukturált streamelési funkciójával adatokat tölt be egy Azure Cosmos DB-tárolóból egy Spark streamelési adatkeretbe az Azure Cosmos DB változáscsatorna-funkciójával. A Spark által használt ellenőrzőpont-adatokat a rendszer a munkaterülethez csatlakoztatott elsődleges Data Lake-fiókban (és fájlrendszerben) tárolja.

Ha a /localReadCheckpointFolder mappa nincs létrehozva (az alábbi példában), az automatikusan létrejön. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy megosztott adatbázisban üzembe helyezett kérelemegységeket.

A Python szintaxisa a következő:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Streamelési DataFrame írása az Azure Cosmos DB-tárolóba

Ebben a példában egy streamelt DataFrame-et fog írni egy Azure Cosmos DB-tárolóba. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy megosztott adatbázisban üzembe helyezett kérelemegységeket. Ha a /localWriteCheckpointFolder mappa nincs létrehozva (az alábbi példában), az automatikusan létrejön.

A Python szintaxisa a következő:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Következő lépések