Sviluppo di un componente di trasformazione personalizzato con output asincroniDeveloping a Custom Transformation Component with Asynchronous Outputs

Utilizzare un componente con output asincroni quando una trasformazione non può inviare righe all'output prima che il componente non abbia ricevuto tutte le righe di input oppure quando la trasformazione non produce esattamente una riga di output per ogni riga ricevuta come input.You use a component with asynchronous outputs when a transform cannot output rows until the component has received all its input rows, or when the transformation does not produce exactly one output row for each row received as input. La trasformazione Aggregazione, ad esempio, non può calcolare una somma tra righe prima di aver letto tutte le righe.The Aggregate transformation, for example, cannot calculate a sum across rows until it has read all the rows. Viceversa, è possibile utilizzare un componente con output sincroni quando si modifica ogni riga di dati passata.In contrast, you can use a component with synchronous outputs any time when you modify each row of data as it passes through. È possibile modificare i dati per ogni riga sul posto oppure creare una o più nuove colonne aggiuntive, ognuna delle quali con un valore per ogni riga di input.You can modify the data for each row in place, or you can create one or more new columns, each of which has a value for every one of the input rows. Per ulteriori informazioni sulla differenza tra componenti sincroni e asincroni, vedere comprensione sincrona e asincrona trasformazioni.For more information about the difference between synchronous and asynchronous components, see Understanding Synchronous and Asynchronous Transformations.

I componenti di trasformazione con output asincroni sono univoci, perché agiscono contemporaneamente da componenti di destinazione e di origine.Transformation components with asynchronous outputs are unique because they act as both destination and source components. Questo tipo di componente riceve righe dai componenti a monte e aggiunge righe che vengono utilizzate dai componenti a valle.This kind of component receives rows from upstream components, and adds rows that are consumed by downstream components. Nessun altro componente del flusso di dati esegue entrambe queste operazioni.No other data flow component performs both of these operations.

Le colonne dei componenti a monte che sono disponibili per un componente con output sincroni sono automaticamente disponibili per i componenti a valle del componente.The columns from upstream components that are available to a component with synchronous outputs are automatically available to components downstream from the component. Pertanto, un componente con output sincroni non deve necessariamente definire colonne di output per fornire colonne e righe al componente successivo.Therefore, a component with synchronous outputs does not have to define any output columns to provide columns and rows to the next component. I componenti con output asincroni, al contrario, devono definire colonne di output e fornire righe ai componenti a valle.Components with asynchronous outputs, on the other hand, must define output columns and provide rows to downstream components. Pertanto, un componente con output asincroni deve eseguire più attività durante la fase di progettazione e di esecuzione, mentre lo sviluppatore deve implementare più codice.Therefore a component with asynchronous outputs has more tasks to perform during both design and execution time, and the component developer has more code to implement.

SQL ServerSQL Server Integration ServicesIntegration Services sono incluse diverse trasformazioni con output asincroni. Integration ServicesIntegration Services contains several transformations with asynchronous outputs. La trasformazione Ordinamento, ad esempio, richiede che tutte le righe siano presenti al fine di poter essere ordinate e ottiene tale risultato tramite output asincroni.For example, the Sort transformation requires all its rows before it can sort them, and achieves this by using asynchronous outputs. Dopo aver ricevuto tutte le righe, la trasformazione le ordina e le aggiunge all'output.After it has received all its rows, it sorts them and adds them to its output.

In questa sezione viene illustrato in dettaglio come sviluppare trasformazioni con output asincroni.This section explains in detail how to develop transformations with asynchronous outputs. Per ulteriori informazioni sullo sviluppo di componenti di origine, vedere lo sviluppo di un componente di origine personalizzato.For more information about source component development, see Developing a Custom Source Component.

Fase di progettazioneDesign Time

Creazione del componenteCreating the Component

La proprietà SynchronousInputID sull'oggetto IDTSOutput100 identifica se un output è sincrono o asincrono.The SynchronousInputID property on the IDTSOutput100 object identifies whether an output is synchronous or asynchronous. Per creare un output asincrono, aggiungere l'output al componente e impostare SynchronousInputID su zero.To create an asynchronous output, add the output to the component and set the SynchronousInputID to zero. L'impostazione di questa proprietà determina anche se l'attività Flusso di dati alloca oggetti PipelineBuffer sia per l'input che per l'output del componente o se viene allocato un singolo buffer condiviso tra i due oggetti.Setting this property also determines whether the data flow task allocates PipelineBuffer objects for both the input and output of the component, or whether a single buffer is allocated and shared between the two objects.

