Développement de composants de flux de données avec plusieurs entréesDeveloping Data Flow Components with Multiple Inputs

Un composant de flux de données avec plusieurs entrées peut consommer excessivement de la mémoire si ses entrées multiples produisent des données à des taux irréguliers.A data flow component with multiple inputs may consume excessive memory if its multiple inputs produce data at uneven rates. Lorsque vous développez un composant de flux de données personnalisé qui prend en charge plusieurs entrées, vous pouvez gérer cette sollicitation de la mémoire en utilisant les membres suivants dans l’espace de noms Microsoft.SqlServer.Dts.Pipeline :When you develop a custom data flow component that supports two or more inputs, you can manage this memory pressure by using the following members in the Microsoft.SqlServer.Dts.Pipeline namespace:

  • La propriété SupportsBackPressure de la classe DtsPipelineComponentAttribute.The SupportsBackPressure property of the DtsPipelineComponentAttribute class. Définissez la valeur de cette propriété sur true si vous souhaitez implémenter le code dont votre composant de flux de données personnalisé a besoin pour gérer des flux de données irréguliers.Set the value of this property to true if you want to implement the code that is necessary for your custom data flow component to manage data flowing at uneven rates.

  • La méthode IsInputReady de la classe PipelineComponent.The IsInputReady method of the PipelineComponent class. Vous devez fournir une implémentation de cette méthode si vous définissez la propriété SupportsBackPressure sur true.You must provide an implementation of this method if you set the SupportsBackPressure property to true. Si vous ne fournissez pas d'implémentation, le moteur de flux de données lève une exception au moment de l'exécution.If you do not provide an implementation, the data flow engine raises an exception at run time.

  • La méthode GetDependentInputs de la classe PipelineComponent.The GetDependentInputs method of the PipelineComponent class. Vous devez également fournir une implémentation de cette méthode si vous définissez la propriété SupportsBackPressure sur true et que votre composant personnalisé prend en charge plusieurs entrées.You must also provide an implementation of this method if you set the SupportsBackPressure property to true and your custom component supports more than two inputs. Si vous ne fournissez pas d'implémentation, le moteur de flux de données lève une exception au moment de l'exécution si l'utilisateur joint plusieurs entrées.If you do not provide an implementation, the data flow engine raises an exception at run time if the user attaches more than two inputs.

    Ces deux membres vous permettent de développer une solution pour la sollicitation de la mémoire qui est semblable à la solution que Microsoft a développée pour les transformations de fusion et de jointure de la fusion.Together, these members enable you to develop a solution for memory pressure that is similar to the solution that Microsoft developed for the Merge and Merge Join transformations.

Définition de la propriété SupportsBackPressureSetting the SupportsBackPressure Property

La première étape pour l’implémentation d’une meilleure gestion de la mémoire pour un composant de flux de données personnalisé qui prend en charge plusieurs entrées consiste à définir la valeur de la propriété SupportsBackPressure sur true dans la classe DtsPipelineComponentAttribute.The first step in implementing better memory management for a custom data flow component that supports multiple inputs is to set the value of the SupportsBackPressure property to true in the DtsPipelineComponentAttribute. Lorsque la valeur de SupportsBackPressure est true, le moteur de flux de données appelle la méthode IsInputReady et, lorsqu’il y a plus de deux entrées, il appelle également la méthode GetDependentInputs au moment de l’exécution.When the value of SupportsBackPressure is true, the data flow engine calls the IsInputReady method and, when there are more than two inputs, also calls the GetDependentInputs method at run time.

ExempleExample

Dans l’exemple suivant, l’implémentation de DtsPipelineComponentAttribute définit la valeur de SupportsBackPressure sur true.In the following example, the implementation of the DtsPipelineComponentAttribute sets the value of SupportsBackPressure to true.

[DtsPipelineComponent(ComponentType = ComponentType.Transform,  
        DisplayName = "Shuffler",  
        Description = "Shuffle the rows from input.",  
        SupportsBackPressure = true,  
        LocalizationType = typeof(Localized),  
        IconResource = "Microsoft.Samples.SqlServer.Dts.MIBPComponent.ico")  
]  
public class Shuffler : Microsoft.SqlServer.Dts.Pipeline.PipelineComponent  
        {  
          ...  
        }  

Implémentation de la méthode IsInputReadyImplementing the IsInputReady Method

Lorsque vous définissez la valeur de la propriété SupportsBackPressure sur true dans l’objet DtsPipelineComponentAttribute, vous devez également fournir une implémentation pour la méthode IsInputReady de la classe PipelineComponent.When you set the value of the SupportsBackPressure property to true in the DtsPipelineComponentAttribute object, you must also provide an implementation for the IsInputReady method of the PipelineComponent class.

