자습서: Spark를 사용하여 Cosmos DB for NoSQL에 연결

적용 대상: NoSQL

이 자습서에서는 Azure Cosmos DB Spark 커넥터를 사용하여 Cosmos DB for NoSQL 계정에서 데이터를 읽거나 씁니다. 이 자습서에서는 Azure Databricks 및 Jupyter Notebook을 사용하여 Spark에서 API for NoSQL과 통합하는 방법을 설명합니다. Spark에서 지원하는 모든 언어 또는 인터페이스를 사용할 수 있더라도 이 자습서에서는 Python 및 Scala에 중점을 둡니다.

이 자습서에서는 다음을 하는 방법을 알아볼 수 있습니다.

  • Spark 및 Jupyter Notebook을 사용하여 API for NoSQL 계정에 연결
  • 데이터베이스 및 컨테이너 리소스 만들기
  • 컨테이너에 데이터 수집
  • 컨테이너의 데이터 쿼리
  • 컨테이너의 항목에 대한 일반적인 작업 수행

필수 조건

Spark 및 Jupyter를 사용하여 연결

기존 Azure Databricks 작업 영역을 사용하여 Apache Spark 3.4.x를 사용하여 Azure Cosmos DB for NoSQL 계정에 연결할 준비가 된 컴퓨팅 클러스터를 만듭니다.

  1. Azure Databricks 작업 영역을 엽니다.

  2. 작업 영역 인터페이스에서 새 클러스터를 만듭니다. 최소한 다음 설정을 사용하여 클러스터를 구성합니다.

    런타임 버전 13.3 LTS(Scala 2.12, Spark 3.4.1)
  3. 작업 영역 인터페이스를 사용하여 그룹 IDMaven Central에서 Mavencom.azure.cosmos.spark 패키지를 검색합니다. 클러스터에 azure-cosmos-spark_3-4 접두사가 추가된 아티팩트 ID를 사용하여 Spark 3.4 전용 패키지를 설치합니다.

  4. 마지막으로 새 Notebook을 만듭니다.

    기본적으로 Notebook은 최근에 만든 클러스터에 연결됩니다.

  5. Notebook 내에서 NoSQL 계정 엔드포인트, 데이터베이스 이름 및 컨테이너 이름에 대한 OLTP 구성 설정을 합니다.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

데이터베이스 및 컨테이너 만들기

카탈로그 API를 사용하여 데이터베이스 및 컨테이너와 같은 계정 리소스를 관리합니다. 그런 다음 OLTP를 사용하여 컨테이너 리소스 내의 데이터를 관리할 수 있습니다.

  1. Spark를 사용하여 NoSQL 리소스에 대한 API를 관리하도록 카탈로그 API를 구성합니다.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. CREATE DATABASE IF NOT EXISTS를 사용하여 cosmicworks라는 새 데이터베이스를 만듭니다.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. CREATE TABLE IF NOT EXISTS를 사용하여 products라고 명명된 새 컨테이너를 만듭니다. 파티션 키 경로를 /category로 설정하고 초당 최대 1000 요청 단위 처리량(RU/s)을 사용하여 자동 스케일링 처리량을 사용하도록 설정해야 합니다.

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. 계층적 파티션 키를 사용하여 employees라는 이름의 또 다른 컨테이너를 만들고 /organization, /department, /team을 해당 순서로 파티션 키 경로 집합으로 합니다. 또한 처리량을 수동 분량 400RU/s로 설정합니다.

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Notebook 셀을 실행하여 데이터베이스 및 컨테이너가 API for NoSQL 계정 내에서 생성되는지 확인합니다.

데이터 수집

샘플 데이터 세트를 만든 다음 OLTP를 사용하여 해당 데이터를 API for NoSQL 컨테이너로 수집합니다.

  1. 샘플 데이터 집합을 만듭니다.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. spark.createDataFrame 및 이전에 저장된 OLTP 구성을 사용하여 대상 컨테이너에 샘플 데이터를 추가합니다.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

쿼리 데이터

OLTP 데이터를 데이터 프레임에 로드하여 데이터에 대한 일반적인 쿼리를 수행합니다. 다양한 구문 필터 또는 쿼리 데이터를 사용할 수 있습니다.

  1. spark.read를 사용하여 OLTP 데이터를 데이터 프레임 개체에 로드합니다. 이 자습서의 앞부분에서 사용한 것과 동일한 구성을 사용합니다. 또한 spark.cosmos.read.inferSchema.enabled를 true로 설정하여 Spark 커넥터가 기존 항목을 샘플링하여 스키마를 유추할 수 있도록 합니다.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. printSchema를 사용하여 데이터 프레임에 로드된 데이터의 스키마를 렌더링합니다.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. quantity 열이 20보다 작은 데이터 행을 렌더링합니다. whereshow 함수를 사용하여 이 쿼리를 수행합니다.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. clearance 열이 true인 첫 번째 데이터 행을 렌더링합니다. filter 함수를 사용하여 이 쿼리를 수행합니다.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. 필터 또는 잘림 없이 5개의 데이터 행을 렌더링합니다. show 함수를 사용하여 렌더링되는 행의 모양과 수를 사용자 지정합니다.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. 이 원시 NoSQL 쿼리 문자열을 사용하여 데이터를 쿼리합니다. SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

일반 작업 수행

Spark에서 API for NoSQL 데이터 작업을 하는 경우 부분 업데이트를 수행하거나 원시 JSON으로 데이터 작업을 할 수 있습니다.

  1. 항목의 부분 업데이트를 수행하려면 다음 단계를 수행합니다.

    1. 기존 config 구성 변수를 복사하고 새 복사본의 속성을 수정합니다. 특히 쓰기 전략을 ItemPatch로 구성하여 대량 지원을 사용하지 않도록 설정하고, 열 및 매핑된 작업을 설정하고, 마지막으로 기본 작업 유형을 Set으로 설정합니다.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. 이 패치 작업의 일부로서 대상으로 지정할 항목 파티션 키 및 고유 식별자에 대한 변수를 만듭니다.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. 패치 개체 집합을 만들어 대상 항목을 지정하고 수정해야 하는 필드를 지정합니다.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. 패치 개체 집합을 사용하여 데이터 프레임을 만들고 write를 사용해 패치 작업을 수행합니다.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. 쿼리를 실행하여 패치 작업의 결과를 검토합니다. 이제 다른 변경 내용 없이 항목의 이름을 Yamba New Surfboard로 지정해야 합니다.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. 원시 JSON 데이터를 사용하려면 다음 단계를 수행합니다.

    1. 기존 config 구성 변수를 복사하고 새 복사본의 속성을 수정합니다. 특히 대상 컨테이너를 employees로 변경하고 원시 JSON 데이터를 사용하도록 contacts 열/필드를 구성합니다.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. 컨테이너에 수집할 직원 집합을 만듭니다.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. 데이터 프레임을 만들고 write를 사용하여 직원 데이터를 수집합니다.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. show를 사용하여 데이터 프레임의 데이터를 렌더링합니다. 출력에서 contacts 열이 원시 JSON인지 확인합니다.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

다음 단계