Nel codice di esempio seguente è illustrato un componente che crea un output asincrono nell'implementazione di ProvideComponentProperties.The following sample code shows a component that creates an asynchronous output in its ProvideComponentProperties implementation.

using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime;  

namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "AsyncComponent",ComponentType = ComponentType.Transform)]  
    public class AsyncComponent : PipelineComponent  
    {  
        public override void ProvideComponentProperties()  
        {  
            // Call the base class, which adds a synchronous input  
            // and output.  
            base.ProvideComponentProperties();  

            // Make the output asynchronous.  
            IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
            output.SynchronousInputID = 0;  
        }  
    }  
}  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime  

<DtsPipelineComponent(DisplayName:="AsyncComponent", ComponentType:=ComponentType.Transform)> _  
Public Class AsyncComponent  
    Inherits PipelineComponent  

    Public Overrides Sub ProvideComponentProperties()  

        ' Call the base class, which adds a synchronous input  
        ' and output.  
        Me.ProvideComponentProperties()  

        ' Make the output asynchronous.  
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
        output.SynchronousInputID = 0  

    End Sub  

End Class  

Creazione e configurazione di colonne di outputCreating and Configuring Output Columns

Come accennato in precedenza, un componente asincrono aggiunge colonne alla propria raccolta di colonne di output per fornire colonne ai componenti a valle.As mentioned earlier, an asynchronous component adds columns to its output column collection to provide columns to downstream components. Sono disponibili diversi metodi della fase di progettazione tra cui scegliere, a seconda dei requisiti del componente.There are several design-time methods to choose from, depending on the needs of the component. Se ad esempio si desidera passare tutte le colonne dai componenti a monte ai componenti a valle, è necessario eseguire l'override del metodo OnInputPathAttached per aggiungere le colonne, perché si tratta del primo metodo in cui le colonne di input sono disponibili per il componente.For example, if you want to pass all the columns from the upstream components to the downstream components, you would override the OnInputPathAttached method to add the columns, because this is the first method in which the input columns are available to the component.

Se il componente crea colonne di output in base alle colonne selezionate per il proprio input, eseguire l'override del metodo SetUsageType per selezionare le colonne di output e indicare come verranno utilizzate.If the component creates output columns based on the columns selected for its input, override the SetUsageType method to select the output columns and to indicate how they will be used.

Se un componente con output asincroni crea colonne di output in base alle colonne dei componenti a monte e le colonne disponibili a valle cambiano, il componente deve aggiornare la propria raccolta di colonne di output.If a component with asynchronous outputs creates output columns based on the columns from upstream components, and the available upstream columns change, the component should update its output column collection. Queste modifiche devono essere rilevate dal componente durante Validate e corrette durante ReinitializeMetaData.These changes should be detected by the component during Validate, and fixed during ReinitializeMetaData.

Nota

Quando una colonna di output viene rimossa dalla raccolta di colonne di output, i componenti a valle del flusso di dati che fanno riferimento a tale colonna possono essere influenzati negativamente.When an output column is removed from the output column collection, downstream components in the data flow that reference the column are adversely affected. È necessario correggere la colonna di output senza rimuoverla e ricrearla per evitare l'interruzione dei componenti a valle.The output column must be repaired without removing and recreating the column to prevent breaking the downstream components. Se ad esempio il tipo di dati della colonna è stato modificato, è necessario aggiornarlo.For example, if the data type of the column has changed, you must update the data type.

Nell'esempio di codice seguente è illustrato un componente che aggiunge una colonna di output alla propria raccolta di colonne di output per ogni colonna disponibile del componente a monte.The following code example shows a component that adds an output column to its output column collection for each column available from the upstream component.