Note

Votre implémentation de la méthode IsInputReady ne doit pas appeler les implémentations dans la classe de base.Your implementation of the IsInputReady method should not call the implementations in the base class. L’implémentation par défaut de cette méthode dans la classe de base lève simplement une exception NotImplementedException.The default implementation of this method in the base class simply raises a NotImplementedException.

Lorsque vous implémentez cette méthode, vous définissez l’état d’un élément dans le tableau booléen canProcess pour chacune des entrées du composant.When you implement this method, you set the status of an element in the Boolean canProcess array for each of the component's inputs. (Les entrées sont identifiées par leur valeur d’ID dans le tableau inputIDs.) Lorsque vous définissez la valeur d’un élément du tableau canProcess sur true pour une entrée, le moteur de flux de données appelle la méthode ProcessInput du composant et fournit davantage de données pour l’entrée spécifiée.(The inputs are identified by their ID values in the inputIDs array.) When you set the value of an element in the canProcess array to true for an input, the data flow engine calls the component's ProcessInput method and provides more data for the specified input.

Si plus de données en amont sont disponibles, la valeur de l’élément de tableau canProcess pour au moins une entrée doit toujours être true, ou traiter les arrêts.While more upstream data is available, the value of the canProcess array element for at least one input must always be true, or processing stops.

Le moteur de flux de données appelle la méthode IsInputReady avant d'envoyer chaque mémoire tampon de données pour déterminer quelles entrées attendent de recevoir d'autres données.The data flow engine calls the IsInputReady method before sending each buffer of data to determine which inputs are waiting to receive more data. Lorsque la valeur de retour indique qu'une entrée est bloquée, le moteur de flux de données met temporairement en cache les mémoires tampon supplémentaires de données pour cette entrée au lieu de les envoyer au composant.When the return value indicates that an input is blocked, the data flow engine temporarily caches additional buffers of data for that input instead of sending them to the component.

Note

Vous n'appelez pas les méthodes IsInputReady ou GetDependentInputs dans votre propre code.You do not call the IsInputReady or GetDependentInputs methods in your own code. Le moteur de flux de données appelle ces méthodes, et les autres méthodes de la classe PipelineComponent que vous remplacez, lorsque le moteur de flux de données exécute votre composant.The data flow engine calls these methods, and the other methods of the PipelineComponent class that you override, when the data flow engine runs your component.

ExempleExample

Dans l'exemple suivant, l'implémentation de la méthode IsInputReady indique qu'une entrée attend pour recevoir plus de données lorsque les conditions suivantes sont remplies :In the following example, the implementation of the IsInputReady method indicates that an input is waiting to receive more data when the following conditions are true:

  • Plus de données en amont sont disponibles pour l'entrée (!inputEOR).More upstream data is available for the input (!inputEOR).

  • Le composant n'a pas actuellement de données disponibles pour traiter l'entrée dans les mémoires tampon que le composant a déjà reçues (inputBuffers[inputIndex].CurrentRow() == null).The component does not currently have data available to process for the input in the buffers that the component has already received (inputBuffers[inputIndex].CurrentRow() == null).

    Si une entrée attend de recevoir plus de données, le composant de flux de données indique cela en définissant sur true la valeur de l’élément dans le tableau canProcess qui correspond à cette entrée.If an input is waiting to receive more data, the data flow component indicates this by setting to true the value of the element in the canProcess array that corresponds to that input.

    Inversement, lorsque le composant a encore des données disponibles à traiter pour l'entrée, l'exemple interrompt le traitement de l'entrée.Conversely, when the component still has data available to process for the input, the example suspends the processing of the input. L’exemple fait cela en définissant sur false la valeur de l’élément dans le tableau canProcess qui correspond à cette entrée.The example does this by setting to false the value of the element in the canProcess array that corresponds to that input.

public override void IsInputReady(int[] inputIDs, ref bool[] canProcess)  
{  
    for (int i = 0; i < inputIDs.Length; i++)  
    {  
        int inputIndex = ComponentMetaData.InputCollection.GetObjectIndexByID(inputIDs[i]);  

        canProcess[i] = (inputBuffers[inputIndex].CurrentRow() == null)  
            && !inputEOR[inputIndex];  
    }  
}  

