Поделиться через


Разработка пользовательского компонента преобразования с асинхронными выходами

Если преобразование не может вывести строки до того, как компонент получит все входные строки, или может создавать несколько выходных строк на одну входную, нужно использовать компонент с асинхронным выходом. Например, преобразование «Статистическая обработка» не может вычислить суммарное значение для всех строк, пока не считает все строки. Напротив, компонент с синхронными выходами можно использовать в случае, если каждая строка преобразуется, проходя через этот компонент. Можно изменять данные каждой строки на месте или добавлять к строке один или несколько новых столбцов, каждый из которых содержит значение для каждого входного ряда. Дополнительные сведения о различиях между синхронными и асинхронными выходами см. в разделе Основные сведения о синхронных и асинхронных преобразованиях.

Компоненты преобразования с асинхронными выходами уникальны, поскольку выступают одновременно в роли компонента-источника и целевого компонента. Такой компонент получает строки от вышестоящих компонентов и добавляет строки, получаемые нижестоящими компонентами. Никакой другой компонент потока данных не выполняет обе операции сразу.

Столбцы, полученные от вышестоящих компонентов и доступные компоненту с синхронными выходами, автоматически становятся доступны компонентам, нижестоящим по отношению к данному компоненту. Поэтому компоненту с синхронными выходами не нужно определять выходные столбцы, чтобы обеспечить следующему компоненту столбцы и строки. С другой стороны, компоненту с асинхронными выходами нужно определять выходные столбцы и предоставлять строки нижестоящим компонентам. Следовательно, компоненту с асинхронными выходами приходится выполнять больше задач, как во время разработки, так и во время выполнения, а разработчику этого компонента приходится писать больше кода.

Службы SQL Server Integration Services содержат несколько преобразований с асинхронными выходами. Например, преобразование «Сортировка» требует получения всех строк до того, как их можно будет сортировать, и использует для этого асинхронные выходы. После получения всех строк преобразование сортирует их и добавляет к выходу.

В данном разделе приводится подробное описание разработок преобразований с асинхронными выходами. Полный образец компонента преобразования с асинхронными выходами см. в образцах служб Integration Services в разделе Codeplex. Дополнительные сведения о разработке компонентов-источников см. в разделе Разработка пользовательского компонента источника.

Время разработки

Создание компонента

Свойство SynchronousInputID объекта IDTSOutput100 определяет, является ли его выход синхронным или асинхронным. Для создания асинхронного выхода добавьте выход к компоненту и задайте для свойства SynchronousInputID значение 0. Значение этого свойства определяет также, выделяет ли задача потока данных объекты PipelineBuffer для входа и выхода данного компонента или же выделяется один буфер, используемый обоими объектами сразу.

Далее приводится образец кода, создающий асинхронный выход в реализации метода ProvideComponentProperties.

using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;


namespace Microsoft.Samples.SqlServer.Dts
{
    [DtsPipelineComponent(DisplayName = "AsyncComponent",ComponentType = ComponentType.Transform)]
    public class AsyncComponent : PipelineComponent
    {
        public override void ProvideComponentProperties()
        {
            // Call the base class, which adds a synchronous input
            // and output.
            base.ProvideComponentProperties();

            // Make the output asynchronous.
            IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
            output.SynchronousInputID = 0;
        }
    }
}
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime

<DtsPipelineComponent(DisplayName:="AsyncComponent", ComponentType:=ComponentType.Transform)> _
Public Class AsyncComponent
    Inherits PipelineComponent

    Public Overrides Sub ProvideComponentProperties()

        ' Call the base class, which adds a synchronous input
        ' and output.
        Me.ProvideComponentProperties()

        ' Make the output asynchronous.
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
        output.SynchronousInputID = 0

    End Sub


End Class

Создание и настройка выходных столбцов

Как уже упоминалось, асинхронный компонент добавляет столбцы к своей коллекции выходных столбцов, чтобы предоставить столбцы нижележащим компонентам. Существует несколько методов времени разработки, которые можно использовать в зависимости от потребностей компонента. Например, для передачи всех столбцов из вышестоящих компонентов нижестоящим нужно переопределить метод OnInputPathAttached для добавления столбцов, поскольку это первый метод, в котором входные столбцы становятся доступны компоненту.

Если компонент создает выходные столбцы на основе столбцов, выбранных для входа, нужно переопределить метод SetUsageType, чтобы он выбирал выходные столбцы и указывал, как они должны использоваться.

Если компонент с асинхронными выходами создает выходные столбцы на основе столбцов, полученных от вышестоящих компонентов, и все доступные вышестоящие столбцы изменяются, компоненту следует изменять свою собственную коллекцию выходных столбцов. Эти изменения должны обнаруживаться компонентом во время работы метода Validate и исправляться при вызове метода ReinitializeMetaData.

ПримечаниеПримечание

