Пользовательские разделители для PLINQ и TPL

Одним из основных шагов при распараллеливании операции над источником данных является секционирование источника, чтобы несколько потоков могли параллельно обращаться к нескольким секциям. PLINQ и библиотека параллельных задач (TPL) предоставляют стандартные средства секционирования, которые прозрачно работают при создании параллельного запроса или цикла ForEach. Для более сложных сценариев вы можете подключить собственное средство секционирования.

Виды секционирования

Есть много разных способов секционировать источник данных. Самые эффективные подходы позволяют нескольким потокам совместно обрабатывать исходную последовательность, не разделяя источник физически на несколько подпоследовательностей. Для массивов и других индексированных источников, например коллекций IList, длина которых известна заранее, проще всего применить секционирование по диапазонам. В этом варианте каждый поток получает уникальные индексы начала и окончания диапазона, что позволяет ему обработать свою часть данных без конфликтов с другими потоками. Секционирование по диапазонам привносит только один тип накладных расходов, связанный с начальным процессом создания этих диапазонов. При дальнейшей работе никакой дополнительной синхронизации не требуется. Такой подход обеспечит хорошую производительность, если рабочую нагрузку удастся распределить равномерно. Недостаток секционирования по диапазонам заключается в том, что поток, завершивший работу раньше других, не может помочь другим потокам завершить работу.

Для связанных списков и других коллекций, длина которых неизвестна, можно применить блочное секционирование. В этом варианте каждый поток, каждая задача в параллельных циклах или каждый запрос поочередно извлекает определенное количество исходных элементов в виде блока, обрабатывает их и снова обращается за дополнительным блоком элементов. Средство секционирования следит за тем, чтобы все элементы были обработаны строго один раз. Могут использоваться блоки любого размера. Например, средство секционирования из статьи Практическое руководство. Реализация динамических разделов создает блоки размером в один элемент. Если размер блока не слишком велик, такой тип секционирования автоматически правильно балансирует нагрузку, так как не использует предварительное распределение элементов по потокам. Но средство секционирования все же добавляет определенные накладные расходы, связанные с синхронизацией, при каждом получении очередного блока для потока. Степень синхронизации в этом алгоритме обратно пропорциональна размеру блоков.

В целом секционирование по диапазонам выполняется быстрее только в том случае, если делегат выполняется за малый или средний период времени, а источник имеет большое количество элементов и трудозатраты на каждую секцию примерно одинаковы. Соответственно, блочное секционирование в большинстве случаев работает быстрее. Для источников с небольшим числом элементов или для делегатов с продолжительным временем выполнения производительность этих двух типов секционирования примерно равна.

Средства секционирования библиотеки TPL также поддерживают динамическое количество секций. Такой подход означает создание секций "на лету", например при создании новой задачи в цикле ForEach. Эта функция позволяет средству секционирования масштабироваться синхронно с циклом. Динамические средства секционирования также автоматически балансируют нагрузку. Если вы создаете собственное средство секционирования, обязательно обеспечьте поддержку динамического секционирования из цикла ForEach.

Настройка средств секционирования с балансировкой нагрузки для PLINQ

Некоторые перегрузки метода Partitioner.Create позволяют создать средство секционирования для массива или источника IList и выбрать, будет ли он распределять рабочую нагрузку между потоками. Если средство секционирования поддерживает балансировку нагрузки, используется блочное секционирование и элементы распределяются небольшими блоками по мере запрашивания. Такой подход нужен для того, чтобы все секции содержали элементы для обработки до полного завершения цикла или запроса. Можно использовать дополнительные перегрузки, чтобы реализовать балансировку нагрузки для любого источника IEnumerable.

Обычно для балансировки нагрузки важно, чтобы секции относительно часто запрашивали новые элементы у средства секционирования. Напротив, средство статического секционирования может распределить все элементы единовременно, используя секционирование по диапазонам или блочное секционирование. Накладные расходы в этом случае будут ниже, чем при балансировке нагрузки, но выполнение может занять больше времени, если одному из потоков достанется существенно больше работы, чем остальным. По умолчанию PLINQ всегда использует секционирование по диапазонам без балансировки нагрузки, когда получает на вход ILIst или массив. Чтобы включить в PLINQ балансировку нагрузки, используйте метод Partitioner.Create, как показано в следующем примере.

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

