Tutorial: Herstellen einer Verbindung mit Azure Cosmos DB for NoSQL mithilfe von Spark

GILT FÜR: NoSQL

In diesem Tutorial verwenden Sie den Azure Cosmos DB Spark-Connector, um Daten aus einem Azure Cosmos DB for NoSQL-Konto zu lesen oder zu schreiben. In diesem Tutorial werden Azure Databricks und ein Jupyter Notebook verwendet, um zu veranschaulichen, wie sie die API für NoSQL mithilfe von Spark integrieren können. Dieses Tutorial konzentriert sich auf Python und Scala. Sie können aber jede beliebige Sprache oder Schnittstelle verwenden, die von Spark unterstützt wird.

In diesem Tutorial lernen Sie Folgendes:

  • Herstellen einer Verbindung mit einer API für ein NoSQL-Konto mithilfe von Spark und einem Jupyter Notebook
  • Erstellen der Datenbank- und der Containerressource
  • Erfassen von Daten im Container
  • Abfragen von Daten im Container
  • Ausführen allgemeiner Vorgänge für Elemente im Container

Voraussetzungen

  • Ein vorhandenes Azure Cosmos DB for NoSQL-Konto
  • Ein vorhandener Azure Databricks-Arbeitsbereich

Verbindung mithilfe von Spark und Jupyter herstellen

Verwenden Sie Ihren vorhandenen Azure Databricks-Arbeitsbereich, um einen Computecluster zu erstellen, der Apache Spark 3.4.x verwenden kann, um eine Verbindung mit Ihrem Azure Cosmos DB for NoSQL-Konto herzustellen.

  1. Öffnen Sie Ihren Azure Databricks-Arbeitsbereich.

  2. Erstellen Sie auf der Oberfläche des Arbeitsbereichs einen neuen Cluster. Konfigurieren Sie den Cluster mit den folgenden (minimalen) Einstellungen:

    Wert
    Laufzeitversion 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Verwenden Sie die Oberfläche des Arbeitsbereichs, um in Maven Central nach Maven-Paketen mit der Gruppen-IDcom.azure.cosmos.spark zu suchen. Installieren Sie das für Spark 3.4 spezifische Paket (der Artefakt-ID des Clusters hat das Präfix azure-cosmos-spark_3-4).

  4. Erstellen Sie zuletzt neues Notebook.

    Tipp

    Standardmäßig wird das Notebook an den zuletzt erstellten Cluster angefügt.

  5. Legen Sie im Notebook die OLTP-Konfigurationseinstellungen für den NoSQL-Kontoendpunkt, den Datenbanknamen und den Containernamen fest.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Erstellen einer Datenbank und eines Containers

Verwenden Sie die Katalog-API, um Kontoressourcen wie Datenbanken und Container zu verwalten. Anschließend können Sie OLTP verwenden, um Daten innerhalb der Containerressource(n) zu verwalten.

  1. Konfigurieren Sie mithilfe von Spark die Katalog-API zum Verwalten der API für NoSQL-Ressourcen.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Erstellen Sie mithilfe von „CREATE DATABASE IF NOT EXISTS“ eine neue Datenbank namens „cosmicworks“.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Erstellen Sie mithilfe von CREATE TABLE IF NOT EXISTS einen neuen Container namens products. Stellen Sie sicher, dass Sie den Partitionsschlüsselpfad auf /category festlegen und automatischen Durchsatz mit einem Maximum von 1000 Anforderungseinheiten pro Sekunde (RU/s) aktivieren.

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Erstellen Sie mithilfe einer hierarchischen Partitionsschlüsselkonfiguration einen weiteren Container namens employees mit /organization, /departmentund /team als Gruppe von Partitionsschlüsselpfaden in dieser festgelegten Reihenfolge. Legen Sie außerdem den Durchsatz auf eine manuelle Menge von 400 RU/s fest.

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Führen Sie die Notebookzelle(n) aus, um zu überprüfen, ob die Datenbank und die Container in Ihrer API für das NoSQL-Konto erstellt werden.

Einlesen von Daten