Если выходной столбец удалить из коллекции выходных столбцов, это негативно повлияет на использующие его нижележащие компоненты потока данных. Выходной столбец нужно исправить, а не удалять и не создавать заново, чтобы не нарушить работы нижестоящих компонентов. Например, если тип данных столбца изменился, нужно обновить тип данных.

В следующем примере кода показан компонент, добавляющий столбец в коллекцию выходных столбцов, по одному на каждый столбец, получаемый от вышестоящего компонента.

public override void OnInputPathAttached(int inputID)
{
   IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);
   IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
   IDTSVirtualInput100 vInput = input.GetVirtualInput();

   foreach (IDTSVirtualInputColumn100 vCol in vInput.VirtualInputColumnCollection)
   {
      IDTSOutputColumn100 outCol = output.OutputColumnCollection.New();
      outCol.Name = vCol.Name;
      outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage);
   }
}
Public Overrides Sub OnInputPathAttached(ByVal inputID As Integer)

    Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)
    Dim vInput As IDTSVirtualInput100 = input.GetVirtualInput()

    For Each vCol As IDTSVirtualInputColumn100 In vInput.VirtualInputColumnCollection

        Dim outCol As IDTSOutputColumn100 = output.OutputColumnCollection.New()
        outCol.Name = vCol.Name
        outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage)

    Next
End Sub

Время выполнения

Компоненты с асинхронным выходом также исполняют методы в последовательности, отличной от других типов компонентов, во время выполнения. Во-первых, это единственные компоненты, в которых вызываются оба метода: PrimeOutput и ProcessInput. Компоненты с асинхронным выходом также требуют доступа ко всем входящим строкам, прежде чем смогут начать обработку; поэтому они должны сохранять входные строки в своем внутреннем кэше, пока все строки не будут прочитаны. И наконец, в отличие от других компонентов, компоненты с асинхронным выходом получают и входной, и выходной буфер.

Основные сведения о буферах

Компонент получает входной буфер при вызове метода ProcessInput. Этот буфер содержит строки, добавленные туда вышестоящими компонентами. Буфер также содержит столбцы входа компонента, помимо столбцов, предоставленных в качестве выхода вышестоящего компонента, но не добавленных к входной коллекции асинхронного компонента.

Выходной буфер, предоставленный компоненту в методе PrimeOutput, вначале не содержит строк. Компонент добавляет строки к этому буферу и, когда буфер заполнится, предоставляет его нижестоящим компонентам. Выходной буфер содержит столбцы, определенные в коллекции выходных столбцов компонента, помимо столбцов, добавленных к выводам других нижестоящих компонентов.

Не так ведут себя компоненты с синхронными выходами — они получают единый буфер с общим доступом. Общий буфер компонента с синхронными выходами содержит и входные, и выходные столбцы компонента, помимо столбцов, добавленных к выходам вышестоящих и нижестоящих компонентов.

Обработка строк

Кэширование входных строк

При создании компонента с асинхронными выходами есть три способа добавления строк к выходному буферу. Можно добавлять их по мере получения входных строк, кэшировать их, пока компонент не получил все строки от вышестоящего компонента, или добавлять их в какой-то другой момент, подходящий для данного конкретного компонента. Выбор метода зависит от потребностей данного компонента. Например, компонент сортировки не может провести сортировку, пока не получит все строки от вышестоящих компонентов. Поэтому он ждет, пока все строки будут прочитаны, прежде чем добавить их в выходной буфер.

Строки, полученные во входном буфере, должны сохраняться во внутреннем кэше компонента, пока тот не будет готов их обработать. Получаемые из буфера строки могут кэшироваться в таблице данных, многомерном массиве или иной внутренней структуре. Пример компонента, сохраняющего входные строки буфера во внутреннем кэше, пока все они не будут прочитаны, см. в разделе Readme_Remove Duplicates Component Sample.

Добавление выходных строк

Независимо от того, в какой момент строки добавляются к выходному буферу — по мере получения или после получения всех строк, — добавление производится вызовом метода AddRow выходного буфера. Добавив строку, нужно задать значения всех столбцов новой строки.

Поскольку в выходном буфере иногда бывает больше столбцов, чем в коллекции выходных столбцов компонента, до присвоения значения очередному столбцу буфера нужно узнать индекс этого столбца. Метод FindColumnByLineageID свойства BufferManager возвращает индекс столбца буферной строки с соответствующим идентификатором журнала преобразований. Этот индекс затем используется для присвоения значения столбцу в буфере.

Метод PreExecute, вызываемый до методов PrimeOutput и ProcessInput, — первый метод, в котором доступно свойство BufferManager, и предоставляет первую возможность определить индексы столбцов во входном и выходном буферах.

Образец

