비동기 출력을 사용하여 사용자 지정 변환 구성 요소 개발Developing a Custom Transformation Component with Asynchronous Outputs

변환이 구성 요소에서 입력 행을 모두 받을 때까지 행을 출력할 수 없거나 변환이 입력으로 받은 각 행에 대해 출력 행을 정확히 하나만 생성하지 않는 경우에는 비동기 출력을 사용하는 구성 요소를 사용합니다.You use a component with asynchronous outputs when a transform cannot output rows until the component has received all its input rows, or when the transformation does not produce exactly one output row for each row received as input. 예를 들어 집계 변환은 행을 모두 읽기 전까지는 행 합계를 계산할 수 없습니다.The Aggregate transformation, for example, cannot calculate a sum across rows until it has read all the rows. 반면 각 데이터 행이 전달될 때 해당 행을 수정하는 경우에는 언제든지 동기 출력을 사용하는 구성 요소를 사용할 수 있습니다.In contrast, you can use a component with synchronous outputs any time when you modify each row of data as it passes through. 각 행의 데이터를 현재 위치에서 수정하거나 각 입력 행의 값을 각각 포함하는 새 행을 하나 이상 만들 수 있습니다.You can modify the data for each row in place, or you can create one or more new columns, each of which has a value for every one of the input rows. 동기 및 비동기 구성 요소 간의 차이 대 한 자세한 내용은 참조 이해 동기 및 비동기 변환합니다.For more information about the difference between synchronous and asynchronous components, see Understanding Synchronous and Asynchronous Transformations.

비동기 출력을 사용하는 변환 구성 요소는 대상 구성 요소와 원본 구성 요소 모두로 작동하므로 고유합니다.Transformation components with asynchronous outputs are unique because they act as both destination and source components. 이러한 종류의 구성 요소는 업스트림 구성 요소에서 행을 받고 다운스트림 구성 요소에서 사용되는 행을 추가합니다.This kind of component receives rows from upstream components, and adds rows that are consumed by downstream components. 다른 데이터 흐름 구성 요소는 이 두 작업을 모두 수행하지 않습니다.No other data flow component performs both of these operations.

동기 출력을 사용하는 구성 요소에서 사용할 수 있는 업스트림 구성 요소의 열은 해당 구성 요소의 다운스트림 구성 요소에서도 자동으로 사용할 수 있습니다.The columns from upstream components that are available to a component with synchronous outputs are automatically available to components downstream from the component. 따라서 동기 출력을 사용하는 구성 요소에서는 다음 구성 요소에 열과 행을 제공하기 위해 출력 열을 정의할 필요가 없습니다.Therefore, a component with synchronous outputs does not have to define any output columns to provide columns and rows to the next component. 반면 비동기 출력을 사용하는 구성 요소에서는 출력 열을 제공하고 다운스트림 구성 요소에 행을 제공해야 합니다.Components with asynchronous outputs, on the other hand, must define output columns and provide rows to downstream components. 따라서 비동기 출력을 사용하는 구성 요소에는 디자인 및 실행 시 수행해야 할 태스크가 더 많이 있으며 해당 구성 요소 개발자는 더 많은 코드를 구현해야 합니다.Therefore a component with asynchronous outputs has more tasks to perform during both design and execution time, and the component developer has more code to implement.

SQL ServerSQL Server Integration ServicesIntegration Services 비동기 출력을 사용 하는 몇몇 변환이 포함 되어 있습니다. Integration ServicesIntegration Services contains several transformations with asynchronous outputs. 예를 들어 정렬 변환의 경우에는 행을 정렬하기 전에 모든 행이 있어야 하며 행을 정렬하는 데는 비동기 출력이 사용됩니다.For example, the Sort transformation requires all its rows before it can sort them, and achieves this by using asynchronous outputs. 정렬 변환에서는 행을 모두 받은 후 이를 정렬하고 출력에 추가합니다.After it has received all its rows, it sorts them and adds them to its output.

이 섹션에서는 비동기 출력을 사용하는 변환의 개발 방법을 자세히 설명합니다.This section explains in detail how to develop transformations with asynchronous outputs. 원본 구성 요소 개발에 대 한 자세한 내용은 참조 사용자 지정 원본 구성 요소 개발합니다.For more information about source component development, see Developing a Custom Source Component.

디자인 타임Design Time

구성 요소 만들기Creating the Component