Самый надежный способ определиться с применением балансировки нагрузки для конкретного сценария — выполнить эксперименты и замерить длительность выполнения операций при типичных нагрузках и разных конфигурациях компьютера. Например статическое секционирование может значительно ускорить работу на компьютере с небольшим количеством ядер, но привести к замедлению работы на компьютерах с относительно большим количеством ядер.

В следующей таблице перечислены все доступные перегрузки метода Create. Эти средства секционирования можно использовать не только с PLINQ или Task. Они пригодны для любой пользовательской конструкции с параллельной обработкой.

Перегрузка Использование балансировки нагрузки
Create<TSource>(IEnumerable<TSource>) Всегда
Create<TSource>(TSource[], Boolean) Если передан логический аргумент со значением true
Create<TSource>(IList<TSource>, Boolean) Если передан логический аргумент со значением true
Create(Int32, Int32) Никогда
Create(Int32, Int32, Int32) Никогда
Create(Int64, Int64) Никогда
Create(Int64, Int64, Int64) Никогда

Настройка средств статического секционирования по диапазонам для Parallel.ForEach

Когда используется цикл For, тело этого цикла передается методу в качестве делегата. Затраты на вызов этого делегата будут примерно такими же, как на вызов виртуального метода. В некоторых сценариях тело цикла параллельной обработки будет настолько небольшим, что затраты на вызов делегата при каждой итерации цикла составят значительную часть нагрузки. В такой ситуации вы можете применить одну из перегрузок Create, чтобы создать IEnumerable<T> для секций диапазонов, распределив по ним исходные элементы. Эта коллекция диапазонов затем передается в метод ForEach, тело которого состоит из стандартного цикла for. Преимущество этого подхода заключается в том, что затраты на вызов делегата применяются только один раз для каждого диапазона, а не для каждого элемента. В следующем примере показан базовый шаблон.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

Каждый поток в цикле получает собственный класс Tuple<T1,T2>, который содержит начальное и конечное значения индекса для назначенного ему поддиапазона. Внутренний цикл for использует значения fromInclusive и toExclusive, напрямую перебирая массив или IList.

Одна из перегрузок Create позволяет указать размер и количество секций. Эта перегрузка удобна для тех ситуаций, в которых обработка каждого элемента требует так мало работы, что даже один вызов виртуального метода для каждого элемента существенно снизит производительность.

Пользовательские разделители

В некоторых случаях будет оправданно или даже необходимо реализовать пользовательское средство секционирования. Например, если у вас есть пользовательский класс коллекции и знание его внутренней структуры позволяет вам секционировать его более эффективно, чем это делают стандартные средства секционирования. Или же вы знаете, что на обработку элементов, расположенных в разных местах исходной коллекции, потребуется разное и прогнозируемое время, а значит можете создать секции с диапазонами разных размеров.

Чтобы создать простое пользовательское средство секционирования, наследуйте класс от System.Collections.Concurrent.Partitioner<TSource> и переопределите для него виртуальные методы, которые описаны в следующей таблице.

Метод Description
GetPartitions Этот метод вызывается основным потоком один раз и возвращает IList(IEnumerator(TSource)). Каждый рабочий поток в цикле или запросе может вызвать GetEnumerator для списка, чтобы получить IEnumerator<T> по конкретной секции.
SupportsDynamicPartitions Возвращайте true, если вы реализовали GetDynamicPartitions, в противном случае — false.
GetDynamicPartitions Если SupportsDynamicPartitions имеет значение true, можно (но не обязательно) вызвать этот метод вместо GetPartitions.

Если результаты требуют сортировки или вам нужен доступ к элементам по индексам, наследуйте класс от System.Collections.Concurrent.OrderablePartitioner<TSource> и переопределите его виртуальные методы, как описано в следующей таблице.

