准备查询变更数据Prepare to Query for the Change Data

适用于:Applies to: 是SQL ServerSQL Server(所有支持的版本)yesSQL ServerSQL Server (all supported versions) 是 Azure 数据工厂中的 SSIS Integration RuntimeSSIS Integration Runtime in Azure Data Factoryyes Azure 数据工厂中的 SSIS Integration RuntimeSSIS Integration Runtime in Azure Data Factory适用于:Applies to: 是SQL ServerSQL Server(所有支持的版本)yesSQL ServerSQL Server (all supported versions) 是 Azure 数据工厂中的 SSIS Integration RuntimeSSIS Integration Runtime in Azure Data Factoryyes Azure 数据工厂中的 SSIS Integration RuntimeSSIS Integration Runtime in Azure Data Factory

在用于执行变更数据增量加载的 Integration ServicesIntegration Services 包的控制流中,第三个任务(即最后一个任务)是准备查询变更数据和添加数据流任务。In the control flow of an Integration ServicesIntegration Services package that performs an incremental load of change data, the third and final task is to prepare to query for the change data and add a Data Flow task.

备注

控制流的第二个任务是确保所选间隔的变更数据已准备就绪。The second task for the control flow is to ensure that the change data for the selected interval is ready. 有关此任务的详细信息,请参阅 确定变更数据是否已准备就绪For more information about this task, see Determine Whether the Change Data Is Ready. 有关设计控制流的总体过程的说明,请参阅变更数据捕获 (SSIS)For a description of the overall process of designing the control flow, see Change Data Capture (SSIS).

设计注意事项Design Considerations

为了检索变更数据,您将调用 Transact-SQL 表值函数,该函数将间隔的端点接受为输入参数并返回指定间隔的变更数据。To retrieve the change data, you will call a Transact-SQL table-valued function that accepts the endpoints of the interval as input parameters and returns change data for the specified interval. 数据流中的源组件调用此函数。A source component in the data flow calls this function. 有关此源组件的信息,请参阅 检索和了解变更数据For information about this source component, see Retrieve and Understand the Change Data.

最常用的 Integration ServicesIntegration Services 源组件(包括 OLE DB 源、ADO 源和 ADO NET 源)无法为表值函数派生参数信息。The most frequently used Integration ServicesIntegration Services source components, including the OLE DB source, the ADO source, and the ADO NET source, cannot derive parameter information for a table-valued function. 因此,大多数源不能直接调用参数化的函数。Therefore, most sources cannot call a parameterized function directly.

您可以使用以下两个设计选项将输入参数传递到函数:You have two design options for passing the input parameters to the function:

  • 将参数化查询组合为字符串Assemble the parameterized query as a string. 可以使用脚本任务或执行 SQL 任务将动态的 SQL 字符串与硬编码的参数值组合为字符串。You can use a Script task or an Execute SQL task to assemble a dynamic SQL string with parameter values hard-coded into the string. 然后,将该字符串存储在包变量中,并用它来设置源组件的 SqlCommand 属性。Then, you can store this string in a package variable and use it to set the SqlCommand property of a source component. 此方法之所以成功,是因为源组件不再需要参数信息。This approach succeeds because the source component no longer requires parameter information.

    备注

    预编译的脚本产生的开销小于执行 SQL 任务。A precompiled script incurs less overhead than an Execute SQL task.

  • 使用参数化的包装Use a parameterized wrapper. 此外,还可以以包装的形式创建一个参数化的存储过程,以调用参数化的表值函数。Alternatively, you can create a parameterized stored procedure as a wrapper that calls the parameterized table-valued function. 此方法之所以成功,是因为源组件可以成功地为存储过程派生参数信息。This approach succeeds because a source component can successfully derive parameter information for a stored procedure.

本主题使用第一个设计选项,即将参数化查询组合为字符串。This topic uses the first design option and assembles a parameterized query as a string.

准备查询Preparing the Query

将输入参数的值连接到单个查询字符串中之前,必须设置查询所需的包变量。Before you can concatenate the values of the input parameters into a single query string, you have to set up the package variables that the query needs.

设置包变量To set up package variables

  • SQL Server Data Tools (SSDT)SQL Server Data Tools (SSDT)中的 “变量” 窗口中,创建一个数据类型为 string 的变量以保存执行 SQL 任务返回的查询字符串。In SQL Server Data Tools (SSDT)SQL Server Data Tools (SSDT), in the Variables window, create a variable with a string data type to hold the query string returned by the Execute SQL Task.

    以下示例使用变量名 SqlDataQuery。This example uses the variable name, SqlDataQuery.