SynchronousInputID 개체의 IDTSOutput100 속성은 출력이 동기적인지 비동기적인지를 식별합니다.The SynchronousInputID property on the IDTSOutput100 object identifies whether an output is synchronous or asynchronous. 비동기 출력을 만들려면 구성 요소에 출력을 추가하고 SynchronousInputID를 0으로 설정합니다.To create an asynchronous output, add the output to the component and set the SynchronousInputID to zero. 이 속성을 설정하면 데이터 흐름 태스크에서 구성 요소의 입력과 출력 모두에 대해 PipelineBuffer 개체를 할당할지, 아니면 단일 버퍼만 할당하여 두 개체 간에 이를 공유할지도 결정됩니다.Setting this property also determines whether the data flow task allocates PipelineBuffer objects for both the input and output of the component, or whether a single buffer is allocated and shared between the two objects.

다음 예제 코드에서는 ProvideComponentProperties 구현에서 비동기 출력을 만드는 구성 요소를 보여 줍니다.The following sample code shows a component that creates an asynchronous output in its ProvideComponentProperties implementation.

using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime;  

namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "AsyncComponent",ComponentType = ComponentType.Transform)]  
    public class AsyncComponent : PipelineComponent  
    {  
        public override void ProvideComponentProperties()  
        {  
            // Call the base class, which adds a synchronous input  
            // and output.  
            base.ProvideComponentProperties();  

            // Make the output asynchronous.  
            IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
            output.SynchronousInputID = 0;  
        }  
    }  
}  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime  

<DtsPipelineComponent(DisplayName:="AsyncComponent", ComponentType:=ComponentType.Transform)> _  
Public Class AsyncComponent  
    Inherits PipelineComponent  

    Public Overrides Sub ProvideComponentProperties()  

        ' Call the base class, which adds a synchronous input  
        ' and output.  
        Me.ProvideComponentProperties()  

        ' Make the output asynchronous.  
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
        output.SynchronousInputID = 0  

    End Sub  

End Class  

출력 열 만들기 및 구성Creating and Configuring Output Columns

앞에서 설명한 대로 비동기 구성 요소는 출력 열 컬렉션에 열을 추가하여 다운스트림 구성 요소에 열을 제공합니다.As mentioned earlier, an asynchronous component adds columns to its output column collection to provide columns to downstream components. 디자인 타임에 선택할 수 있는 메서드는 구성 요소의 필요에 따라 몇 가지가 있습니다.There are several design-time methods to choose from, depending on the needs of the component. 예를 들어 모든 열을 업스트림 구성 요소에서 다운스트림 구성 요소로 전달하려는 경우에는 구성 요소에서 입력 열을 사용할 수 있는 첫 번째 메서드가 OnInputPathAttached 메서드이므로 이 메서드를 재정의하여 열을 추가합니다.For example, if you want to pass all the columns from the upstream components to the downstream components, you would override the OnInputPathAttached method to add the columns, because this is the first method in which the input columns are available to the component.

구성 요소에서 입력에 대해 선택된 열을 기준으로 출력 열을 만드는 경우에는 SetUsageType 메서드를 재정의하여 출력 열을 선택하고 해당 열의 사용 방법을 나타냅니다.If the component creates output columns based on the columns selected for its input, override the SetUsageType method to select the output columns and to indicate how they will be used.

비동기 출력을 사용하는 구성 요소에서 업스트림 구성 요소의 열을 기준으로 출력 열을 만들며 사용 가능한 업스트림 구성 요소가 변경되는 경우에는 구성 요소에서 출력 열 컬렉션을 업데이트해야 합니다.If a component with asynchronous outputs creates output columns based on the columns from upstream components, and the available upstream columns change, the component should update its output column collection. 구성 요소에서는 Validate가 실행될 때 이러한 변경 내용을 검색하고 ReinitializeMetaData가 실행될 때 이를 수정해야 합니다.These changes should be detected by the component during Validate, and fixed during ReinitializeMetaData.

참고

출력 열이 출력 열 컬렉션에서 제거되면 데이터 흐름에서 해당 열을 참조하는 다운스트림 구성 요소가 부정적인 영향을 받습니다.When an output column is removed from the output column collection, downstream components in the data flow that reference the column are adversely affected. 다운스트림 구성 요소가 손상되지 않도록 하려면 출력 열을 제거하거나 다시 만들지 않고 해당 열을 복구해야 합니다.The output column must be repaired without removing and recreating the column to prevent breaking the downstream components. 예를 들어 열의 데이터 형식이 변경된 경우에는 데이터 형식을 업데이트해야 합니다.For example, if the data type of the column has changed, you must update the data type.