В следующем образце демонстрируется простой компонент преобразования с асинхронными выходами, добавляющий строки к выходному буферу по мере их получения. В этом образце показаны не все методы и возможности, описанные в данном разделе. Он демонстрирует важные методы, с помощью которых должен переопределяться любой создаваемый пользователем компонент преобразования с асинхронными выходами, но не содержит кода для проверки во время разработки. Кроме того, код метода ProcessInput предполагает, что коллекция выходных столбцов содержит по одному столбцу на каждый столбец коллекции входных столбцов. Полный образец компонента преобразования с асинхронными выходами см. в разделе Readme_Remove Duplicates Component Sample.

using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace Microsoft.Samples.SqlServer.Dts
{
   [DtsPipelineComponent(DisplayName = "AsynchronousOutput")]
   public class AsynchronousOutput : PipelineComponent
   {
      PipelineBuffer outputBuffer;
      int[] inputColumnBufferIndexes;
      int[] outputColumnBufferIndexes;

      public override void ProvideComponentProperties()
      {
         // Let the base class add the input and output objects.
         base.ProvideComponentProperties();

         // Name the input and output, and make the
         // output asynchronous.
         ComponentMetaData.InputCollection[0].Name = "Input";
         ComponentMetaData.OutputCollection[0].Name = "AsyncOutput";
         ComponentMetaData.OutputCollection[0].SynchronousInputID = 0;
      }
      public override void PreExecute()
      {
         IDTSInput100 input = ComponentMetaData.InputCollection[0];
         IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

         inputColumnBufferIndexes = new int[input.InputColumnCollection.Count];
         outputColumnBufferIndexes = new int[output.OutputColumnCollection.Count];

         for (int col = 0; col < input.InputColumnCollection.Count; col++)
            inputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[col].LineageID);

         for (int col = 0; col < output.OutputColumnCollection.Count; col++)
            outputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[col].LineageID);

      }

      public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)
      {
         if (buffers.Length != 0)
            outputBuffer = buffers[0];
      }
      public override void ProcessInput(int inputID, PipelineBuffer buffer)
      {
            // Advance the buffer to the next row.
            while (buffer.NextRow())
            {
               // Add a row to the output buffer.
               outputBuffer.AddRow();
               for (int x = 0; x < inputColumnBufferIndexes.Length; x++)
               {
                  // Copy the data from the input buffer column to the output buffer column.
                  outputBuffer[outputColumnBufferIndexes[x]] = buffer[inputColumnBufferIndexes[x]];
               }
            }
         if (buffer.EndOfRowset)
         {
            // EndOfRowset on the input buffer is true.
            // Set EndOfRowset on the output buffer.
            outputBuffer.SetEndOfRowset();
         }
      }
   }
}
Imports System
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper

Namespace Microsoft.Samples.SqlServer.Dts

    <DtsPipelineComponent(DisplayName:="AsynchronousOutput")> _
    Public Class AsynchronousOutput


        Inherits PipelineComponent

        Private outputBuffer As PipelineBuffer
        Private inputColumnBufferIndexes As Integer()
        Private outputColumnBufferIndexes As Integer()

        Public Overrides Sub ProvideComponentProperties()

            ' Let the base class add the input and output objects.
            Me.ProvideComponentProperties()

            ' Name the input and output, and make the
            ' output asynchronous.
            ComponentMetaData.InputCollection(0).Name = "Input"
            ComponentMetaData.OutputCollection(0).Name = "AsyncOutput"
            ComponentMetaData.OutputCollection(0).SynchronousInputID = 0
        End Sub

        Public Overrides Sub PreExecute()

            Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
            Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)

            ReDim inputColumnBufferIndexes(input.InputColumnCollection.Count)
            ReDim outputColumnBufferIndexes(output.OutputColumnCollection.Count)

            For col As Integer = 0 To input.InputColumnCollection.Count
                inputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(col).LineageID)
            Next

            For col As Integer = 0 To output.OutputColumnCollection.Count
                outputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(col).LineageID)
            Next

        End Sub
        Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())

            If buffers.Length <> 0 Then
                outputBuffer = buffers(0)
            End If

        End Sub

        Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)

                ' Advance the buffer to the next row.
                While (buffer.NextRow())

                    ' Add a row to the output buffer.
                    outputBuffer.AddRow()
                    For x As Integer = 0 To inputColumnBufferIndexes.Length

                        ' Copy the data from the input buffer column to the output buffer column.
                        outputBuffer(outputColumnBufferIndexes(x)) = buffer(inputColumnBufferIndexes(x))

                    Next
                End While

            If buffer.EndOfRowset = True Then
                ' EndOfRowset on the input buffer is true.
                ' Set the end of row set on the output buffer.
                outputBuffer.SetEndOfRowset()
            End If
        End Sub
    End Class
End Namespace
Значок служб Integration Services (маленький) Будьте в курсе новых возможностей служб Integration Services

Чтобы загружать новейшую документацию, статьи, образцы и видеоматериалы от корпорации Майкрософт, а также лучшие решения от участников сообщества, посетите страницу служб Integration Services на сайтах MSDN или TechNet:

Чтобы получать автоматические уведомления об этих обновлениях, подпишитесь на RSS-каналы, предлагаемые на этой странице.