Erstellen Sie ein Beispieldataset, und verwenden Sie dann OLTP, um diese Daten mithilfe der API für NoSQL-Container zu erfassen.

  1. Erstellen Sie ein Beispieldataset.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Verwenden Sie spark.createDataFrame und die zuvor gespeicherte OLTP-Konfiguration, um Beispieldaten zum Zielcontainer hinzuzufügen.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Daten abfragen

Laden Sie OLTP-Daten in einen Datenrahmen, um gängige Abfragen für die Daten auszuführen. Sie können verschiedene Syntaxfilter oder Abfragedaten verwenden.

  1. Verwenden Sie spark.read, um die OLTP-Daten in ein DataFrame-Objekt zu laden. Verwenden Sie dieselbe Konfiguration, die Sie bereits zuvor in diesem Tutorial verwendet haben. Legen Sie außerdem spark.cosmos.read.inferSchema.enabled auf „wahr“ fest, damit der Spark-Connector das Schema ableiten kann, indem er vorhandene Elemente sampelt.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Rendern Sie mithilfe von printSchema das Schema der im Datenrahmen geladenen Daten.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Rendern Sie Datenzeilen, in denen die Spalte quantity kleiner als 20 ist. Verwenden Sie die Funktionen where und show, um diese Abfrage auszuführen.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Rendern Sie die erste Datenzeile, in der die Spalte clearance wahr ist. Verwenden Sie die Funktion filter, um diese Abfrage auszuführen.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Rendern Sie fünf Datenzeilen ohne Filter oder Kürzung. Verwenden Sie die Funktion show, um die Darstellung und Anzahl der gerenderten Zeilen anzupassen.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Fragen Sie Ihrer Daten mithilfe dieser unformatierten NoSQL-Abfragezeichenfolge ab: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800.

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Ausführen gängiger Vorgänge

Wenn Sie in Spark mit der API für NoSQL-Daten arbeiten, können Sie Teilupdates ausführen oder mit JSON-Rohdaten arbeiten.

  1. Führen Sie die folgenden Schritte aus, um ein Teilupdate eines Elements auszuführen:

    1. Kopieren Sie die vorhandene Konfigurationsvariable config, und ändern Sie die Eigenschaften in der neuen Kopie. Konfigurieren Sie insbesondere die Schreibstrategie für ItemPatch, deaktivieren Sie die Unterstützung für Massenvorgänge, legen Sie die Spalten und zugeordneten Vorgänge fest, und legen Sie schließlich den Standardvorgangstyp auf Set fest.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Erstellen Sie Variablen für den Elementpartitionsschlüssel und den eindeutigen Bezeichner, auf den Sie im Rahmen dieses Patchvorgangs als Ziel verwenden möchten.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Erstellen Sie eine Reihe von Patchobjekten, um das Zielelement anzugeben, und geben Sie Felder an, die geändert werden sollen.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Erstellen Sie mithilfe der Gruppe von Patchobjekten einen Datenrahmen, und verwenden Sie write, um den Patchvorgang auszuführen.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Führen Sie eine Abfrage aus, um die Ergebnisse des Patchvorgangs zu überprüfen. Das Element sollte jetzt Yamba New Surfboard benannt werden, ohne dass weitere Änderungen auftreten.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Führen Sie die folgenden Schritte aus, um mit JSON-Rohdaten zu arbeiten:

    1. Kopieren Sie die vorhandene Konfigurationsvariable config, und ändern Sie die Eigenschaften in der neuen Kopie. Ändern Sie insbesondere den Zielcontainer in employees, und konfigurieren Sie die Spalte/das Feld contacts so, dass JSON-Rohdaten verwendet werden.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Erstellen Sie eine Gruppe von Mitarbeitern, die im Container erfasst werden sollen.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Erstellen Sie einen Datenrahmen, und verwenden Sie write, um die Mitarbeiterdaten zu erfassen.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Rendern Sie die Daten aus dem Datenframe mithilfe von show. Sie erkennen, dass die Spalte contacts in der Ausgabe aus JSON-Rohdaten besteht.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Nächster Schritt