创建包变量后,可以使用脚本任务或执行 SQL 任务连接输入参数的值。With the package variable created, you can use either a Script task or Execute SQL task to concatenate the values of the input parameters. 下面的两个过程介绍如何配置这些组件。The following two procedures describe how to configure these components.

使用脚本任务连接查询字符串To use a Script task to concatenate the query string

  1. “控制流” 选项卡上,在包中的 For 循环容器后添加一个脚本任务,然后将 For 循环容器连接到该任务。On the Control Flow tab, add a Script task to the package after the For Loop container and connect the For Loop container to the task.

    备注

    此过程假定包从一个表中执行增量加载。This procedure assumes that the package performs an incremental load from a single table. 如果包从多个表加载并且具有包含多个子包的父包,则将该任务作为第一个组件添加到各子包中。If the package loads from multiple tables and has a parent package with multiple child packages, this task would be added as the first component to each child package. 有关详细信息,请参阅 执行多个表的增量加载For more information, see Perform an Incremental Load of Multiple Tables.

  2. “脚本任务编辑器” 中的 “脚本” 页上,选择以下选项:In the Script Task Editor, on the Script page, select the following options:

    1. 对于 ReadOnlyVariables,从列表中选择 User::DataReadyUser::ExtractStartTimeUser::ExtractEndTimeFor ReadOnlyVariables, select User::DataReady, User::ExtractStartTime, and User::ExtractEndTime from the.

    2. 对于 ReadWriteVariables,从列表中选择 User::SqlDataQuery。For ReadWriteVariables, select User::SqlDataQuery from the list.

  3. “脚本任务编辑器”“脚本” 页上,单击 “编辑脚本” 以打开脚本开发环境。In the Script Task Editor, on the Script page, click Edit Script to open the script development environment.

  4. 在 Main 过程中,输入下面的代码段之一:In the Main procedure, enter one of the following code segments:

    • 如果使用 C# 进行编程,请输入下面的代码行:If you are programming in C#, enter the following lines of code:

      int dataReady;  
      System.DateTime extractStartTime;  
      System.DateTime extractEndTime;  
      string sqlDataQuery;  
      
      dataReady = (int)Dts.Variables["DataReady"].Value;  
      extractStartTime = (System.DateTime)Dts.Variables["ExtractStartTime"].Value;  
      extractEndTime = (System.DateTime)Dts.Variables["ExtractEndTime"].Value;  
      
      if (dataReady == 2)  
        {  
        sqlDataQuery = "SELECT * FROM CDCSample.uf_Customer('" + string.Format("{0:yyyy-MM-dd hh:mm:ss}", extractStartTime) + "', '" + string.Format("{0:yyyy-MM-dd hh:mm:ss}", extractEndTime) + "')";  
        }  
      else  
        {  
        sqlDataQuery = "SELECT * FROM CDCSample.uf_Customer(null" + ", '" + string.Format("{0:yyyy-MM-dd hh:mm:ss}", extractEndTime) + "')";  
        }  
      
      Dts.Variables["SqlDataQuery"].Value = sqlDataQuery;  
      

      - 或 -- or -

    • 如果您是使用 Visual BasicVisual Basic进行编程,请输入下面的代码行:If you are programming in Visual BasicVisual Basic, enter the following lines of code:

      Dim dataReady As Integer  
      Dim extractStartTime As Date  
      Dim extractEndTime As Date  
      Dim sqlDataQuery As String  
      
      dataReady = CType(Dts.Variables("DataReady").Value, Integer)  
      extractStartTime = CType(Dts.Variables("ExtractStartTime").Value, Date)  
      extractEndTime = CType(Dts.Variables("ExtractEndTime").Value, Date)  
      
      If dataReady = 2 Then  
        sqlDataQuery = "SELECT * FROM CDCSample.uf_Customer('" & _  
            String.Format("{0:yyyy-MM-dd hh:mm:ss}", extractStartTime) & _  
            "', '" & _  
            String.Format("{0:yyyy-MM-dd hh:mm:ss}", extractEndTime) & _  
            "')"  
      Else  
        sqlDataQuery = "SELECT * FROM CDCSample.uf_Customer(null" & _  
            ", '" & _  
            String.Format("{0:yyyy-MM-dd hh:mm:ss}", extractEndTime) & _  
            "')"  
      End If  
      
      Dts.Variables("SqlDataQuery").Value = sqlDataQuery  
      
      
  5. 保留从脚本执行过程返回 DtsExecResult.Success 的默认代码行。Leave the default line of code which returns DtsExecResult.Success from the execution of the script.

  6. 关闭脚本开发环境和 “脚本任务编辑器”Close the script development environment and the Script Task Editor.

使用执行 SQL 任务连接查询字符串To use an Execute SQL task to concatenate the query string

  1. “控制流” 选项卡上,在 For 循环容器之后向包添加一个执行 SQL 任务,然后将 For 循环容器连接到该任务。On the Control Flow tab, add an Execute SQL task to the package after the For Loop container and connect the For Loop container to this task.

    备注

    此过程假定包从一个表中执行增量加载。This procedure assumes that the package performs an incremental load from a single table. 如果包从多个表加载并且具有包含多个子包的父包,则将该任务作为第一个组件添加到各子包中。If the package loads from multiple tables and has a parent package with multiple child packages, this task would be added as the first component to each child package. 有关详细信息,请参阅 执行多个表的增量加载For more information, see Perform an Incremental Load of Multiple Tables.

  2. “执行 SQL 任务编辑器” 中的 “常规” 页上,选择以下选项:In the Execute SQL Task Editor, on the General page, select the following options:

    1. 对于 ResultSet,选择 “单行”For ResultSet, select Single row.

    2. 配置到源数据库的有效连接。Configure a valid connection to the source database.

    3. 对于 SQLSourceType,选择 “直接输入”For SQLSourceType, select Direct input.

    4. 对于 SQLStatement,输入以下 SQL 语句:For SQLStatement, enter the following SQL statement:

      declare @ExtractStartTime datetime,  
      @ExtractEndTime datetime,   
      @DataReady int  
      
      select @DataReady = ?,   
      @ExtractStartTime = ?,   
      @ExtractEndTime = ?  
      
      if @DataReady = 2  
      select N'select * from CDCSample.uf_Customer'  
      + N'('''+ convert(nvarchar(30),@ExtractStartTime,120)  
      + ''', '''  
      + convert(nvarchar(30),@ExtractEndTime,120) + ''') '   
      as SqlDataQuery  
      else  
      select N'select * from CDCSample.uf_Customer'  
      + N'(null, '''  
      + convert(nvarchar(30),@ExtractEndTime,120)  
      + ''') '  
      as SqlDataQuery  
      
      

      备注

      上述示例中的 else 子句通过为开始日期和时间传递 null 值来生成查询,用于变更数据的首次加载。The else clause in this sample generates a query for the initial load of change data by passing a null value for the starting date and time. 此示例没有涉及到一种情况:必须将启用变更数据捕获功能之前所做的变更上传到数据仓库。This sample does not address the scenario in which changes that were made before change data capture was enabled also have to be uploaded to the data warehouse.

  3. “执行 SQL 任务编辑器”“参数映射” 页上,进行以下映射:On the Parameter Mapping page of the Execute SQL Task Editor, do the following mapping:

    1. 将 DataReady 变量映射到参数 0。Map the DataReady variable to parameter 0.

    2. 将 ExtractStartTime 变量映射到参数 1。Map the ExtractStartTime variable to parameter 1.

    3. 将 ExtractEndTime 变量映射到参数 2。Map the ExtractEndTime variable to parameter 2.

  4. “执行 SQL 任务编辑器”“结果集” 页上,将结果名称映射到 SqlDataQuery 变量。On the Result Set page of the Execute SQL Task Editor, map the Result Name to the SqlDataQuery variable.

    该结果名称是返回的单列的名称 SqlDataQuery。The Result Name is the name of the single column that is returned, SqlDataQuery.

上述过程配置的任务使用输入参数的硬编码字符串值准备查询字符串。The previous procedures configure a task that prepares a query string with hard-coded string values for the input parameters. 下面的代码是此查询字符串的示例:The following code is an example of such a query string:

select * from CDCSample. uf_Customer('2007-06-11 14:21:58', '2007-06-12 14:21:58')

添加数据流任务Adding a Data Flow Task

设计包的控制流的最后一步是添加数据流任务。The last step in designing the control flow for the package is to add a Data Flow task.

添加数据流任务和完成控制流To add a Data Flow task and complete the control flow

  • “控制流” 选项卡上,添加一个数据流任务,并连接用于连接查询字符串的任务。On the Control Flow tab, add a Data Flow task and connect the task that concatenated the query string.

下一步Next Step

在准备查询字符串和配置数据流任务之后,下一步就是创建用于从数据库检索变更数据的表值函数。After you prepare the query string and configure the Data Flow task, the next step is create the table-valued function that will retrieve the change data from the database.

下一个主题: 创建函数以检索变更数据Next topic: Create the Function to Retrieve the Change Data