다음 코드 예에서는 업스트림 구성 요소의 사용 가능한 각 열에 대해 해당 출력 열 컬렉션에 출력 열을 추가하는 구성 요소를 보여 줍니다.The following code example shows a component that adds an output column to its output column collection for each column available from the upstream component.

public override void OnInputPathAttached(int inputID)  
{  
   IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);  
   IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
   IDTSVirtualInput100 vInput = input.GetVirtualInput();  

   foreach (IDTSVirtualInputColumn100 vCol in vInput.VirtualInputColumnCollection)  
   {  
      IDTSOutputColumn100 outCol = output.OutputColumnCollection.New();  
      outCol.Name = vCol.Name;  
      outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage);  
   }  
}  
Public Overrides Sub OnInputPathAttached(ByVal inputID As Integer)  

    Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)  
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
    Dim vInput As IDTSVirtualInput100 = input.GetVirtualInput()  

    For Each vCol As IDTSVirtualInputColumn100 In vInput.VirtualInputColumnCollection  

        Dim outCol As IDTSOutputColumn100 = output.OutputColumnCollection.New()  
        outCol.Name = vCol.Name  
        outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage)  

    Next  
End Sub  

런타임Run Time

또한 비동기 출력을 사용하는 구성 요소에서는 런타임에 다른 유형의 구성 요소와는 다른 일련의 메서드를 실행합니다.Components with asynchronous outputs also execute a different sequence of methods at run time than other types of components. 먼저 이러한 구성 요소는 PrimeOutputProcessInput 메서드에 대한 호출을 모두 받는 유일한 구성 요소입니다.First, they are the only components that receive a call to both the PrimeOutput and the ProcessInput methods. 또한 비동기 출력을 사용하는 구성 요소에서는 처리를 시작하기 전에 들어오는 모든 행에 액세스해야 하므로 모든 행을 읽기 전까지 입력 행을 내부에 캐시해야 합니다.Components with asynchronous outputs also require access to all the incoming rows before they can start processing; therefore, they must cache the input rows internally until all rows have been read. 마지막으로 비동기 출력을 사용하는 구성 요소는 다른 구성 요소와 달리 입력 버퍼와 출력 버퍼를 모두 받습니다.Finally, unlike other components, components with asynchronous outputs receive both an input buffer and an output buffer.

버퍼 이해Understanding the Buffers

구성 요소에서는 ProcessInput이 실행될 때 입력 버퍼를 받습니다.The input buffer is received by the component during ProcessInput. 이 버퍼는 업스트림 구성 요소에서 버퍼에 추가한 행을 포함합니다.This buffer contains the rows added to the buffer by upstream components. 또한 업스트림 구성 요소의 출력에서 제공되었지만 비동기 구성 요소의 입력 컬렉션에 추가되지는 않은 열과 구성 요소의 입력 열도 포함합니다.The buffer also contains the columns of the component's input, in addition to the columns that were provided in the output of an upstream component but were not added to the asynchronous component's input collection.

PrimeOutput에서 구성 요소에 제공되는 출력 버퍼는 처음에는 행을 포함하지 않습니다.The output buffer, which is provided to the component in PrimeOutput, does not initially contain rows. 구성 요소는 이 버퍼에 행을 추가하고, 버퍼가 가득 차면 다운스트림 구성 요소에 버퍼를 제공합니다.The component adds rows to this buffer and provides the buffer to downstream components when it is full. 출력 버퍼는 다른 다운스트림 구성 요소에서 자체 출력에 추가한 모든 열과 구성 요소의 출력 열 컬렉션에 정의된 열을 포함합니다.The output buffer contains the columns defined in the component's output column collection, in addition to any columns that other downstream components have added to their outputs.

이와 달리 동기 출력을 사용하는 구성 요소에서는 단일 공유 버퍼를 받습니다.This is different behavior from that of components with synchronous outputs, which receive a single shared buffer. 동기 출력을 사용하는 구성 요소의 공유 버퍼는 업스트림 및 다운스트림 구성 요소의 출력에 추가된 열과 구성 요소의 입력 및 출력 열을 모두 포함합니다.The shared buffer of a component with synchronous outputs contains both the input and output columns of the component, in addition to columns added to the outputs of upstream and downstream components.

행 처리Processing Rows

입력 행 캐시Caching Input Rows

