
Incrementally load data from Azure SQL Database to Azure Blob Storage using change tracking information in the Azure portal

Applies to: Azure Data Factory, Azure Synapse Analytics

In this tutorial, you create an Azure data factory with a pipeline that loads delta data from a source database in Azure SQL Database to Azure Blob storage, based on change tracking information.

You perform the following steps in this tutorial:

  • Prepare the source data store.
  • Create a data factory.
  • Create linked services.
  • Create source, sink, and change tracking datasets.
  • Create, run, and monitor the full copy pipeline.
  • Add or update data in the source table.
  • Create, run, and monitor the incremental copy pipeline.

Overview

In a data integration solution, incrementally loading data after an initial full load is a widely used scenario. In some cases, the changed data within a period in your source data store can easily be sliced up (for example, by attributes such as LastModifyTime or CreationTime). In other cases, there is no explicit way to identify the delta data since the last time you processed the data. The change tracking technology supported by data stores such as Azure SQL Database and SQL Server can be used to identify the delta data. This tutorial describes how to use Azure Data Factory with SQL change tracking to incrementally load delta data from Azure SQL Database into Azure Blob storage. For more detailed information about SQL change tracking, see Change tracking in SQL Server.

End-to-end workflow

Here are the typical end-to-end workflow steps to incrementally load data by using the change tracking technology.

Note

Both Azure SQL Database and SQL Server support the change tracking technology. This tutorial uses Azure SQL Database as the source data store. You can also use a SQL Server instance.

  1. Initial loading of historical data (run once):
    1. Enable change tracking in the source database in Azure SQL Database.
    2. Get the initial value of SYS_CHANGE_VERSION in the database as the baseline to capture changed data.
    3. Load the full data from the source database into Azure Blob storage.
  2. Incremental loading of delta data on a schedule (run periodically after the initial loading of data):
    1. Get the old and new SYS_CHANGE_VERSION values.
    2. Load the delta data by joining the primary keys of changed rows (between the two SYS_CHANGE_VERSION values) from sys.change_tracking_tables with the data in the source table, and then move the delta data to the destination. (A sketch of this query pattern follows this list.)
    3. Update SYS_CHANGE_VERSION for the next delta load.
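
The core of the delta load is a join between the source table and the CHANGETABLE function. The following is a minimal T-SQL sketch of that pattern, assuming the example data_source_table created later in this tutorial; the literal 0 stands in for the last processed SYS_CHANGE_VERSION, which the pipeline supplies at run time:

SELECT
    s.PersonID, s.Name, s.Age,
    CT.SYS_CHANGE_VERSION, CT.SYS_CHANGE_OPERATION
FROM CHANGETABLE(CHANGES data_source_table, 0) AS CT
LEFT OUTER JOIN data_source_table AS s
    ON s.PersonID = CT.PersonID                 -- join changed keys back to the source rows
WHERE CT.SYS_CHANGE_VERSION <= CHANGE_TRACKING_CURRENT_VERSION();
-- For deleted rows, the source columns come back NULL; SYS_CHANGE_OPERATION is I, U, or D.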

High-level solution

In this tutorial, you create two pipelines that perform the following two operations:

  1. Initial load: You create a pipeline with a copy activity that copies the entire data from the source data store (Azure SQL Database) to the destination data store (Azure Blob storage).

    Full loading of data

  2. Incremental load: You create a pipeline with the following activities, and run it periodically.

    1. Create two lookup activities to get the old and new SYS_CHANGE_VERSION values from Azure SQL Database and pass them to the copy activity.
    2. Create one copy activity to copy the inserted/updated/deleted data between the two SYS_CHANGE_VERSION values from Azure SQL Database to Azure Blob storage.
    3. Create one stored procedure activity to update the value of SYS_CHANGE_VERSION for the next pipeline run.

    Incremental loading flow diagram

If you don't have an Azure subscription, create a free account before you begin.

Prerequisites

  • Azure SQL Database. You use a database in Azure SQL Database as the source data store. If you don't have one, see the Create a database in Azure SQL Database article for steps to create it.
  • Azure Storage account. You use the blob storage as the sink data store. If you don't have an Azure storage account, see the Create a storage account article for steps to create one. Create a container named adftutorial.

Create a data source table in Azure SQL Database

  1. Launch SQL Server Management Studio, and connect to SQL Database.

  2. In Server Explorer, right-click your database and choose New Query.

  3. Run the following SQL command against your database to create a table named data_source_table as the data source store.

    create table data_source_table
    (
        PersonID int NOT NULL,
        Name varchar(255),
        Age int,
        PRIMARY KEY (PersonID)
    );
    
    INSERT INTO data_source_table
        (PersonID, Name, Age)
    VALUES
        (1, 'aaaa', 21),
        (2, 'bbbb', 24),
        (3, 'cccc', 20),
        (4, 'dddd', 26),
        (5, 'eeee', 22);
    
    
  4. Enable change tracking on your database and the source table (data_source_table) by running the following SQL query:

    Note

    • Replace <your database name> with the name of the database in Azure SQL Database that contains data_source_table.
    • In the current example, the changed data is kept for two days. If you load the changed data every three days or more, some changed data is not included. You need to either change the value of CHANGE_RETENTION to a bigger number, or ensure that your period for loading the changed data is within two days. For more information, see Enable change tracking for a database.
    ALTER DATABASE <your database name>
    SET CHANGE_TRACKING = ON  
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  
    
    ALTER TABLE data_source_table
    ENABLE CHANGE_TRACKING  
    WITH (TRACK_COLUMNS_UPDATED = ON)
    
  5. Create a new table and store the ChangeTracking_version with a default value by running the following query:

    create table table_store_ChangeTracking_version
    (
        TableName varchar(255),
        SYS_CHANGE_VERSION BIGINT
    );
    
    DECLARE @ChangeTracking_version BIGINT
    SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION();  
    
    INSERT INTO table_store_ChangeTracking_version
    VALUES ('data_source_table', @ChangeTracking_version)
    

    Note

    If the data has not changed after you enabled change tracking for SQL Database, the value of the change tracking version is 0.

  6. Run the following query to create a stored procedure in your database. The pipeline invokes this stored procedure to update the change tracking version in the table you created in the previous step. (Optional sanity-check queries follow this procedure.)

    CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50)
    AS
    
    BEGIN
    
    UPDATE table_store_ChangeTracking_version
    SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion
    WHERE [TableName] = @TableName
    
    END    
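
Optionally, before you continue, you can verify the change tracking setup with a few ad hoc queries. This is a minimal sketch for validation only, not part of the tutorial's pipeline; it reads the change tracking catalog views and the objects created above, and the final EXEC simply resets the stored version to the current one to demonstrate the call the pipeline makes:

-- Confirm change tracking is enabled on the database and on the table.
SELECT DB_NAME(database_id) AS database_name,
       retention_period, retention_period_units_desc, is_auto_cleanup_on
FROM sys.change_tracking_databases;

SELECT OBJECT_NAME(object_id) AS table_name, is_track_columns_updated_on
FROM sys.change_tracking_tables;

-- Check the stored baseline version (0 if nothing has changed since change tracking was enabled).
SELECT TableName, SYS_CHANGE_VERSION
FROM table_store_ChangeTracking_version;

-- Exercise the stored procedure that the pipeline will call, resetting the stored version to the current one.
DECLARE @current BIGINT = CHANGE_TRACKING_CURRENT_VERSION();
EXEC Update_ChangeTracking_Version
    @CurrentTrackingVersion = @current,
    @TableName = 'data_source_table';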
    

Azure PowerShell

Note

This article has been updated to use the Azure Az PowerShell module. The Az PowerShell module is the recommended PowerShell module for interacting with Azure. To get started with the Az PowerShell module, see Install Azure PowerShell. To learn how to migrate to the Az PowerShell module, see Migrate Azure PowerShell from AzureRM to Az.

Install the latest Azure PowerShell modules by following the instructions in How to install and configure Azure PowerShell.

Create a data factory

  1. Launch the Microsoft Edge or Google Chrome web browser. Currently, the Data Factory UI is supported only in Microsoft Edge and Google Chrome web browsers.

  2. On the left menu, select Create a resource > Data + Analytics > Data Factory:

    Data Factory selection in the New pane

  3. On the New data factory page, enter ADFTutorialDataFactory for the name.

    New data factory page

    The name of the Azure data factory must be globally unique. If you receive the following error, change the name of the data factory (for example, yournameADFTutorialDataFactory) and try creating it again. See the Data Factory - Naming rules article for naming rules for Data Factory artifacts.

    Data factory name "ADFTutorialDataFactory" is not available

  4. Select the Azure subscription in which you want to create the data factory.

  5. For Resource Group, take one of the following steps:

    • Select Use existing, and select an existing resource group from the drop-down list.

    • Select Create new, and enter the name of a resource group.

      To learn about resource groups, see Using resource groups to manage your Azure resources.

  6. Select V2 (Preview) for the version.

  7. Select the location for the data factory. Only supported locations are displayed in the drop-down list. The data stores (Azure Storage, Azure SQL Database, and so on) and computes (HDInsight and so on) used by the data factory can be in other regions.

  8. Select Pin to dashboard.

  9. Click Create.

  10. On the dashboard, you see the following tile with the status Deploying data factory.

    Deploying data factory tile

  11. After the creation is complete, you see the Data Factory page as shown in the image.

    Data factory home page

  12. Click the Author & Monitor tile to launch the Azure Data Factory user interface (UI) in a separate tab.

  13. On the get started page, switch to the Edit tab in the left panel as shown in the following image:

    Create pipeline button

Create linked services

You create linked services in a data factory to link your data stores and compute services to the data factory. In this section, you create linked services to your Azure storage account and your database in Azure SQL Database.

Create the Azure Storage linked service

In this step, you link your Azure storage account to the data factory.

  1. Click Connections, and click + New.

    New connection button

  2. In the New Linked Service window, select Azure Blob Storage, and click Continue.

    Azure Blob Storage selection

  3. In the New Linked Service window, take the following steps:

    1. Enter AzureStorageLinkedService for Name.
    2. Select your Azure storage account for Storage account name.
    3. Click Save.

    Azure storage account settings

Create the Azure SQL Database linked service

In this step, you link your database to the data factory.

  1. Click Connections, and click + New.

  2. In the New Linked Service window, select Azure SQL Database, and click Continue.

  3. In the New Linked Service window, take the following steps:

    1. Enter AzureSqlDatabaseLinkedService for the Name field.

    2. Select your server for the Server name field.

    3. Select your database for the Database name field.

    4. Enter the name of the user for the User name field.

    5. Enter the password for the user for the Password field.

    6. Click Test connection to test the connection.

    7. Click Save to save the linked service.

      Azure SQL Database linked service settings

Create datasets

In this step, you create datasets to represent the data source, the data destination, and the place to store the SYS_CHANGE_VERSION.

Create a dataset to represent source data

In this step, you create a dataset to represent the source data.

  1. In the treeview, click + (plus), and click Dataset.

    New dataset menu

  2. Select Azure SQL Database, and click Finish.

    Source dataset type - Azure SQL Database

  3. You see a new tab for configuring the dataset. You also see the dataset in the treeview. In the Properties window, change the name of the dataset to SourceDataset.

    Source dataset name

  4. Switch to the Connection tab, and take the following steps:

    1. Select AzureSqlDatabaseLinkedService for Linked service.
    2. Select [dbo].[data_source_table] for Table.

    Source connection

Create a dataset to represent data copied to the sink data store

In this step, you create a dataset to represent the data that is copied from the source data store. You created the adftutorial container in your Azure Blob storage as part of the prerequisites. Create the container if it does not exist, or set the name to that of an existing one. In this tutorial, the output file name is dynamically generated by using the expression @CONCAT('Incremental-', pipeline().RunId, '.txt').

  1. In the treeview, click + (plus), and click Dataset.

    New dataset menu

  2. Select Azure Blob Storage, and click Finish.

    Sink dataset type - Azure Blob Storage

  3. You see a new tab for configuring the dataset. You also see the dataset in the treeview. In the Properties window, change the name of the dataset to SinkDataset.

    Sink dataset name

  4. Switch to the Connection tab in the Properties window, and take the following steps:

    1. Select AzureStorageLinkedService for Linked service.

    2. Enter adftutorial/incchgtracking for the folder part of the filePath.

    3. Enter @CONCAT('Incremental-', pipeline().RunId, '.txt') for the file part of the filePath.

      Sink dataset connection

Create a dataset to represent change tracking data

In this step, you create a dataset for storing the change tracking version. You created the table table_store_ChangeTracking_version as part of the prerequisites.

  1. In the treeview, click + (plus), and click Dataset.

  2. Select Azure SQL Database, and click Finish.

  3. You see a new tab for configuring the dataset. You also see the dataset in the treeview. In the Properties window, change the name of the dataset to ChangeTrackingDataset.

  4. Switch to the Connection tab, and take the following steps:

    1. Select AzureSqlDatabaseLinkedService for Linked service.
    2. Select [dbo].[table_store_ChangeTracking_version] for Table.

Create a pipeline for the full copy

In this step, you create a pipeline with a copy activity that copies the entire data from the source data store (Azure SQL Database) to the destination data store (Azure Blob storage).

  1. Click + (plus) in the left pane, and click Pipeline.

    Screenshot shows the Pipeline option for a data factory.

  2. You see a new tab for configuring the pipeline. You also see the pipeline in the treeview. In the Properties window, change the name of the pipeline to FullCopyPipeline.

    Screenshot shows a pipeline with a name entered.

  3. In the Activities toolbox, expand Data Flow, drag the Copy activity to the pipeline designer surface, and set the name to FullCopyActivity.

    Full copy activity name

  4. Switch to the Source tab, and select SourceDataset for the Source Dataset field.

    Copy activity - source

  5. Switch to the Sink tab, and select SinkDataset for the Sink Dataset field.

    Copy activity - sink

  6. To validate the pipeline definition, click Validate on the toolbar. Confirm that there is no validation error. Close the Pipeline Validation Report by clicking >>.

    Validate the pipeline

  7. To publish entities (linked services, datasets, and pipelines), click Publish. Wait until publishing succeeds.

    Screenshot shows a data factory with the Publish All button called out.

  8. Wait until you see the Successfully published message.

    Publishing succeeded

  9. You can also see notifications by clicking the Show Notifications button on the left. To close the notifications window, click X.

    Show notifications

Run the full copy pipeline

Click Trigger on the toolbar for the pipeline, and click Trigger Now.

Screenshot shows the Trigger Now option selected from the Trigger menu.

Monitor the full copy pipeline

  1. Click the Monitor tab on the left. You see the pipeline run in the list along with its status. To refresh the list, click Refresh. The links in the Actions column let you view activity runs associated with the pipeline run and rerun the pipeline.

    Screenshot shows pipeline runs for a data factory.

  2. To view activity runs associated with the pipeline run, click the View Activity Runs link in the Actions column. There is only one activity in the pipeline, so you see only one entry in the list. To switch back to the pipeline runs view, click the Pipelines link at the top.

    Screenshot shows activity runs for a data factory with the Pipelines link called out.

Review the results

You see a file named incremental-<GUID>.txt in the incchgtracking folder of the adftutorial container.

Output file from the full copy

The file should have the data from your database:

1,aaaa,21
2,bbbb,24
3,cccc,20
4,dddd,26
5,eeee,22

Add more data to the source table

Run the following query against your database to add a row and update a row.

INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new', 50);


UPDATE data_source_table
SET [Age] = 10, [Name] = 'update' WHERE [PersonID] = 1
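
Optionally, you can check what change tracking recorded before you run the incremental pipeline. The following is a quick sanity-check query, not a required tutorial step; it reads the baseline version stored in table_store_ChangeTracking_version and lists the changed keys since then:

-- List the rows recorded by change tracking since the stored baseline.
DECLARE @last_version BIGINT =
    (SELECT SYS_CHANGE_VERSION
     FROM table_store_ChangeTracking_version
     WHERE TableName = 'data_source_table');

SELECT CT.PersonID, CT.SYS_CHANGE_VERSION, CT.SYS_CHANGE_OPERATION
FROM CHANGETABLE(CHANGES data_source_table, @last_version) AS CT;
-- Expect two rows here: PersonID 6 with operation I and PersonID 1 with operation U.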

Create a pipeline for the delta copy

In this step, you create a pipeline with the following activities, and run it periodically. The lookup activities get the old and new SYS_CHANGE_VERSION values from Azure SQL Database and pass them to the copy activity. The copy activity copies the inserted/updated/deleted data between the two SYS_CHANGE_VERSION values from Azure SQL Database to Azure Blob storage. The stored procedure activity updates the value of SYS_CHANGE_VERSION for the next pipeline run.
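
In T-SQL terms, the two lookup activities retrieve values equivalent to the following queries (a sketch for orientation; in the pipeline, the first lookup reads the ChangeTrackingDataset table directly and the second issues the query you enter in step 6):

-- Old version: the value stored by the previous run (read by LookupLastChangeTrackingVersionActivity).
SELECT TableName, SYS_CHANGE_VERSION
FROM table_store_ChangeTracking_version;

-- New version: the current change tracking version (read by LookupCurrentChangeTrackingVersionActivity).
SELECT CHANGE_TRACKING_CURRENT_VERSION() AS CurrentChangeTrackingVersion;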

  1. In the Data Factory UI, switch to the Edit tab. Click + (plus) in the left pane, and click Pipeline.

    Screenshot shows creating a pipeline in a data factory.

  2. You see a new tab for configuring the pipeline. You also see the pipeline in the treeview. In the Properties window, change the name of the pipeline to IncrementalCopyPipeline.

    Pipeline name

  3. Expand General in the Activities toolbox, and drag the Lookup activity to the pipeline designer surface. Set the name of the activity to LookupLastChangeTrackingVersionActivity. This activity gets the change tracking version used in the last copy operation, which is stored in the table table_store_ChangeTracking_version.

    Screenshot shows a pipeline with a lookup activity.

  4. Switch to the Settings tab in the Properties window, and select ChangeTrackingDataset for the Source Dataset field.

    Screenshot shows the Settings tab in the Properties window.

  5. Drag the Lookup activity from the Activities toolbox to the pipeline designer surface. Set the name of the activity to LookupCurrentChangeTrackingVersionActivity. This activity gets the current change tracking version.

    Screenshot shows a pipeline with two lookup activities.

  6. Switch to the Settings tab in the Properties window, and take the following steps:

    1. Select SourceDataset for the Source Dataset field.

    2. Select Query for Use Query.

    3. Enter the following SQL query for Query.

      SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion
      

      Screenshot shows the query added to the Settings tab in the Properties window.

  7. In the Activities toolbox, expand Data Flow, and drag the Copy activity to the pipeline designer surface. Set the name of the activity to IncrementalCopyActivity. This activity copies the data between the last change tracking version and the current change tracking version to the destination data store.

    Copy activity name

  8. Switch to the Source tab in the Properties window, and take the following steps:

    1. Select SourceDataset for Source Dataset.

    2. Select Query for Use Query.

    3. Enter the following SQL query for Query. (A version of this query with the expressions resolved to literal values appears after this procedure.)

      select data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) as CT on data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}
      

      Copy activity - source settings

  9. Switch to the Sink tab, and select SinkDataset for the Sink Dataset field.

    Copy activity - sink settings

  10. Connect both Lookup activities to the Copy activity, one by one. Drag the green button attached to a Lookup activity to the Copy activity.

    Connect the Lookup and Copy activities

  11. Drag the Stored Procedure activity from the Activities toolbox to the pipeline designer surface. Set the name of the activity to StoredProceduretoUpdateChangeTrackingActivity. This activity updates the change tracking version in the table_store_ChangeTracking_version table.

    Stored Procedure activity - name

  12. Switch to the SQL Account tab, and select AzureSqlDatabaseLinkedService for Linked service.

    Stored Procedure activity - SQL account

  13. Switch to the Stored Procedure tab, and take the following steps:

    1. For Stored procedure name, select Update_ChangeTracking_Version.

    2. Select Import parameter.

    3. In the Stored procedure parameters section, specify the following values for the parameters:

      Name                    Type    Value
      CurrentTrackingVersion  Int64   @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}
      TableName               String  @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}

      Stored Procedure activity - parameters

  14. Connect the Copy activity to the Stored Procedure activity. Drag the green button attached to the Copy activity to the Stored Procedure activity.

    Connect the Copy and Stored Procedure activities

  15. Click Validate on the toolbar. Confirm that there are no validation errors. Close the Pipeline Validation Report window by clicking >>.

    Validate button

  16. Publish entities (linked services, datasets, and pipelines) to the Data Factory service by clicking the Publish All button. Wait until you see the Publishing succeeded message.

    Screenshot shows the Publish All button for a data factory.
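
With the lookup outputs resolved, the copy activity's source query from step 8 is equivalent to the following sketch; 0 and 2 are example values standing in for the last and current SYS_CHANGE_VERSION supplied at run time:

SELECT data_source_table.PersonID, data_source_table.Name, data_source_table.Age,
       CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION
FROM data_source_table
RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, 0) AS CT  -- 0 = last processed version
    ON data_source_table.PersonID = CT.PersonID
WHERE CT.SYS_CHANGE_VERSION <= 2;                                 -- 2 = current version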

Run the incremental copy pipeline

  1. Click Trigger on the toolbar for the pipeline, and click Trigger Now.

    Screenshot shows a pipeline with activities and the Trigger Now option selected from the Trigger menu.

  2. In the Pipeline Run window, select Finish.

Monitor the incremental copy pipeline

  1. Click the Monitor tab on the left. You see the pipeline run in the list along with its status. To refresh the list, click Refresh. The links in the Actions column let you view activity runs associated with the pipeline run and rerun the pipeline.

    Screenshot shows pipeline runs for a data factory, including your pipeline.

  2. To view activity runs associated with the pipeline run, click the View Activity Runs link in the Actions column. This pipeline contains several activities (two lookups, a copy, and a stored procedure), so you see one entry for each of them in the list. To switch back to the pipeline runs view, click the Pipelines link at the top.

    Screenshot shows pipeline activity runs for a data factory, with several marked as succeeded.

Review the results

You see a second file in the incchgtracking folder of the adftutorial container.

Output file from the incremental copy

The file should have only the delta data from your database. The record with U is the row updated in the database, and the record with I is the row that was added.

1,update,10,2,U
6,new,50,1,I

The first three columns are the changed data from data_source_table. The last two columns are the metadata from the change tracking system table. The fourth column is the SYS_CHANGE_VERSION of each changed row. The fifth column is the operation: U = update, I = insert. For details about the change tracking information, see CHANGETABLE.

==================================================================
PersonID Name    Age    SYS_CHANGE_VERSION    SYS_CHANGE_OPERATION
==================================================================
1        update  10            2                                 U
6        new     50            1                                 I

Next steps

Advance to the following tutorial to learn how to copy only new and changed files, based on their LastModifiedDate: