Développement d'un composant de destination personnalisé

S’applique à :SQL Server SSIS Integration Runtime dans Azure Data Factory

Microsoft SQL Server Integration Services offre aux développeurs la capacité d’écrire des composants de destination personnalisés qui peuvent se connecter à n’importe quelle source de données personnalisée et y stocker des données. Les composants de destination personnalisés sont utiles lorsque vous devez vous connecter à des sources de données qui ne sont pas accessibles via l'un des composants sources existants inclus dans Integration Services.

Les composants de destination possèdent une ou plusieurs entrées et zéro sortie. Au moment de la conception, ils créent et configurent des connexions et lisent les métadonnées des colonnes à partir de la source de données externe. Pendant l'exécution, ils se connectent à leur source de données externe et y ajoutent des lignes provenant de composants situés en amont du flux de données. Si la source de données externe existe avant l'exécution du composant, le composant de destination doit également s'assurer que les types de données des colonnes que le composant reçoit correspondent aux types de données des colonnes au niveau de la source de données externe.

Cette section explique en détail comment développer des composants de destination et fournit des exemples de code pour clarifier des concepts importants. Pour une vue d’ensemble du développement de composants de flux de données, consultez Développement d’un composant de flux de données personnalisé.

Moment de la conception

Pour implémenter les fonctionnalités au moment de la conception d'un composant de destination, vous devez spécifier une connexion à une source de données externe et confirmer que le composant a été correctement configuré. Par définition, un composant de destination possède une entrée et éventuellement une sortie d'erreur.

Création du composant

Les composants de destination se connectent à des sources de données externes à l'aide d'objets ConnectionManager définis dans un package. Le composant de destination indique qu’il a besoin d’un gestionnaire de connexions au concepteur SSIS et aux utilisateurs du composant, en ajoutant un élément à la collection RuntimeConnectionCollection de ComponentMetaData. Cette collection remplit deux rôles : d'abord, elle publie le besoin d'un gestionnaire de connexions auprès du concepteur SSIS ; puis, lorsque l'utilisateur a sélectionné ou créé un gestionnaire de connexions, elle contient une référence au gestionnaire de connexions dans le package utilisé par le composant. Lorsqu’un objet IDTSRuntimeConnection100 est ajouté à la collection, l’Éditeur avancé affiche l’onglet Propriétés de connexion pour inviter l’utilisateur à sélectionner ou créer une connexion dans le package utilisée par le composant.

L'exemple de code suivant affiche une implémentation de ProvideComponentProperties qui ajoute une entrée, puis ajoute un objet IDTSRuntimeConnection100 à RuntimeConnectionCollection.

using System;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
using Microsoft.SqlServer.Dts.Runtime;  
  
namespace Microsoft.Samples.SqlServer.Dts  
{  
    [DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]  
    public class DestinationComponent : PipelineComponent   
    {  
        public override void ProvideComponentProperties()  
        {  
            // Reset the component.  
            base.RemoveAllInputsOutputsAndCustomProperties();  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll();  
  
            IDTSInput100 input = ComponentMetaData.InputCollection.New();  
            input.Name = "Input";  
  
            IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();  
            connection.Name = "ADO.net";  
        }  
    }  
}  
Imports System  
Imports System.Data  
Imports System.Data.SqlClient  
Imports Microsoft.SqlServer.Dts.Pipeline  
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper  
Imports Microsoft.SqlServer.Dts.Runtime  
  
Namespace Microsoft.Samples.SqlServer.Dts  
  
    <DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _  
    Public Class DestinationComponent  
        Inherits PipelineComponent  
  
        Public Overrides Sub ProvideComponentProperties()  
  
            ' Reset the component.  
            Me.RemoveAllInputsOutputsAndCustomProperties()  
            ComponentMetaData.RuntimeConnectionCollection.RemoveAll()  
  
            Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()  
            input.Name = "Input"  
  
            Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()  
            connection.Name = "ADO.net"  
  
        End Sub  
    End Class  
End Namespace  

Connexion à une source de données externe

