Interakcja z Azure Cosmos DB za pomocą Apache Spark w Azure Synapse Link

W tym artykule dowiesz się, jak wchodzić w interakcje z usługami Azure Cosmos DB przy użyciu usługi Synapse Apache Spark. Dzięki pełnej obsługi języka Scala, Python, SparkSQL i C# usługa Synapse Apache Spark jest centralnym elementem scenariuszy analizy, inżynierii danych, nauki o danych i eksploracji danych w układzie Azure Synapse Link for Azure Cosmos DB.

Podczas interakcji z usługą są obsługiwane następujące Azure Cosmos DB:

  • Program Synapse Apache Spark umożliwia analizowanie danych w kontenerach usługi Azure Cosmos DB, które są włączone za pomocą usługi Azure Synapse Link niemal w czasie rzeczywistym, bez wpływu na wydajność obciążeń transakcyjnych. Dostępne są następujące dwie opcje wykonywania zapytań Azure Cosmos DB magazynu analitycznego z platformy Spark:
    • Ładowanie do ramki danych platformy Spark
    • Tworzenie tabeli platformy Spark
  • Synapse Apache Spark umożliwia również pozysowanie danych do Azure Cosmos DB. Należy pamiętać, że dane są zawsze pozyskane do kontenerów Azure Cosmos DB za pośrednictwem magazynu transakcyjnego. Gdy Synapse Link jest włączona, wszystkie nowe wstawienia, aktualizacje i usunięcia są automatycznie synchronizowane z magazynem analitycznym.
  • Program Synapse Apache Spark obsługuje również przesyłanie strumieniowe ze strukturą platformy Spark Azure Cosmos DB z usługą jako źródłem, a także ujściem.

W poniższych sekcjach przedstawiono składnię powyższych możliwości. Gesty w Azure Synapse Analytics obszarze roboczym zostały zaprojektowane w celu zapewnienia łatwego w użyciu środowiska, które ułatwia rozpoczynanie pracy. Gesty są widoczne po kliknięciu prawym przyciskiem myszy kontenera Azure Cosmos DB na karcie Dane w obszarze roboczym synapse. Za pomocą gestów można szybko wygenerować kod i dostosować go do własnych potrzeb. Gesty są również idealne do odnajdywania danych jednym kliknięciem.

Ważne

Należy pamiętać o pewnych ograniczeniach w schemacie analitycznym, które mogą prowadzić do nieoczekiwanego zachowania operacji ładowania danych. Na przykład w schemacie analitycznym jest dostępnych tylko 1000 pierwszych właściwości ze schematu transakcyjnego, właściwości ze spacjami itp. Jeśli występują nieoczekiwane wyniki, sprawdź ograniczenia schematu magazynu analitycznego, aby uzyskać więcej szczegółów.

Wykonywanie zapytań Azure Cosmos DB magazynu analitycznego

Zanim dowiesz się więcej o dwóch możliwych opcjach wykonywania zapytań dotyczących magazynu analitycznego platformy Azure Cosmos DB, ładowania do ramki danych platformy Spark i tworzenia tabeli platformy Spark, warto poznać różnice w zakresie obsługi, aby wybrać opcję, która będzie odpowiedni dla Twoich potrzeb.

Różnica w doświadczeniach polega na tym, czy zmiany danych bazowych w kontenerze Azure Cosmos DB powinny być automatycznie odzwierciedlane w analizie wykonywanej na platforma Spark. W przypadku zarejestrowania ramki danych platformy Spark lub utworzenia tabeli platformy Spark w magazynie analitycznym kontenera metadane dotyczące bieżącej migawki danych w magazynie analitycznym są pobierane na platformę Spark w celu wydajnego wypychania kolejnych analiz. Należy pamiętać, że ponieważ platforma Spark jest zgodna z zasadami oceny z opóźnieniem, chyba że akcja jest wywoływana w ramce danych platformy Spark lub zapytanie SparkSQL jest wykonywane względem tabeli spark, rzeczywiste dane nie są pobierane z magazynu analitycznego bazowego kontenera.

W przypadku ładowania do ramki danych Spark pobrane metadane są buforowane przez okres istnienia sesji Spark. Tym samym kolejne akcje wywoływane dla ramki danych są oceniane względem migawki magazynu analitycznego z czasu utworzenia ramki danych.

Z drugiej strony w przypadku utworzenia tabeli Spark metadane stanu magazynu analitycznego nie są buforowane na platformie Spark i są ponownie ładowane przy każdym wykonaniu zapytania SparkSQL względem tabeli Spark.

W związku z tym możesz wybrać między ładowaniem do ramki danych Spark i utworzeniem tabeli Spark w zależności od tego, czy analiza platformy Spark ma być oceniana względem, odpowiednio, stałej migawki magazynu analitycznego, czy najnowszej migawki magazynu analitycznego.

Uwaga

