Share via


開始使用 Azure Cosmos DB 分析存放區中的異動資料擷取

適用於:NoSQL MongoDB

使用 Azure Cosmos DB 分析存放區中的異動資料擷取 (CDC) 作為 Azure Data FactoryAzure Synapse Analytics 的來源,以擷取資料的特定變更。

注意

請注意,還無法在資料流程上使用適用於 Azure Cosmos DB for MongoDB API 的連結服務介面。 不過,除非支援 Mongo 連結服務,否則您將能夠搭配使用帳戶的文件端點與 "Azure Cosmos DB for NoSQL" 連結服務介面,以作為因應措施。 在 NoSQL 連結服務上,選擇 [手動輸入] 以提供 Cosmos DB 帳戶資訊,並使用帳戶的文件端點 (例如:https://[your-database-account-uri].documents.azure.com:443/),而不是 MongoDB 端點 (例如:mongodb://[your-database-account-uri].mongo.cosmos.azure.com:10255/)

必要條件

啟用分析存放區

首先,在帳戶層級啟用 Azure Synapse Link,然後針對適合您工作負載的容器啟用分析存放區。

  1. 啟用 Azure Synapse Link:啟用適用於 Azure Cosmos DB 帳戶的 Azure Synapse Link

  2. 啟用容器的分析存放區:

    選項 指南
    針對特定新容器啟用 為新容器啟用 Azure Synapse Link
    針對特定現有容器啟用 為現有容器啟用 Azure Synapse Link

使用資料流程建立目標 Azure 資源

分析存放區的異動資料擷取功能可以透過 Azure Data FactoryAzure Synapse Analytics 的資料流程功能取得。 在此指南中,使用 Azure Data Factory。

重要

您也可以使用 Azure Synapse Analytics。 首先,如果您還沒有 Azure Synapse 工作區,則請建立 Azure Synapse 工作區。 在新建立的工作區內,選取 [開發] 索引標籤,並選取 [新增資源],然後選取 [資料流程]

  1. 如果您還沒有 Azure Data Factory,則請建立 Azure Data Factory

    提示

    可能的話,請在 Azure Cosmos DB 帳戶所在的相同區域中建立資料處理站。

  2. 啟動新建立的資料處理站。

  3. 在資料處理站中,選取 [資料流程] 索引標籤,然後選取 [新增資料流程]

  4. 提供新建立資料流程的唯一名稱。 在此範例中,資料流程命名為 cosmoscdc

    Screnshot of a new data flow with the name cosmoscdc.

設定分析存放區容器的來源設定

現在建立和設定來源,以從 Azure Cosmos DB 帳戶的分析存放區流動資料。

  1. 選取新增來源

    Screenshot of the add source menu option.

  2. 在 [輸出資料流名稱] 欄位中,輸入 cosmos

    Screenshot of naming the newly created source cosmos.

  3. 在 [來源類型] 區段中,選取 [內嵌]

    Screenshot of selecting the inline source type.

  4. 在 [資料集] 欄位中,選取 [Azure - Azure Cosmos DB for NoSQL]

    Screenshot of selecting Azure Cosmos DB for NoSQL as the dataset type.

  5. 為您的帳戶建立名為 cosmoslinkedservice 的新連結服務。 在 [新增連結服務] 快顯對話方塊中選取現有的 Azure Cosmos DB for NoSQL 帳戶,然後選取 [確定]。 在此範例中,我們選取預先存在且名為 msdocs-cosmos-source 的 Azure Cosmos DB for NoSQL 帳戶,以及名為 cosmicworks 的資料庫。

    Screenshot of the New linked service dialog with an Azure Cosmos DB account selected.

  6. 針對存放區類型,選取 [分析]

    Screenshot of the analytical option selected for a linked service.

  7. 選取 [來源選項] 索引標籤。

  8. 在 [來源選項] 內,選取您的目標容器,然後啟用 [資料流程偵錯]。 在此範例中,容器命名為 products

    Screenshot of a source container selected named products.

  9. 選取 [資料流程偵錯]。 在 [開啟資料流程偵錯] 快顯對話方塊中,保留預設選項,然後選取 [確定]

    Screenshot of the toggle option to enable data flow debug.

  10. [來源選項] 索引標籤也包含您可能想要啟用的其他選項。 此表描述這些選項:

選項 描述
擷取中繼更新 如果您想要擷取項目的變更歷程記錄 (包括異動資料擷取讀取之間的中繼變更),則請啟用此選項。
擷取刪除 啟用此選項以擷取使用者已刪除記錄,並將其套用至接收器。 無法在 Azure 資料總管和 Azure Cosmos DB 接收器上套用刪除。
擷取交易存放區 TTL 啟用此選項以擷取 Azure Cosmos DB 交易存放區 (存留時間) TTL 已刪除記錄,並套用至接收器。 無法在 Azure 資料總管和 Azure Cosmos DB 接收器上套用 TTL 刪除。
Batchsize (以位元組為單位) 此設定實際上是 GB。 如果您想要批次處理異動資料擷取摘要,則請以 GB 為單位來指定大小
額外設定 額外 Azure Cosmos DB 分析存放區設定和其值。 (例如:spark.cosmos.allowWhiteSpaceInFieldNames -> true)

使用來源選項

當您核取任一 Capture intermediate updatesCapture DeltesCapture Transactional store TTLs 選項時,CDC 程序將會使用下列值來建立和填入接收中的 __usr_opType 欄位:

Description 選項
1 UPDATE 擷取中繼更新
2 INSERT 插入沒有選項,預設為開啟
3 USER_DELETE 擷取刪除
4 TTL_DELETE 擷取交易存放區 TTL

如果您必須區分 TTL 已刪除記錄與使用者或應用程式所刪除的文件,則請檢查 Capture intermediate updatesCapture Transactional store TTLs 選項。 然後,您必須適應 CDC 程序或應用程式或是查詢,以根據您的業務需求所需內容來使用 __usr_opType

提示

如果需要下游取用者在核取「擷取中繼更新」選項來還原更新順序,則系統時間戳記 _ts 欄位可以用作排序欄位。

建立與設定更新和刪除作業的接收器設定

首先,建立直接 Azure Blob 儲存體接收器,然後將接收器設定為只篩選特定作業的資料。

  1. 建立 Azure Blob 儲存體帳戶和容器 (如果您還沒有的話)。 在接下來的範例中,我們將使用名為 msdocsblobstorage 的帳戶以及名為 output 的容器。

    提示

    可能的話,請在 Azure Cosmos DB 帳戶所在的相同區域中建立儲存體帳戶。

  2. 回到 Azure Data Factory,針對從 cosmos 來源所擷取的異動資料建立新的接收器。

    Screenshot of adding a new sink that's connected to the existing source.

  3. 提供接收器的唯一名稱。 在此範例中,接收器命名為 storage

    Screenshot of naming the newly created sink storage.

  4. 在 [接收器類型] 區段中,選取 [內嵌]。 在 [資料集] 欄位中,選取 [差異]

    Screenshot of selecting and Inline Delta dataset type for the sink.

  5. 使用 Azure Blob 儲存體,為您的帳戶建立名為 storagelinkedservice 的新連結服務。 在 [新增連結服務] 快顯對話方塊中選取現有的 Azure Blob 儲存體帳戶,然後選取 [確定]。 在此範例中,我們選取預先存在且名為 msdocsblobstorage 的 Azure Blob 儲存體帳戶。

    Screenshot of the service type options for a new Delta linked service.

    Screenshot of the New linked service dialog with an Azure Blob Storage account selected.

  6. 選取 [設定] 索引標籤。

  7. 在 [設定] 內,將 [資料夾路徑] 設定為 Blob 容器的名稱。 在此範例中,容器名稱為 output

    Screenshot of the blob container named output set as the sink target.

  8. 找出 [Update 方法] 區段,然後將選取範圍變更為只允許 deleteupdate 作業。 此外,使用欄位 {_rid} 作為唯一識別碼,將 [索引鍵資料行] 指定為 [資料行清單]

    Screenshot of update methods and key columns being specified for the sink.

  9. 選取 [驗證],以確保您尚未發生任何錯誤或遺漏。 然後,選取 [發佈] 以發佈資料流程。

    Screenshot of the option to validate and then publish the current data flow.

排程異動資料擷取執行

發佈資料流程之後,您可以新增管線來移動和轉換資料。

  1. 建立新管線。 提供管線的唯一名稱。 在此範例中,管線命名為 cosmoscdcpipeline

    Screenshot of the new pipeline option within the resources section.

  2. 在 [活動] 區段中,展開 [移動和轉換] 選項,然後選取 [資料流程]

    Screenshot of the data flow activity option within the activities section.

  3. 提供資料流程活動的唯一名稱。 在此範例中,活動命名為 cosmoscdcactivity

  4. 在 [設定] 索引標籤中,選取您稍早在本指南中建立且名為 cosmoscdc 的資料流程。 然後,根據工作負載的資料量和所需延遲,來選取計算大小。

    Screenshot of the configuration settings for both the data flow and compute size for the activity.

    提示

    針對大於 100 GB 的增量資料大小,建議使用具有 32 個核心計數 (+16 個驅動程式核心) 的「自訂」大小。

  5. 選取 [新增觸發程序]。 排程此管線以適合您工作負載的步調來執行。 在此範例中,管線設定成每五分鐘執行一次。

    Screenshot of the add trigger button for a new pipeline.

    Screenshot of a trigger configuration based on a schedule, starting in the year 2023, that runs every five minutes.

    注意

    異動資料擷取執行的最低週期性時間範圍是一分鐘。

  6. 選取 [驗證],以確保您尚未發生任何錯誤或遺漏。 然後,選取 [發佈] 以發佈管線。

  7. 使用 Azure Cosmos DB 分析存放區異動資料擷取,來觀察放入 Azure Blob 儲存體容器中作為資料流程輸出的資料。

    Screnshot of the output files from the pipeline in the Azure Blob Storage container.

    注意

    初始叢集啟動時間最多可能需要三分鐘的時間。 若要避免後續異動資料擷取執行中的叢集啟動時間,請設定資料流程叢集「存留時間」值。 如需整合執行階段和 TTL 的詳細資訊,請參閱 Azure Data Factory 中的整合執行階段

並行作業

來源選項中的批次大小,或是接收器緩慢擷取變更資料流的情況,可能會導致同時執行多個作業。 若要避免這種情況,請在 [管線] 設定中將 [並行] 選項設定為 1,以確保在目前執行完成之前不會觸發新的執行。

下一步