public override void OnInputPathAttached(int inputID)  
{  
   IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);  
   IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  
   IDTSVirtualInput100 vInput = input.GetVirtualInput();  

   foreach (IDTSVirtualInputColumn100 vCol in vInput.VirtualInputColumnCollection)  
   {  
      IDTSOutputColumn100 outCol = output.OutputColumnCollection.New();  
      outCol.Name = vCol.Name;  
      outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage);  
   }  
}  
Public Overrides Sub OnInputPathAttached(ByVal inputID As Integer)  

    Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)  
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  
    Dim vInput As IDTSVirtualInput100 = input.GetVirtualInput()  

    For Each vCol As IDTSVirtualInputColumn100 In vInput.VirtualInputColumnCollection  

        Dim outCol As IDTSOutputColumn100 = output.OutputColumnCollection.New()  
        outCol.Name = vCol.Name  
        outCol.SetDataTypeProperties(vCol.DataType, vCol.Length, vCol.Precision, vCol.Scale, vCol.CodePage)  

    Next  
End Sub  

Fase di esecuzioneRun Time

I componenti con output asincroni eseguono anche una sequenza di metodi della fase di esecuzione diversa rispetto agli altri tipi di componenti.Components with asynchronous outputs also execute a different sequence of methods at run time than other types of components. Innanzitutto, si tratta degli unici componenti che ricevono una chiamata ai metodi PrimeOutput e ProcessInput.First, they are the only components that receive a call to both the PrimeOutput and the ProcessInput methods. I componenti con output asincroni richiedono inoltre l'accesso a tutte le righe in ingresso prima che possano iniziare l'elaborazione. Pertanto, devono memorizzare le righe di input nella cache interna finché non sono state lette tutte le righe.Components with asynchronous outputs also require access to all the incoming rows before they can start processing; therefore, they must cache the input rows internally until all rows have been read. Infine, a differenza di altri componenti, i componenti con output asincroni ricevono sia un buffer di input che un buffer di output.Finally, unlike other components, components with asynchronous outputs receive both an input buffer and an output buffer.

Informazioni sui bufferUnderstanding the Buffers

Il componente riceve il buffer di input durante ProcessInput.The input buffer is received by the component during ProcessInput. Questo buffer contiene le righe aggiunte dai componenti a monte.This buffer contains the rows added to the buffer by upstream components. Contiene inoltre le colonne dell'input del componente, oltre alle colonne fornite nell'output di un componente a monte ma che non sono state aggiunte alla raccolta di input del componente asincrono.The buffer also contains the columns of the component's input, in addition to the columns that were provided in the output of an upstream component but were not added to the asynchronous component's input collection.

Il buffer di output, fornito al componente in PrimeOutput, inizialmente non contiene righe.The output buffer, which is provided to the component in PrimeOutput, does not initially contain rows. Il componente aggiunge righe a questo buffer e lo fornisce ai componenti a valle quando è pieno.The component adds rows to this buffer and provides the buffer to downstream components when it is full. Il buffer di output contiene le colonne definite nella raccolta di colonne di output del componente, oltre alle eventuali colonne aggiunte da altri componenti a valle ai propri output.The output buffer contains the columns defined in the component's output column collection, in addition to any columns that other downstream components have added to their outputs.

Questo comportamento è diverso da quello dei componenti con output sincroni, che ricevono un singolo buffer condiviso.This is different behavior from that of components with synchronous outputs, which receive a single shared buffer. Il buffer condiviso di un componente con output sincroni contiene sia le colonne di input che di output del componente, oltre alle colonne aggiunte agli output dei componenti a monte e a valle.The shared buffer of a component with synchronous outputs contains both the input and output columns of the component, in addition to columns added to the outputs of upstream and downstream components.

Elaborazione di righeProcessing Rows

Memorizzazione nella cache di righe di inputCaching Input Rows

Quando si scrive un componente con output asincroni, è possibile scegliere tra tre diversi modi per aggiungere righe al buffer di output.When you write a component with asynchronous outputs, you have three options for adding rows to the output buffer. È possibile aggiungerle non appena vengono ricevute righe di input, memorizzarle nella cache finché il componente non ha ricevuto tutte le righe dal componente a monte oppure aggiungerle quando è appropriato per il componente.You can add them as input rows are received, you can cache them until the component has received all the rows from the upstream component, or you can add them when it is appropriate to do so for the component. Il metodo scelto dipende dai requisiti del componente.The method that you choose depends on the requirements of the component. Ad esempio, il componente Ordinamento richiede che tutte le righe a monte vengano ricevute prima che sia possibile ordinarle.For example, the Sort component requires that all the upstream rows be received before they can be sorted. Pertanto, attende che vengano lette tutte le righe prima di aggiungere righe al buffer di output.Therefore, it waits until all rows have been read before adding rows to the output buffer.