L'exemple précédent utilise le tableau inputEOR booléen pour indiquer si davantage de données en amont sont disponibles pour chaque entrée.The preceding example uses the Boolean inputEOR array to indicate whether more upstream data is available for each input. EOR dans le nom du tableau représente « fin d'ensemble de lignes » et fait référence à la propriété EndOfRowset de mémoires tampon de flux de données.EOR in the name of the array represents "end of rowset" and refers to the EndOfRowset property of data flow buffers. Dans une partie de l'exemple qui n'est pas inclus ici, la méthode ProcessInput vérifie la valeur de la propriété EndOfRowset pour chaque mémoire tampon des données qu'elle reçoit.In a portion of the example that is not included here, the ProcessInput method checks the value of the EndOfRowset property for each buffer of data that it receives. Lorsqu’une valeur true indique que plus aucune donnée en amont n’est disponible pour une entrée, l’exemple définit la valeur de l’élément de tableau inputEOR pour cette entrée sur true.When a value of true indicates that there is no more upstream data available for an input, the example sets the value of the inputEOR array element for that input to true. Cette exemple de la méthode IsInputReady définit la valeur de l’élément correspondant dans le tableau canProcess sur false pour une entrée lorsque la valeur de l’élément de tableau inputEOR indique qu’aucune donnée en amont n’est disponible pour l’entrée.This example of the IsInputReady method sets the value of the corresponding element in the canProcess array to false for an input when the value of the inputEOR array element indicates that there is no more upstream data available for the input.

Implémentation de la méthode GetDependentInputsImplementing the GetDependentInputs Method

Lorsque votre composant de flux de données personnalisé prend en charge plus de deux entrées, vous devez également fournir une implémentation pour la méthode GetDependentInputs de la classe PipelineComponent.When your custom data flow component supports more than two inputs, you must also provide an implementation for the GetDependentInputs method of the PipelineComponent class.

Note

Votre implémentation de la méthode GetDependentInputs ne doit pas appeler les implémentations dans la classe de base.Your implementation of the GetDependentInputs method should not call the implementations in the base class. L’implémentation par défaut de cette méthode dans la classe de base lève simplement une exception NotImplementedException.The default implementation of this method in the base class simply raises a NotImplementedException.

Le moteur de flux de données appelle seulement la méthode GetDependentInputs lorsque l'utilisateur joint plus de deux entrées au composant.The data flow engine only calls the GetDependentInputs method when the user attaches more than two inputs to the component. Lorsqu’un composant a uniquement deux entrées, et que la méthode IsInputReady indique qu’une entrée est bloquée (canProcess = false), le moteur de flux de données sait que l’autre entrée attend de recevoir plus de données.When a component has only two inputs, and the IsInputReady method indicates that one input is blocked (canProcess = false), the data flow engine knows that the other input is waiting to receive more data. Toutefois, lorsqu'il y a plus de deux entrées, et que la méthode IsInputReady indique qu'une entrée est bloquée, le code supplémentaire dans le GetDependentInputs identifie quelles entrées attendent de recevoir plus de données.However, when there are more than two inputs, and the IsInputReady method indicates that one input is blocked, the additional code in the GetDependentInputs identifies which inputs are waiting to receive more data.

Note

Vous n'appelez pas les méthodes IsInputReady ou GetDependentInputs dans votre propre code.You do not call the IsInputReady or GetDependentInputs methods in your own code. Le moteur de flux de données appelle ces méthodes, et les autres méthodes de la classe PipelineComponent que vous remplacez, lorsque le moteur de flux de données exécute votre composant.The data flow engine calls these methods, and the other methods of the PipelineComponent class that you override, when the data flow engine runs your component.

ExempleExample

Pour une entrée spécifique bloquée, l'implémentation suivante de la méthode GetDependentInputs retourne une collection des entrées qui attendent de recevoir plus de données et par conséquent bloque l'entrée spécifiée.For a specific input that is blocked, the following implementation of the GetDependentInputs method returns a collection of the inputs that are waiting to receive more data, and are therefore blocking the specified input. Le composant identifie les entrées bloquantes en recherchant d'autres entrées que celle qui est bloquée qui n'ont pas actuellement de données disponibles pour traiter dans les mémoires tampon que le composant a déjà reçues (inputBuffers[i].CurrentRow() == null).The component identifies the blocking inputs by checking for inputs other than the blocked input that do not currently have data available to process in the buffers that the component has already received (inputBuffers[i].CurrentRow() == null). La méthode GetDependentInputs retourne ensuite la collection d'entrées bloquantes comme une collection d'ID d'entrée.The GetDependentInputs method then returns the collection of blocking inputs as a collection of input IDs.

public override Collection<int> GetDependentInputs(int blockedInputID)  
{  
    Collection<int> currentDependencies = new Collection<int>();  
    for (int i = 0; i < ComponentMetaData.InputCollection.Count; i++)  
    {  
        if (ComponentMetaData.InputCollection[i].ID != blockedInputID  
            && inputBuffers[i].CurrentRow() == null)  
        {  
            currentDependencies.Add(ComponentMetaData.InputCollection[i].ID);  
        }  
    }  

    return currentDependencies;  
}