Aby odpytować interfejs API Azure Cosmos DB kont bazy danych Mongo DB, dowiedz się więcej o pełnej reprezentacji schematu wierności w magazynie analitycznym i rozszerzonych nazwach właściwości, które mają być używane.

Ładowanie do ramki danych platformy Spark

W tym przykładzie utworzysz ramę danych platformy Spark, która wskazuje Azure Cosmos DB magazynu analitycznego. Następnie możesz przeprowadzić dodatkową analizę, invoking Spark actions against the DataFrame (Akcje platformy Spark względem ramki danych). Ta operacja nie ma wpływu na magazyn transakcyjny.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Równoważna składnia języka Scala będzie następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Tworzenie tabeli platformy Spark

W tym przykładzie utworzysz tabelę platformy Spark, która wskazuje Azure Cosmos DB magazynu analitycznego. Następnie możesz przeprowadzić dodatkową analizę, wywołując zapytania SparkSQL względem tabeli. Ta operacja nie ma wpływu na magazyn transakcyjny ani nie powoduje żadnego ruchu danych. Jeśli zdecydujesz się usunąć tę tabelę platformy Spark, nie Azure Cosmos DB kontenera i odpowiedniego magazynu analitycznego.

Ten scenariusz jest wygodny do ponownego użycia tabel platformy Spark za pomocą narzędzi innych firm i zapewnienia ułatwień dostępu do danych bazowych w czasie uruchamiania.

Składnia tworzenia tabeli platformy Spark jest następująca:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Uwaga

W scenariuszach, w których schemat podstawowego kontenera usługi Azure Cosmos DB zmienia się i chcesz, aby zaktualizowany schemat był automatycznie odzwierciedlany w zapytaniach względem tabeli Spark, możesz w tym celu ustawić opcję spark.cosmos.autoSchemaMerge na wartość true w opcjach tabeli Spark.

Pisanie ramki danych platformy Spark w Azure Cosmos DB kontenera

W tym przykładzie zapiszesz ramę danych platformy Spark w Azure Cosmos DB kontenerze. Ta operacja będzie mieć wpływ na wydajność obciążeń transakcyjnych i będzie zużywać jednostki żądań aprowowane w kontenerze Azure Cosmos DB lub udostępnionej bazie danych.

Składnia w języku Python będzie następująca:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

Równoważna składnia języka Scala wyglądałaby następująco:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Ładowanie ramki danych przesyłania strumieniowego z kontenera

W tym gestie użyjesz funkcji przesyłania strumieniowego platformy Spark, aby załadować dane z kontenera do ramki danych. Dane będą przechowywane na podstawowym koncie data lake (i systemie plików) połączonym z obszarem roboczym.

Uwaga

Jeśli chcesz odwoływać się do bibliotek zewnętrznych w programie Synapse Apache Spark, dowiedz się więcej tutaj. Jeśli na przykład chcesz pozysować ramce danych platformy Spark do kontenera interfejsu API usługi Cosmos DB dla bazy danych Mongo DB, możesz skorzystać z łącznika bazy danych Mongo DB dla platformy Spark tutaj.

Ładowanie ramki danych przesyłania strumieniowego z Azure Cosmos DB kontenera

W tym przykładzie użyjesz funkcji przesyłania strumieniowego ze strukturą platformy Spark, aby załadować dane z kontenera usługi Azure Cosmos DB do ramki danych przesyłania strumieniowego Platformy Spark przy użyciu funkcji zestawienia zmian w Azure Cosmos DB. Dane punktu kontrolnego używane przez firmę Spark będą przechowywane na podstawowym koncie data lake (i systemie plików), które nałączono z obszarem roboczym.

Jeśli folder /localReadCheckpointFolder nie został utworzony (w poniższym przykładzie), zostanie utworzony automatycznie. Ta operacja będzie mieć wpływ na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowowanych w kontenerze Azure Cosmos DB lub udostępnionej bazie danych.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

Równoważna składnia języka Scala będzie następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Zapis ramki danych przesyłania strumieniowego Azure Cosmos DB kontenera

W tym przykładzie zapiszesz ramę danych przesyłania strumieniowego w kontenerze Azure Cosmos DB danych. Ta operacja będzie mieć wpływ na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowowanych w kontenerze Azure Cosmos DB lub udostępnionej bazie danych. Jeśli folder /localWriteCheckpointFolder nie został utworzony (w poniższym przykładzie), zostanie utworzony automatycznie.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

streamQuery = dfStream\
        .writeStream\
        .format("cosmos.oltp")\
        .outputMode("append")\
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .option("spark.synapse.linkedService", "<enter linked service name>")\
        .option("spark.cosmos.container", "<enter container name>")\
        .option("spark.cosmos.connection.mode", "gateway")\
        .start()

streamQuery.awaitTermination()

Równoważna składnia języka Scala będzie następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("checkpointLocation", "/localWriteCheckpointFolder").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("spark.cosmos.connection.mode", "gateway").
            start()

query.awaitTermination()

Następne kroki