Il componente deve memorizzare nella cache interna le righe ricevute nel buffer di input finché non è pronto a elaborarle.The rows that are received in the input buffer must be cached internally by the component until it is ready to process them. Le righe del buffer in ingresso possono essere memorizzate nella cache in una tabella di dati, una matrice multidimensionale o qualsiasi altra struttura interna.The incoming buffer rows can be cached in a data table, a multidimensional array, or any other internal structure.

Aggiunta di righe di outputAdding Output Rows

Sia che le righe vengano aggiunte al buffer di output non appena vengono ricevute o dopo la ricezione di tutte, questa operazione viene eseguita chiamando il metodo AddRow sul buffer di output.Whether you add rows to the output buffer as they are received or after receiving all of the rows, you do so by calling the AddRow method on the output buffer. Dopo aver aggiunto la nuova riga, impostare i valori di ogni colonna al suo interno.After you have added the row, you set the values of each column in the new row.

Poiché a volte il buffer di output contiene più colonne rispetto alla raccolta di colonne di output del componente, è necessario individuare l'indice della colonna appropriata nel buffer prima di impostarne il valore.Because there are sometimes more columns in the output buffer than in the output column collection of the component, you must locate the index of the appropriate column in the buffer before you can set its value. Il metodo FindColumnByLineageID della proprietà BufferManager restituisce l'indice della colonna nella riga del buffer con l'ID di derivazione specificato, che viene quindi utilizzato per assegnare il valore alla colonna di buffer.The FindColumnByLineageID method of the BufferManager property returns the index of the column in the buffer row with the specified lineage ID, which is then used to assign the value to the buffer column.

Il metodo PreExecute, chiamato prima del metodo PrimeOutput o del metodo ProcessInput, è il primo metodo in cui è disponibile la proprietà BufferManager e rappresenta la prima opportunità per individuare gli indici delle colonne dei buffer di input e di output.The PreExecute method, which is called before the PrimeOutput method or the ProcessInput method, is the first method where the BufferManager property is available, and the first opportunity to locate the indexes of the columns in the input and output buffers.

EsempioSample

Nell'esempio seguente è illustrato un componente di trasformazione semplice con output asincroni che aggiunge righe al buffer di output non appena vengono ricevute.The following sample shows a simple transformation component with asynchronous outputs that adds rows to the output buffer as they are received. Nell'esempio non vengono dimostrati tutti i metodi e le funzionalità descritti nell'argomento.This sample does not demonstrate all the methods and functionality discussed in this topic. Vengono descritti i metodi importanti di cui ogni componente di trasformazione personalizzato con output asincroni deve eseguire l'override, ma non è incluso codice per la convalida in fase di progettazione.It demonstrates the important methods that every custom transformation component with asynchronous outputs must override, but does not contain code for design-time validation. Inoltre, nel codice del metodo ProcessInput si presuppone che nella raccolta di colonne di output sia inclusa un'unica colonna per ogni colonna presente nella raccolta di colonne di input.Also, the code in ProcessInput assumes that the output column collection has one column for each column in the input column collection.

using System;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime.Wrapper;  

namespace Microsoft.Samples.SqlServer.Dts  
{  
   [DtsPipelineComponent(DisplayName = "AsynchronousOutput")]  
   public class AsynchronousOutput : PipelineComponent  
   {  
      PipelineBuffer outputBuffer;  
      int[] inputColumnBufferIndexes;  
      int[] outputColumnBufferIndexes;  

