Разработка компонентов потоков данных с несколькими входами

Компонент потока данных с несколькими входами может использовать чрезмерное количество ресурсов памяти в случае неравномерного поступления данных из нескольких входов этого потока. При разработке пользовательского компонента потока данных, поддерживающего два или более входов, нагрузку на ресурсы памяти можно контролировать с помощью следующих элементов в пространстве имен Microsoft.SqlServer.Dts.Pipeline:

  • Свойство DtsPipelineComponentAttribute.SupportsBackPressure класса DtsPipelineComponentAttribute. Задайте значение этого свойства равным true, если необходимо реализовать код, который требуется пользовательскому компоненту потока данных для управления данными, поступающими с неравномерной скоростью.

  • Метод IsInputReady класса PipelineComponent. Необходимо обеспечить реализацию этого метода, если для свойства SupportsBackPressure задано значение true. Если реализация не будет предоставлена, то подсистема обработки потока данных сформирует исключение во время выполнения.

  • Метод GetDependentInputs класса PipelineComponent. Также необходимо обеспечить реализацию этого метода, если для свойства SupportsBackPressure задано значение true, а пользовательский компонент поддерживает более двух входов. Если реализация не будет предоставлена, то подсистема обработки потока данных сформирует исключение во время выполнения при подключении пользователем более двух входов.

Совместно эти элементы позволяют разработать решение управления нагрузкой на ресурсы памяти схожее с решением, разработанным корпорацией Майкрософт для преобразований «Слияние» и «Соединение слиянием».

Настройка свойства SupportsBackPressure

Первым этапом реализации усовершенствованного управления ресурсами памяти для пользовательского компонента потока данных, поддерживающего несколько входов, является задание значения свойства SupportsBackPressure равным true в DtsPipelineComponentAttribute. Если значение SupportsBackPressure равно true, подсистема обработки потока данных вызывает метод IsInputReady и, при наличии более двух входов, также вызывает GetDependentInputs во время выполнения.

Пример

В следующем примере реализация DtsPipelineComponentAttribute задает значение SupportsBackPressure равным true.

[DtsPipelineComponent(ComponentType = ComponentType.Transform,
        DisplayName = "Shuffler",
        Description = "Shuffle the rows from input.",
        SupportsBackPressure = true,
        LocalizationType = typeof(Localized),
        IconResource = "Microsoft.Samples.SqlServer.Dts.MIBPComponent.ico")
]
public class Shuffler : Microsoft.SqlServer.Dts.Pipeline.PipelineComponent
        {
          ...
        }

Реализация метода IsInputReady

При задании значения свойства SupportsBackPressure равным true в объекте DtsPipelineComponentAttribute, также необходимо обеспечить реализацию для метода IsInputReady класса PipelineComponent.

ПримечаниеПримечание

Реализация метода IsInputReady не должна вызывать реализации базового класса. Реализация этого метода по умолчанию в базовом классе просто вызывает исключение NotImplementedException.

Во время реализации этого метода нужно задать состояние элемента в массиве canProcess типа Boolean для каждого входа компонента. (Входы определяются значениями идентификаторов в массиве inputIDs). При задании значения элемента в массиве canProcess равным true для входа, подсистема обработки потока данных вызывает метод компонента ProcessInput и предоставляет дополнительные данные для указанного входа.

Если доступны дополнительные восходящие данные, значение элемента массива canProcess хотя бы для одного входа должно быть равным true, или обработка будет прервана.

Подсистема обработки потока данных вызывает метод IsInputReady до отправки всех буферов с данными для определения входов, ожидающих получения дополнительных данных. Если возвращаемое значение указывает на блокировку входа, подсистема обработки потока данных временно кэширует дополнительные буферы данных для этого входа вместо отправки их компоненту.

ПримечаниеПримечание

Не следует вызывать методы IsInputReady или GetDependentInputs в собственном коде. Подсистема обработки потока данных вызывает эти методы класса PipelineComponent, которые переопределяются при запуске подсистемой обработки потока данных компонента.

Пример