비동기 출력을 사용하는 구성 요소를 작성할 때 출력 버퍼에 행을 추가하는 세 가지 방법 중 하나를 선택할 수 있습니다.When you write a component with asynchronous outputs, you have three options for adding rows to the output buffer. 즉, 입력 행을 받을 때 행을 추가하거나, 구성 요소가 업스트림 구성 요소에서 모든 행을 받을 때까지 행을 캐시하거나, 구성 요소에 적절한 시기에 행을 추가할 수 있습니다.You can add them as input rows are received, you can cache them until the component has received all the rows from the upstream component, or you can add them when it is appropriate to do so for the component. 구성 요소의 요구 사항에 따라 다른 방법을 선택합니다.The method that you choose depends on the requirements of the component. 예를 들어 정렬 구성 요소의 경우에는 업스트림 행을 모두 받은 후에야 행을 정렬할 수 있습니다.For example, the Sort component requires that all the upstream rows be received before they can be sorted. 따라서 모든 행을 읽을 때까지 기다렸다가 출력 버퍼에 행을 추가해야 합니다.Therefore, it waits until all rows have been read before adding rows to the output buffer.

입력 버퍼에서 받은 행은 구성 요소에서 처리할 준비가 될 때까지 내부에 캐시되어야 합니다.The rows that are received in the input buffer must be cached internally by the component until it is ready to process them. 들어오는 버퍼 행은 데이터 테이블, 다차원 배열 또는 그 밖의 내부 구조에 캐시할 수 있습니다.The incoming buffer rows can be cached in a data table, a multidimensional array, or any other internal structure.

출력 행 추가Adding Output Rows

행을 받을 때 출력 버퍼에 행을 추가하든 모든 행을 받은 후에 출력 버퍼에 행을 추가하든 항상 출력 버퍼에서 AddRow 메서드를 호출해야 합니다.Whether you add rows to the output buffer as they are received or after receiving all of the rows, you do so by calling the AddRow method on the output buffer. 행을 추가한 후에는 새 행에서 각 열의 값을 설정합니다.After you have added the row, you set the values of each column in the new row.

출력 버퍼에는 구성 요소의 출력 열 컬렉션에 있는 것보다 많은 열이 있는 경우가 종종 있으므로 열 값을 설정하려면 먼저 버퍼에서 적절한 열의 인덱스를 찾아야 합니다.Because there are sometimes more columns in the output buffer than in the output column collection of the component, you must locate the index of the appropriate column in the buffer before you can set its value. FindColumnByLineageID 속성의 BufferManager 메서드는 버퍼 행에서 지정된 계보 ID를 갖는 열 인덱스를 반환하며, 이 인덱스는 버퍼 열에 값을 할당하는 데 사용됩니다.The FindColumnByLineageID method of the BufferManager property returns the index of the column in the buffer row with the specified lineage ID, which is then used to assign the value to the buffer column.

PreExecute 메서드는 PrimeOutput 메서드나 ProcessInput 메서드보다 먼저 호출되므로 BufferManager 속성을 사용할 수 있으면서 입력 및 출력 버퍼에서 열 인덱스를 찾을 수 있는 첫 번째 메서드는 이 메서드입니다.The PreExecute method, which is called before the PrimeOutput method or the ProcessInput method, is the first method where the BufferManager property is available, and the first opportunity to locate the indexes of the columns in the input and output buffers.

예제Sample

다음 예제에서는 비동기 출력을 사용하며 행을 받을 때 출력 버퍼에 행을 추가하는 간단한 변환 구성 요소를 보여 줍니다.The following sample shows a simple transformation component with asynchronous outputs that adds rows to the output buffer as they are received. 이 예제는 이 항목에 설명된 메서드 및 기능의 일부를 보여 줍니다.This sample does not demonstrate all the methods and functionality discussed in this topic. 여기에서는 비동기 출력을 사용하는 모든 사용자 지정 변환 구성 요소에서 재정의해야 하는 중요한 메서드를 보여 주지만 디자인 타임 유효성 검사를 위한 코드는 포함하지 않습니다.It demonstrates the important methods that every custom transformation component with asynchronous outputs must override, but does not contain code for design-time validation. 또한 ProcessInput의 코드에서는 출력 열 컬렉션에 입력 열 컬렉션의 각 열에 대한 하나씩의 열이 있는 것으로 가정합니다.Also, the code in ProcessInput assumes that the output column collection has one column for each column in the input column collection.

using System;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime.Wrapper;  

namespace Microsoft.Samples.SqlServer.Dts  
{  
   [DtsPipelineComponent(DisplayName = "AsynchronousOutput")]  
   public class AsynchronousOutput : PipelineComponent  
   {  
      PipelineBuffer outputBuffer;  
      int[] inputColumnBufferIndexes;  
      int[] outputColumnBufferIndexes;  

