자습서: Spark를 사용하여 Cosmos DB for NoSQL에 연결

적용 대상: NoSQL

이 자습서에서는 Azure Cosmos DB Spark 커넥터를 사용하여 Cosmos DB for NoSQL 계정에서 데이터를 읽거나 씁니다. 이 자습서에서는 Azure Databricks 및 Jupyter Notebook을 사용하여 Spark에서 API for NoSQL과 통합하는 방법을 설명합니다. Spark에서 지원하는 모든 언어 또는 인터페이스를 사용할 수 있더라도 이 자습서에서는 Python 및 Scala에 중점을 둡니다.

이 자습서에서는 다음을 하는 방법을 알아볼 수 있습니다.

  • Spark 및 Jupyter Notebook을 사용하여 API for NoSQL 계정에 연결
  • 데이터베이스 및 컨테이너 리소스 만들기
  • 컨테이너에 데이터 수집
  • 컨테이너의 데이터 쿼리
  • 컨테이너의 항목에 대한 일반적인 작업 수행

필수 조건

Spark 및 Jupyter를 사용하여 연결

기존 Azure Databricks 작업 영역을 사용하여 Apache Spark 3.4.x를 사용하여 Azure Cosmos DB for NoSQL 계정에 연결할 준비가 된 컴퓨팅 클러스터를 만듭니다.

  1. Azure Databricks 작업 영역을 엽니다.

  2. 작업 영역 인터페이스에서 새 클러스터를 만듭니다. 최소한 다음 설정을 사용하여 클러스터를 구성합니다.

    런타임 버전 13.3 LTS(Scala 2.12, Spark 3.4.1)
  3. 작업 영역 인터페이스를 사용하여 그룹 IDMaven Central에서 Mavencom.azure.cosmos.spark 패키지를 검색합니다. 클러스터에 azure-cosmos-spark_3-4 접두사가 추가된 아티팩트 ID를 사용하여 Spark 3.4 전용 패키지를 설치합니다.

  4. 마지막으로 새 Notebook을 만듭니다.

    기본적으로 Notebook은 최근에 만든 클러스터에 연결됩니다.

  5. Notebook 내에서 NoSQL 계정 엔드포인트, 데이터베이스 이름 및 컨테이너 이름에 대한 OLTP 구성 설정을 합니다.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"

데이터베이스 및 컨테이너 만들기

카탈로그 API를 사용하여 데이터베이스 및 컨테이너와 같은 계정 리소스를 관리합니다. 그런 다음 OLTP를 사용하여 컨테이너 리소스 내의 데이터를 관리할 수 있습니다.

  1. Spark를 사용하여 NoSQL 리소스에 대한 API를 관리하도록 카탈로그 API를 구성합니다.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
  2. CREATE DATABASE IF NOT EXISTS를 사용하여 cosmicworks라는 새 데이터베이스를 만듭니다.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
  3. CREATE TABLE IF NOT EXISTS를 사용하여 products라고 명명된 새 컨테이너를 만듭니다. 파티션 키 경로를 /category로 설정하고 초당 최대 1000 요청 단위 처리량(RU/s)을 사용하여 자동 스케일링 처리량을 사용하도록 설정해야 합니다.

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
  4. 계층적 파티션 키를 사용하여 employees라는 이름의 또 다른 컨테이너를 만들고 /organization, /department, /team을 해당 순서로 파티션 키 경로 집합으로 합니다. 또한 처리량을 수동 분량 400RU/s로 설정합니다.

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
  5. Notebook 셀을 실행하여 데이터베이스 및 컨테이너가 API for NoSQL 계정 내에서 생성되는지 확인합니다.

데이터 수집