      public override void ProvideComponentProperties()  
      {  
         // Let the base class add the input and output objects.  
         base.ProvideComponentProperties();  

         // Name the input and output, and make the  
         // output asynchronous.  
         ComponentMetaData.InputCollection[0].Name = "Input";  
         ComponentMetaData.OutputCollection[0].Name = "AsyncOutput";  
         ComponentMetaData.OutputCollection[0].SynchronousInputID = 0;  
      }  
      public override void PreExecute()  
      {  
         IDTSInput100 input = ComponentMetaData.InputCollection[0];  
         IDTSOutput100 output = ComponentMetaData.OutputCollection[0];  

         inputColumnBufferIndexes = new int[input.InputColumnCollection.Count];  
         outputColumnBufferIndexes = new int[output.OutputColumnCollection.Count];  

         for (int col = 0; col < input.InputColumnCollection.Count; col++)  
            inputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[col].LineageID);  

         for (int col = 0; col < output.OutputColumnCollection.Count; col++)  
            outputColumnBufferIndexes[col] = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection[col].LineageID);  

      }  

      public override void PrimeOutput(int outputs, int[] outputIDs, PipelineBuffer[] buffers)  
      {  
         if (buffers.Length != 0)  
            outputBuffer = buffers[0];  
      }  
      public override void ProcessInput(int inputID, PipelineBuffer buffer)  
      {  
            // Advance the buffer to the next row.  
            while (buffer.NextRow())  
            {  
               // Add a row to the output buffer.  
               outputBuffer.AddRow();  
               for (int x = 0; x < inputColumnBufferIndexes.Length; x++)  
               {  
                  // Copy the data from the input buffer column to the output buffer column.  
                  outputBuffer[outputColumnBufferIndexes[x]] = buffer[inputColumnBufferIndexes[x]];  
               }  
            }  
         if (buffer.EndOfRowset)  
         {  
            // EndOfRowset on the input buffer is true.  
            // Set EndOfRowset on the output buffer.  
            outputBuffer.SetEndOfRowset();  
         }  
      }  
   }  
}  
Imports System  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper  

Namespace Microsoft.Samples.SqlServer.Dts  

    <DtsPipelineComponent(DisplayName:="AsynchronousOutput")> _  
    Public Class AsynchronousOutput  

        Inherits PipelineComponent  

        Private outputBuffer As PipelineBuffer  
        Private inputColumnBufferIndexes As Integer()  
        Private outputColumnBufferIndexes As Integer()  

        Public Overrides Sub ProvideComponentProperties()  

            ' Let the base class add the input and output objects.  
            Me.ProvideComponentProperties()  

            ' Name the input and output, and make the  
            ' output asynchronous.  
            ComponentMetaData.InputCollection(0).Name = "Input"  
            ComponentMetaData.OutputCollection(0).Name = "AsyncOutput"  
            ComponentMetaData.OutputCollection(0).SynchronousInputID = 0  
        End Sub  

        Public Overrides Sub PreExecute()  

            Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
            Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)  

            ReDim inputColumnBufferIndexes(input.InputColumnCollection.Count)  
            ReDim outputColumnBufferIndexes(output.OutputColumnCollection.Count)  

            For col As Integer = 0 To input.InputColumnCollection.Count  
                inputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(col).LineageID)  
            Next  

            For col As Integer = 0 To output.OutputColumnCollection.Count  
                outputColumnBufferIndexes(col) = BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(col).LineageID)  
            Next  

        End Sub  
        Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())  

            If buffers.Length <> 0 Then  
                outputBuffer = buffers(0)  
            End If  

        End Sub  

        Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  

                ' Advance the buffer to the next row.  
                While (buffer.NextRow())  

                    ' Add a row to the output buffer.  
                    outputBuffer.AddRow()  
                    For x As Integer = 0 To inputColumnBufferIndexes.Length  

                        ' Copy the data from the input buffer column to the output buffer column.  
                        outputBuffer(outputColumnBufferIndexes(x)) = buffer(inputColumnBufferIndexes(x))  

                    Next  
                End While  

            If buffer.EndOfRowset = True Then  
                ' EndOfRowset on the input buffer is true.  
                ' Set the end of row set on the output buffer.  
                outputBuffer.SetEndOfRowset()  
            End If  
        End Sub  
    End Class  
End Namespace  

Vedere ancheSee Also

Sviluppo di un componente di trasformazione personalizzato con output sincroni Developing a Custom Transformation Component with Synchronous Outputs
Informazioni sulle trasformazioni sincrone e asincrone Understanding Synchronous and Asynchronous Transformations
Creazione di una trasformazione asincrona con il componente ScriptCreating an Asynchronous Transformation with the Script Component