В следующем примере реализация метода IsInputReady указывает на ожидание входом приема дополнительных данных при соблюдении следующих условий:

  • Для входа доступны дополнительные данные выше по уровню обработки (!inputEOR).

  • В буферах, уже полученных компонентом (inputBuffers[inputIndex].CurrentRow() == null), сейчас отсутствуют данные, доступные для обработки для входа.

Если вход ожидает приема дополнительных данных, компонент потока данных указывает на это путем задания значения true для элемента в массиве canProcess, соответствующем этому входу.

И наоборот, если у компонента все еще имеются данные, доступные для обработки, для входа, в примере обработка входа приостанавливается. В примере это выполняется путем задания значения false для элемента в массиве canProcess, соответствующем этому входу.

public override void IsInputReady(int[] inputIDs, ref bool[] canProcess)
{
    for (int i = 0; i < inputIDs.Length; i++)
    {
        int inputIndex = ComponentMetaData.InputCollection.GetObjectIndexByID(inputIDs[i]);

        canProcess[i] = (inputBuffers[inputIndex].CurrentRow() == null)
            && !inputEOR[inputIndex];
    }
}

В предыдущем примере использовался массив inputEOR типа Boolean для указания наличия дополнительных восходящих данных, доступных для каждого входа. EOR в имени массива представляет «конец набора строк» и ссылается на свойство EndOfRowset буферов потока данных. В части примера, не приведенной здесь, метод ProcessInput проверяет значение свойства EndOfRowset для всех получаемых буферов данных. Если значение true указывает на отсутствие дополнительных данных с предыдущего этапа для входа, значение элемента массива inputEOR для этого входа задается в примере равным true. Образец метода IsInputReady задает значение соответствующего элемента в массиве canProcess равным false для входа, если значение элемента массива inputEOR указывает на отсутствие доступных данных с предыдущего этапа для входа.

Реализация метода GetDependentInputs

Если пользовательский компонент потока данных поддерживает более двух входов, также необходимо обеспечить реализацию для метода GetDependentInputs класса PipelineComponent.

ПримечаниеПримечание

Реализация метода GetDependentInputs не должна вызывать реализации базового класса. Реализация этого метода по умолчанию в базовом классе просто вызывает исключение NotImplementedException.

Подсистема обработки потока данных вызывает метод GetDependentInputs только в случаях, когда пользователь подключает к компоненту более двух входов. Если у компонента имеется только два входа и метод IsInputReady указывает, что один вход заблокирован (canProcess = false), подсистеме обработки потока данных известно, что другой вход ожидает получения дополнительных данных. Однако при наличии более двух входов и в случае, если метод IsInputReady указывает на блокировку одного из входов, дополнительный код в GetDependentInputs указывает входы, ожидающие приема дополнительных данных.

ПримечаниеПримечание

Не следует вызывать методы IsInputReady или GetDependentInputs в собственном коде. Подсистема обработки потока данных вызывает эти методы класса PipelineComponent, которые переопределяются при запуске подсистемой обработки потока данных компонента.

Пример

Для конкретного блокируемого входа следующая реализация метода GetDependentInputs возвращает коллекцию входов, ожидающих дополнительных данных и поэтому блокирующих указанный вход. Компонент определяет блокирующие входы путем проверки наличия других входов помимо заблокированного, которые не имеют в настоящее время в буферах доступных для обработки данных, которые уже получены компонентом (inputBuffers[i].CurrentRow() == null). Затем метод GetDependentInputs возвращает коллекцию блокирующих входов в виде коллекции идентификаторов входов.

        public override Collection<int> GetDependentInputs(int blockedInputID)
        {
            Collection<int> currentDependencies = new Collection<int>();
            for (int i = 0; i < ComponentMetaData.InputCollection.Count; i++)
            {
                if (ComponentMetaData.InputCollection[i].ID != blockedInputID
                    && inputBuffers[i].CurrentRow() == null)
                {
                    currentDependencies.Add(ComponentMetaData.InputCollection[i].ID);
                }
            }
            
            return currentDependencies;
        }