Une fois la connexion ajoutée à la collection RuntimeConnectionCollection, vous pouvez remplacer la méthode AcquireConnections pour établir une connexion à la source de données externe. Cette méthode est appelée au moment de la conception et au moment de l'exécution. Le composant doit établir une connexion au gestionnaire de connexions spécifié par la connexion au moment de l'exécution, et par la suite, à la source de données externe. Une fois la connexion établie, le composant doit la mettre en cache en interne et la libérer lorsque ReleaseConnections est appelé. Les développeurs substituent cette méthode et libèrent la connexion établie par le composant pendant l'exécution de la méthode AcquireConnections. Les méthodes ReleaseConnections et AcquireConnections sont appelées au moment de la conception et au moment de l'exécution.

L'exemple de code suivant montre un composant qui établit une connexion ADO.NET dans la méthode AcquireConnections, puis ferme la connexion dans ReleaseConnections.

using Microsoft.SqlServer.Dts.Runtime.Wrapper;  
  
private SqlConnection sqlConnection;  
  
public override void AcquireConnections(object transaction)  
{  
    if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)  
    {  
        ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);  
        ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;  
  
        if (cmado == null)  
            throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");  
  
        sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;  
        sqlConnection.Open();  
    }  
}  
  
public override void ReleaseConnections()  
{  
    if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)  
        sqlConnection.Close();  
}  
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper  
  
Private sqlConnection As SqlConnection  
  
Public Overrides Sub AcquireConnections(ByVal transaction As Object)  
  
    If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then  
  
        Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)  
        Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)  
  
        If IsNothing(cmado) Then  
            Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")  
        End If  
  
        sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)  
        sqlConnection.Open()  
  
    End If  
End Sub  
  
Public Overrides Sub ReleaseConnections()  
  
    If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then  
        sqlConnection.Close()  
    End If  
  
End Sub  

Validation du composant

Les développeurs de composants de destination doivent procéder à une opération de validation décrite dans Validation de composants. De plus, ils doivent vérifier que les propriétés de type de données des colonnes définies dans la collection de colonnes d'entrée du composant correspondent aux colonnes au niveau de la source de données externe. Il est parfois impossible ou déconseillé de vérifier les colonnes d'entrée par rapport à la source de données externe, notamment lorsque le composant ou le concepteur SSIS est déconnecté, ou lorsque les allers-retours au serveur ne sont pas acceptables. Dans ce cas, il est toujours possible de valider les colonnes dans la collection de colonnes d'entrée à l'aide de la propriété ExternalMetadataColumnCollection de l'objet d'entrée.

Cette collection existe sur les objets d'entrée et de sortie et doit être remplie par le développeur de composants à partir des colonnes au niveau de la source de données externe. Cette collection permet de valider les colonnes d’entrée lorsque le concepteur SSIS est hors connexion, lorsque le composant est déconnecté ou lorsque la propriété ValidateExternalMetadata a la valeur false.

L'exemple de code suivant ajoute une colonne de métadonnées externe basée sur une colonne d'entrée existante.

private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)  
{  
    // Set the properties of the external metadata column.  
    IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();  
    externalColumn.Name = inputColumn.Name;  
    externalColumn.Precision = inputColumn.Precision;  
    externalColumn.Length = inputColumn.Length;  
    externalColumn.DataType = inputColumn.DataType;  
    externalColumn.Scale = inputColumn.Scale;  
  
    // Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID;  
}  
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)  
  
    ' Set the properties of the external metadata column.  
    Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()  
    externalColumn.Name = inputColumn.Name  
    externalColumn.Precision = inputColumn.Precision  
    externalColumn.Length = inputColumn.Length  
    externalColumn.DataType = inputColumn.DataType  
    externalColumn.Scale = inputColumn.Scale  
  
    ' Map the external column to the input column.  
    inputColumn.ExternalMetadataColumnID = externalColumn.ID  
  
End Sub  

Moment de l'exécution

Pendant l'exécution, le composant de destination reçoit un appel à la méthode ProcessInput chaque fois qu'un PipelineBuffer saturé est disponible à partir du composant en amont. Cette méthode est appelée à plusieurs reprises jusqu’à ce qu’il n’y ait plus de mémoire tampon disponible et que la propriété EndOfRowset prenne la valeur true. Pendant l'exécution de cette méthode, les composants de destination lisent les colonnes et les lignes dans la mémoire tampon et les ajoutent à la source de données externe.

Localisation des colonnes dans le tampon

