Share via


資料流程元件的執行階段方法

適用於:SQL Server Azure Data Factory 中的 SSIS Integration Runtime

在執行階段,資料流程工作會檢查元件的順序、準備執行計畫,以及管理執行工作計畫的工作者執行緒集區。 此工作會從來源載入資料列、透過轉換處理它們,然後將它們儲存到目的地。

方法執行的順序

在資料流程元件的執行期間,會呼叫 PipelineComponent 基底類別中的方法子集。 方法與呼叫方法的順序永遠都是相同的,但 PrimeOutputProcessInput 方法例外。 會根據元件的 IDTSInput100IDTSOutput100 物件之存在與否及組態來呼叫這兩個方法。

下列清單按照元件執行期間呼叫方法的順序來顯示它們。 請注意,在呼叫 PrimeOutput 時,永遠會在 ProcessInput 的前面呼叫它。

PrimeOutput 方法

當元件至少有一個輸出透過 PrimeOutput 物件附加至下游元件,而且輸出的 IDTSPath100 屬性是零時,便會呼叫 SynchronousInputID 方法。 PrimeOutput 方法是針對來源元件以及具有非同步輸出的轉換所呼叫。 與以下所述的 ProcessInput 方法不同的是,每個需要 PrimeOutput 方法的元件都只會呼叫它一次。

ProcessInput 方法

如果元件至少有一個輸入是由 ProcessInput 物件附加至上游元件,則會呼叫 IDTSPath100 方法。 ProcessInput 方法是針對目的地元件以及具有同步輸出的轉換所呼叫。 會重複呼叫 ProcessInput,直到不再從上游元件處理資料列為止。

處理輸入和輸出

在執行階段,資料流程元件會執行下列工作:

  • 來源元件會加入資料列。

  • 具有同步輸出的轉換元件會接收來源元件所提供的資料列。

  • 具有非同步輸出的轉換元件會接收資料列並加入資料列。

  • 目的地元件會接收資料列,然後將它們載入目的地中。

在執行期間,資料流程工作會配置 PipelineBuffer 物件,這些物件包含定義在元件序列之輸出資料行集合中的所有資料行。 例如,如果資料流程序列中四個元件的每一個都會將一個輸出資料行加入至其輸出資料行集合,則提供給每個元件的緩衝區都會包含四個資料行,其中每個元件的每個輸出資料行都有一個資料行。 因為這個行為的緣故,元件有時會接收包含未使用之資料行的緩衝區。

因為元件所接收的緩衝區可能包含此元件將不會使用的資料行,所以您必須在資料流程工作提供給此元件的緩衝區中,找到要在元件的輸入與輸出資料行集合中使用的資料行。 您可以使用 FindColumnByLineageID 屬性的 BufferManager 方法來完成這項動作。 基於效能的考量,這個工作通常會在 PreExecute 方法期間執行,而不是在 PrimeOutputProcessInput 中執行。

PreExecutePrimeOutput 方法之前,會先呼叫 ProcessInput,而且是在 BufferManager 變成可供元件使用之後,元件第一次有機會執行這個工作。 在這個方法期間,元件應該在緩衝區中找到其資料行,並在內部儲存這項資訊,以便在 PrimeOutputProcessInput 方法中可以使用這些資料行。

下列程式碼範例示範在 PreExecute 期間,具有同步輸出的轉換元件如何在緩衝區中找到其輸入資料行。

private int []bufferColumnIndex;  
public override void PreExecute()  
{  
    IDTSInput100 input = ComponentMetaData.InputCollection[0];  
    bufferColumnIndex = new int[input.InputColumnCollection.Count];  
  
    for( int x=0; x < input.InputColumnCollection.Count; x++)  
    {  
        IDTSInputColumn100 column = input.InputColumnCollection[x];  
        bufferColumnIndex[x] = BufferManager.FindColumnByLineageID( input.Buffer, column.LineageID);  
    }  
}  
Dim bufferColumnIndex As Integer()  
  
    Public Overrides Sub PreExecute()  
  
        Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
  
        ReDim bufferColumnIndex(input.InputColumnCollection.Count)  
  
        For x As Integer = 0 To input.InputColumnCollection.Count  
  
            Dim column As IDTSInputColumn100 = input.InputColumnCollection(x)  
            bufferColumnIndex(x) = BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID)  
  
        Next  
  
    End Sub  