      public override void ProvideComponentProperties()  
      {  
         // Let the base class add the input and output objects.  
         base.ProvideComponentProperties();  

         // Name the input and output, and make the  
         // output asynchronous.  
         ComponentMetaData.InputCollection[0].Name = "Input";  
         ComponentMetaData.OutputCollection[0].Name = "AsyncOutput";  
         ComponentMetaData.OutputCollection[0].SynchronousInputID = 0;  
      }  
      public override void PreExecute()  
      {  
         IDTSInput100 input = ComponentMetaData.InputCollection[0];  
         IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  

         inputColumnBufferIndexes = new int[input.InputColumnCollection.Count];  
         outputColumnBufferIndexes = new int[output.OutputColumnCollection.Count];  

         for (int col = 0; col < input.InputColumnCollection.Count; col++)  
            inputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[col].LineageID);  

         for (int col = 0; col < output.OutputColumnCollection.Count; col++)  
            outputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[col].LineageID);  

      }  

      public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)  
      {  
         if (buffers.Length != 0)  
            outputBuffer = buffers[0];  
      }  
      public override void ProcessInput(int inputID, PipelineBuffer buffer)  
      {  
            // Advance the buffer to the next row.  
            while (buffer.NextRow())  
            {  
               // Add a row to the output buffer.  
               outputBuffer.AddRow();  
               for (int x = 0; x < inputColumnBufferIndexes.Length; x++)  
               {  
                  // Copy the data from the input buffer column to the output buffer column.  
                  outputBuffer[outputColumnBufferIndexes[x]] = buffer[inputColumnBufferIndexes[x]];  
               }  
            }  
         if (buffer.EndOfRowset)  
         {  
            // EndOfRowset on the input buffer is true.  
            // Set EndOfRowset on the output buffer.  
            outputBuffer.SetEndOfRowset();  
         }  
      }  
   }  
}  
Imports System  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper  

Namespace Microsoft.Samples.SqlServer.Dts  

    <DtsPipelineComponent(DisplayName:="AsynchronousOutput")> _  
    Public Class AsynchronousOutput  

        Inherits PipelineComponent  

        Private outputBuffer As PipelineBuffer  
        Private inputColumnBufferIndexes As Integer()  
        Private outputColumnBufferIndexes As Integer()  

        Public Overrides Sub ProvideComponentProperties()  

            ' Let the base class add the input and output objects.  
            Me.ProvideComponentProperties()  

            ' Name the input and output, and make the  
            ' output asynchronous.  
            ComponentMetaData.InputCollection(0).Name = "Input"  
            ComponentMetaData.OutputCollection(0).Name = "AsyncOutput"  
            ComponentMetaData.OutputCollection(0).SynchronousInputID = 0  
        End Sub  

        Public Overrides Sub PreExecute()  

            Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
            Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  

            ReDim inputColumnBufferIndexes(input.InputColumnCollection.Count)  
            ReDim outputColumnBufferIndexes(output.OutputColumnCollection.Count)  

            For col As Integer = 0 To input.InputColumnCollection.Count  
                inputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(col).LineageID)  
            Next  

            For col As Integer = 0 To output.OutputColumnCollection.Count  
                outputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(col).LineageID)  
            Next  

        End Sub  
        Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())  

            If buffers.Length <> 0 Then  
                outputBuffer = buffers(0)  
            End If  

        End Sub  

        Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  

                ' Advance the buffer to the next row.  
                While (buffer.NextRow())  

                    ' Add a row to the output buffer.  
                    outputBuffer.AddRow()  
                    For x As Integer = 0 To inputColumnBufferIndexes.Length  

                        ' Copy the data from the input buffer column to the output buffer column.  
                        outputBuffer(outputColumnBufferIndexes(x)) = buffer(inputColumnBufferIndexes(x))  

                    Next  
                End While  

            If buffer.EndOfRowset = True Then  
                ' EndOfRowset on the input buffer is true.  
                ' Set the end of row set on the output buffer.  
                outputBuffer.SetEndOfRowset()  
            End If  
        End Sub  
    End Class  
End Namespace  

참고 항목See Also

동기 출력을 사용 하는 사용자 지정 변환 구성 요소 개발 Developing a Custom Transformation Component with Synchronous Outputs
동기 및 비동기 변환 이해 Understanding Synchronous and Asynchronous Transformations
스크립트 구성 요소를 사용 하 여 비동기 변환 만들기Creating an Asynchronous Transformation with the Script Component