샘플 데이터 세트를 만든 다음 OLTP를 사용하여 해당 데이터를 API for NoSQL 컨테이너로 수집합니다.

  1. 샘플 데이터 집합을 만듭니다.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
  2. spark.createDataFrame 및 이전에 저장된 OLTP 구성을 사용하여 대상 컨테이너에 샘플 데이터를 추가합니다.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
    // Ingest sample data
      .toDF("id", "category", "name", "quantity", "price", "clearance")

쿼리 데이터

OLTP 데이터를 데이터 프레임에 로드하여 데이터에 대한 일반적인 쿼리를 수행합니다. 다양한 구문 필터 또는 쿼리 데이터를 사용할 수 있습니다.

  1. spark.read를 사용하여 OLTP 데이터를 데이터 프레임 개체에 로드합니다. 이 자습서의 앞부분에서 사용한 것과 동일한 구성을 사용합니다. 또한 spark.cosmos.read.inferSchema.enabled를 true로 설정하여 Spark 커넥터가 기존 항목을 샘플링하여 스키마를 유추할 수 있도록 합니다.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .option("spark.cosmos.read.inferSchema.enabled", "true")
  2. printSchema를 사용하여 데이터 프레임에 로드된 데이터의 스키마를 렌더링합니다.

    # Render schema    
    // Render schema    
  3. quantity 열이 20보다 작은 데이터 행을 렌더링합니다. whereshow 함수를 사용하여 이 쿼리를 수행합니다.

    # Render filtered data    
    df.where("quantity < 20") \
    // Render filtered data
    df.where("quantity < 20")
  4. clearance 열이 true인 첫 번째 데이터 행을 렌더링합니다. filter 함수를 사용하여 이 쿼리를 수행합니다.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
  5. 필터 또는 잘림 없이 5개의 데이터 행을 렌더링합니다. show 함수를 사용하여 렌더링되는 행의 모양과 수를 사용자 지정합니다.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
  6. 이 원시 NoSQL 쿼리 문자열을 사용하여 데이터를 쿼리합니다. SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)

일반 작업 수행

Spark에서 API for NoSQL 데이터 작업을 하는 경우 부분 업데이트를 수행하거나 원시 JSON으로 데이터 작업을 할 수 있습니다.

  1. 항목의 부분 업데이트를 수행하려면 다음 단계를 수행합니다.

    1. 기존 config 구성 변수를 복사하고 새 복사본의 속성을 수정합니다. 특히 쓰기 전략을 ItemPatch로 구성하여 대량 지원을 사용하지 않도록 설정하고, 열 및 매핑된 작업을 설정하고, 마지막으로 기본 작업 유형을 Set으로 설정합니다.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
    2. 이 패치 작업의 일부로서 대상으로 지정할 항목 파티션 키 및 고유 식별자에 대한 변수를 만듭니다.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
    3. 패치 개체 집합을 만들어 대상 항목을 지정하고 수정해야 하는 필드를 지정합니다.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
    4. 패치 개체 집합을 사용하여 데이터 프레임을 만들고 write를 사용해 패치 작업을 수행합니다.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
      // Create data frame
        .toDF("id", "category", "name")
    5. 쿼리를 실행하여 패치 작업의 결과를 검토합니다. 이제 다른 변경 내용 없이 항목의 이름을 Yamba New Surfboard로 지정해야 합니다.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
  2. 원시 JSON 데이터를 사용하려면 다음 단계를 수행합니다.

    1. 기존 config 구성 변수를 복사하고 새 복사본의 속성을 수정합니다. 특히 대상 컨테이너를 employees로 변경하고 원시 JSON 데이터를 사용하도록 contacts 열/필드를 구성합니다.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
    2. 컨테이너에 수집할 직원 집합을 만듭니다.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
    3. 데이터 프레임을 만들고 write를 사용하여 직원 데이터를 수집합니다.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
      // Ingest data
        .toDF("id", "organization", "department", "team", "name", "contacts")
    4. show를 사용하여 데이터 프레임의 데이터를 렌더링합니다. 출력에서 contacts 열이 원시 JSON인지 확인합니다.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")