加入資料列

透過將資料列加入至 PipelineBuffer 物件,元件會提供資料列給下游元件。 資料流程工作會提供輸出緩衝區的陣列,每個連接至下游元件的 IDTSOutput100 物件都有一個輸出緩衝區,以做為 PrimeOutput 方法的參數。 具有非同步輸出的來源元件與轉換元件會將資料列加入至緩衝區,並且會在完成加入資料列時呼叫 SetEndOfRowset 方法。 資料流程工作會管理提供給元件的輸出緩衝區,而當緩衝區變成完整時,便會自動將緩衝區中的資料列移到下一個元件。 每個元件都會呼叫 PrimeOutput 方法一次,不像 ProcessInput 方法一樣重複呼叫。

下列程式碼範例示範在 PrimeOutput 方法期間,元件會如何將資料列加入至其輸出緩衝區,然後呼叫 SetEndOfRowset 方法。

public override void PrimeOutput( int outputs, int []outputIDs,PipelineBuffer []buffers)  
{  
    for( int x=0; x < outputs; x++ )  
    {  
        IDTSOutput100 output = ComponentMetaData.OutputCollection.GetObjectByID( outputIDs[x]);  
        PipelineBuffer buffer = buffers[x];  
  
        // TODO: Add rows to the output buffer.  
    }  
    foreach( PipelineBuffer buffer in buffers )  
    {  
        /// Notify the data flow task that no more rows are coming.  
        buffer.SetEndOfRowset();  
    }  
}  
public overrides sub PrimeOutput( outputs as Integer , outputIDs() as Integer ,buffers() as PipelineBuffer buffers)  
  
    For x As Integer = 0 To outputs.MaxValue  
  
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.GetObjectByID(outputIDs(x))  
        Dim buffer As PipelineBuffer = buffers(x)  
  
        ' TODO: Add rows to the output buffer.  
  
    Next  
  
    For Each buffer As PipelineBuffer In buffers  
  
        ' Notify the data flow task that no more rows are coming.  
        buffer.SetEndOfRowset()  
  
    Next  
  
End Sub  

如需開發將資料列新增至輸出緩衝區之元件的詳細資訊,請參閱開發自訂來源元件開發具有非同步輸出的自訂轉換元件

接收資料列

元件會從 PipelineBuffer 物件中的上游元件接收資料列。 資料流程工作會提供 PipelineBuffer 物件,該物件包含上游元件以 ProcessInput 方法的參數形式加入至資料流程的資料列。 這個輸入緩衝區可用以檢查和修改緩衝區中的資料列與資料行,但是無法用以加入或是移除資料列。 ProcessInput 方法會重複呼叫,直到沒有其他可用的緩衝區為止。 上次呼叫它時,EndOfRowset 屬性是 true。 您可以使用 NextRow 方法,使緩衝區前進到下一個資料列,以逐一查看緩衝區中的資料列集合。 當緩衝區位於集合中的最後一個資料列時,此方法會傳回 false。 除非您在已處理最後一個資料列之後還必須執行其他動作,否則不必檢查 EndOfRowset 屬性。

下列文字將示範使用 NextRow 方法和 EndOfRowset 屬性的正確模式:

while (buffer.NextRow())

{

// Do something with each row.

}

if (buffer.EndOfRowset)

{

// Optionally, do something after all rows have been processed.

}

下列程式碼範例示範元件如何在 ProcessInput 方法期間,處理輸入緩衝區中的資料列。

public override void ProcessInput( int inputID, PipelineBuffer buffer )  
{  
    {  
        IDTSInput100 input = ComponentMetaData.InputCollection.GetObjectByID(inputID);  
        while( buffer.NextRow())  
        {  
            // TODO: Examine the columns in the current row.  
        }  
}  
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  
  
        Dim input As IDTSInput100 = ComponentMetaData.InputCollection.GetObjectByID(inputID)  
        While buffer.NextRow() = True  
  
            ' TODO: Examine the columns in the current row.  
        End While  
  
End Sub  

如需開發接收輸入緩衝區中資料列之元件的詳細資訊,請參閱開發自訂目的地元件開發具有同步輸出的自訂轉換元件

另請參閱

資料流程元件的設計階段方法