La mémoire tampon d'entrée d'un composant contient toutes les colonnes définies dans les collections de colonnes de sortie des composants situés en amont du composant dans le flux de données. Par exemple, si un composant source fournit trois colonnes dans sa sortie, et que le composant suivant ajoute une colonne de sortie supplémentaire, la mémoire tampon fournie au composant de destination contient quatre colonnes, même si le composant de destination n'écrit que deux colonnes.

L'ordre des colonnes dans la mémoire tampon d'entrée n'est pas défini par l'index de la colonne dans la collection de colonnes d'entrée du composant de destination. Des colonnes peuvent être placées de manière fiable dans une ligne de la mémoire tampon à l'aide de la méthode FindColumnByLineageID de BufferManager. Cette méthode recherche la colonne avec l'ID de lignage spécifié dans la mémoire tampon spécifiée et retourne son emplacement dans la ligne. Les index des colonnes d'entrée sont généralement localisés lors de l'exécution de la méthode PreExecute et mis en cache afin d'être utilisés ultérieurement par le développeur lors de l'exécution de la méthode ProcessInput.

L'exemple de code suivant recherche l'emplacement des colonnes d'entrée dans la mémoire tampon pendant l'exécution de la méthode PreExecute et les stocke dans un tableau. Le tableau est ensuite utilisé au cours de l'exécution de la méthode ProcessInput pour lire les valeurs des colonnes dans la mémoire tampon.

int[] cols;  
  
public override void PreExecute()  
{  
    IDTSInput100 input = ComponentMetaData.InputCollection[0];  
  
    cols = new int[input.InputColumnCollection.Count];  
  
    for (int x = 0; x < input.InputColumnCollection.Count; x++)  
    {  
        cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);  
    }  
}  
Private cols As Integer()  
  
Public Overrides Sub PreExecute()  
  
    Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)  
  
    ReDim cols(input.InputColumnCollection.Count)  
  
    For x As Integer = 0 To input.InputColumnCollection.Count  
  
        cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)  
    Next x  
  
End Sub  

Traitement de lignes

Une fois que les colonnes d'entrée ont été localisées dans la mémoire tampon, elles peuvent être lues et écrites dans la source de données externe.

Pendant que le composant de destination écrit des lignes dans la source de données externe, vous pouvez mettre à jour les compteurs de performance « Lignes lues » ou « Octets BLOB lus » en appelant la méthode IncrementPipelinePerfCounter. Pour plus d’informations, consultez Compteurs de performances.

L'exemple suivant présente un composant qui lit des lignes à partir de la mémoire tampon fournie dans ProcessInput. La recherche des index des colonnes dans la mémoire tampon a été effectuée au cours de l'exécution de la méthode PreExecute dans l'exemple de code précédent.

public override void ProcessInput(int inputID, PipelineBuffer buffer)  
{  
        while (buffer.NextRow())  
        {  
            foreach (int col in cols)  
            {  
                if (!buffer.IsNull(col))  
                {  
                    //  TODO: Read the column data.  
                }  
            }  
        }  
}  
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)  
  
        While (buffer.NextRow())  
  
            For Each col As Integer In cols  
  
                If buffer.IsNull(col) = False Then  
  
                    '  TODO: Read the column data.  
                End If  
  
            Next col  
        End While  
End Sub  

Exemple

L'exemple suivant montre un composant de destination simple qui utilise un gestionnaire de connexions de fichiers pour enregistrer les données binaires du flux de données dans des fichiers. Cet exemple ne contient pas toutes les méthodes et fonctionnalités présentées dans cette rubrique. Il illustre les méthodes importantes que chaque composant de destination personnalisé doit substituer, mais ne contient pas de code pour la validation au moment de la conception.

using System;  
using System.IO;  
using Microsoft.SqlServer.Dts.Pipeline;  
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;  
  
namespace BlobDst  
{  
  [DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]  
  public class BlobDst : PipelineComponent  
  {  
    string m_DestDir;  
    int m_FileNameColumnIndex = -1;  
    int m_BlobColumnIndex = -1;  
  
    public override void ProvideComponentProperties()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection.New();  
      input.Name = "BLOB Extractor Destination Input";  
      input.HasSideEffects = true;  
  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();  
      conn.Name = "FileConnection";  
    }  
  