Метод Description
GetPartitions Этот метод вызывается основным потоком один раз и возвращает IList(IEnumerator(TSource)). Каждый рабочий поток в цикле или запросе может вызвать GetEnumerator для списка, чтобы получить IEnumerator<T> по конкретной секции.
SupportsDynamicPartitions Возвращайте true, если вы реализовали GetDynamicPartitions, или FALSE в противном случае.
GetDynamicPartitions Как правило, он просто вызывает GetOrderableDynamicPartitions.
GetOrderableDynamicPartitions Если SupportsDynamicPartitions имеет значение true, можно (но не обязательно) вызвать этот метод вместо GetPartitions.

В следующей таблице приведены дополнительные сведения о том, как реализован класс OrderablePartitioner<TSource> в трех типах средств секционирования с балансировкой нагрузки.

Метод или свойство IList или массив без балансировки нагрузки IList или массив с балансировкой нагрузки IEnumerable
GetOrderablePartitions Использует секционирование по диапазонам Использует блочное секционирование, оптимизированное для списков с указанным значением partitionCount Использует блочное секционирование, создавая статическое количество секций.
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions Создает неподдерживаемое исключение Использует блочное секционирование для списков и динамических секций Использует блочное секционирование, создавая динамическое количество секций.
KeysOrderedInEachPartition Возвращает true. Возвращает true. Возвращает true.
KeysOrderedAcrossPartitions Возвращает true. Возвращает false. Возвращает false.
KeysNormalized Возвращает true. Возвращает true. Возвращает true.
SupportsDynamicPartitions Возвращает false. Возвращает true. Возвращает true.

Динамические секции

Если ваше средство секционирования будет использоваться в методе ForEach, нужно поддерживать создание динамического количества секций. Это означает, что средство секционирования в любой момент обработки цикла может передать перечислитель для создания новой секции по требованию. Обычно цикл запрашивает новую секцию для каждой новой параллельной задачи, которую он создает. Если вам важно поддерживать упорядочение данных, наследуйте класс от System.Collections.Concurrent.OrderablePartitioner<TSource>, чтобы присваивать уникальный индекс каждому элементу в каждой секции.

Дополнительные сведения и пример см. в статье Практическое руководство. Реализация динамических разделов.

Контракт для средств секционирования

Если вы реализуете пользовательское средство секционирования, придерживайтесь следующих рекомендаций для корректного взаимодействия с PLINQ и ForEach в TPL:

  • Если GetPartitions вызывается с аргументом partitionsCount, значение которого равно нулю или меньше, создавайте исключение ArgumentOutOfRangeException. Несмотря на то, что PLINQ и TPL никогда не передают для partitionCount значение 0, мы рекомендуем подстраховаться от такой ситуации.

  • GetPartitions и GetOrderablePartitions должны всегда возвращать число секций (partitionsCount). Если у средства секционирования недостаточно данных, чтобы создать требуемое число секций, метод должен возвращать пустой перечислитель для всех остальных секций. В противном случае PLINQ и TPL создадут исключение InvalidOperationException.

  • GetPartitions, GetOrderablePartitions, GetDynamicPartitions и GetOrderableDynamicPartitions никогда не должны возвращать null (Nothing в Visual Basic). В противном случае PLINQ и (или) TPL создадут исключение InvalidOperationException.

  • Методы, которые возвращают секции, должны всегда возвращать секции, которые способны полностью и однозначно перечислить данные из источника. Не допускайте дублирования или пропуска элементов в источнике данных, если такое поведение не требуется от средства секционирования для конкретных целей. Если вы не будете соблюдать это правило, может быть нарушен порядок выходных данных.

  • Следующие методы получения логических значений должны всегда точно возвращать следующие значения, чтобы не нарушать порядок выходных данных:

    • KeysOrderedInEachPartition. Каждая секция возвращает элементы в порядке увеличения индексов ключа.

    • KeysOrderedAcrossPartitions. Для всех возвращаемых секций соблюдается условие, что все индексы ключа в секции i выше, чем все индексы ключа в секции i–1.

    • KeysNormalized. Все индексы ключа возрастают монотонно и без промежутков, начиная с нуля.

  • Все индексы должны быть уникальными. Не допускается повторение индексов. Если вы не будете соблюдать это правило, может быть нарушен порядок выходных данных.

  • Все индексы должны быть неотрицательными. Если это правило не соблюдается, PLINQ и (или) TPL могут создавать исключения.

См. также