    public override void AcquireConnections(object transaction)  
    {  
      IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];  
      m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);  
  
      if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')  
        m_DestDir += "\\";  
    }  
  
    public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)  
    {  
      IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);  
      IDTSCustomProperty100 custProp;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsFileName";  
      custProp.Value = (object)false;  
  
      custProp = inputColumn.CustomPropertyCollection.New();  
      custProp.Name = "IsBLOB";  
      custProp.Value = (object)false;  
  
      return inputColumn;  
    }  
  
    public override void PreExecute()  
    {  
      IDTSInput100 input = ComponentMetaData.InputCollection[0];  
      IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;  
      IDTSCustomProperty100 custProp;  
  
      foreach (IDTSInputColumn100 column in inputColumns)  
      {  
        custProp = column.CustomPropertyCollection["IsFileName"];  
        if ((bool)custProp.Value == true)  
        {  
          m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
  
        custProp = column.CustomPropertyCollection["IsBLOB"];  
        if ((bool)custProp.Value == true)  
        {  
          m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);  
        }  
      }  
    }  
  
    public override void ProcessInput(int inputID, PipelineBuffer buffer)  
    {  
      while (buffer.NextRow())  
      {  
        string strFileName = buffer.GetString(m_FileNameColumnIndex);  
        int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);  
        byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);  
  
        strFileName = TranslateFileName(strFileName);  
  
        // Make sure directory exists before creating file.  
        FileInfo fi = new FileInfo(strFileName);  
        if (!fi.Directory.Exists)  
          fi.Directory.Create();  
  
        // Write the data to the file.  
        FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);  
        fs.Write(blobData, 0, blobLength);  
        fs.Close();  
      }  
    }  
  
    private string TranslateFileName(string fileName)  
    {  
      if (fileName.Length > 2 && fileName[1] == ':')  
        return m_DestDir + fileName.Substring(3, fileName.Length - 3);  
      else  
        return m_DestDir + fileName;  
    }  
  }  
}  
Imports System   
Imports System.IO   
Imports Microsoft.SqlServer.Dts.Pipeline   
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper   
Namespace BlobDst   
  
 <DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _   
 Public Class BlobDst   
 Inherits PipelineComponent   
   Private m_DestDir As String   
   Private m_FileNameColumnIndex As Integer = -1   
   Private m_BlobColumnIndex As Integer = -1   
  
   Public  Overrides Sub ProvideComponentProperties()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New   
     input.Name = "BLOB Extractor Destination Input"   
     input.HasSideEffects = True   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New   
     conn.Name = "FileConnection"   
   End Sub   
  
   Public  Overrides Sub AcquireConnections(ByVal transaction As Object)   
     Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)   
     m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)   
     If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then   
       m_DestDir += "\"   
     End If   
   End Sub   
  
   Public  Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100   
     Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)   
     Dim custProp As IDTSCustomProperty100   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsFileName"   
     custProp.Value = CType(False, Object)   
     custProp = inputColumn.CustomPropertyCollection.New   
     custProp.Name = "IsBLOB"   
     custProp.Value = CType(False, Object)   
     Return inputColumn   
   End Function   
  
   Public  Overrides Sub PreExecute()   
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)   
     Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection   
     Dim custProp As IDTSCustomProperty100   
     For Each column As IDTSInputColumn100 In inputColumns   
       custProp = column.CustomPropertyCollection("IsFileName")   
       If CType(custProp.Value, Boolean) = True Then   
         m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
       custProp = column.CustomPropertyCollection("IsBLOB")   
       If CType(custProp.Value, Boolean) = True Then   
         m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)   
       End If   
     Next   
   End Sub   
  
   Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)   
     While buffer.NextRow   
       Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)   
       Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)   
       Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)   
       strFileName = TranslateFileName(strFileName)   
       Dim fi As FileInfo = New FileInfo(strFileName)   
       ' Make sure directory exists before creating file.  
       If Not fi.Directory.Exists Then   
         fi.Directory.Create   
       End If   
       ' Write the data to the file.  
       Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)   
       fs.Write(blobData, 0, blobLength)   
       fs.Close   
     End While   
   End Sub   
  
   Private Function TranslateFileName(ByVal fileName As String) As String   
     If fileName.Length > 2 AndAlso fileName(1) = ":"C Then   
       Return m_DestDir + fileName.Substring(3, fileName.Length - 3)   
     Else   
       Return m_DestDir + fileName   
     End If   
   End Function   
 End Class   
End Namespace  

Voir aussi

Développement d’un composant source personnalisé
Création d’une destination à l’aide du composant Script