Многопоточный код

Шаблоны распараллеливания обработки данных и PLINQ

Игорь Островский

Многоядерные процессоры сейчас доминируют в настольных компьютерах, но приложения, использующие весь их потенциал, до сих пор трудно писать. Распараллеливание для многоядерных процессоров, безусловно, осуществимо, однако ряд популярных приложений был просто модернизирован для повышения их производительности на компьютерах с такими процессорами. .NET Framework 4 предоставит несколько средств, которые упростят программистам решение этой задачи: набор новых координирующих и синхронизирующих примитивов и структур данных, Task Parallel Library и Parallel LINQ (PLINQ). В этой статье основное внимание будет уделено PLINQ.

PLINQ — интересное средство, которое резко упрощает написание кода, масштабируемого на многоядерных процессорах, — при условии, что ваша задача соответствует шаблону параллельной обработки данных. PLINQ является LINQ-провайдером, поэтому при программировании с его применением вы имеете дело с уже знакомой вам моделью LINQ. PLINQ очень похож на LINQ-to-Objects с тем исключением, что использует несколько потоков для планирования работы, связанной с обработкой запроса. Чтобы связать запрос с PLINQ вместо LINQ-to-Objects, вы просто добавляете вызов AsParallel после задания источника данных, как показано в следующем коде. В этом случае источник данных заключается в оболочку ParallelQuery, а остальные методы расширения в запросе связываются с PLINQ вместо LINQ-to-Objects:

IEnumerable<int> src = ...
var query = 
      src.AsParallel()
      .Where(x => x % 2 == 0)
      .Select(x => Foo(x));

foreach(var x in query)
{
      Bar(x); }

А вот так выглядит тот же код при использовании синтаксиса запроса в C#:

IEnumerable<int> src = ...
var query = 
      from x in src.AsParallel()
      where x % 2 == 0
      select Foo(x);

foreach(var x in query)
{
      Bar(x); }

Однако размещение AsParallel в запросе LINQ-to-Objects еще не гарантирует, что ваша программа будет работать быстрее. PLINQ пытается использовать подходящие алгоритмы для разделения данных, независимого параллельного выполнения частей запроса и последующего объединения результатов. Даст ли эта стратегия выигрыш в производительности на многоядерном процессоре, зависит от нескольких факторов.

Чтобы добиться прироста производительности от применения PLINQ, общая работа, заключенная в запросе, должна быть достаточно большой, чтобы перевесить издержки ее распределения в пуле потоков, а объем работы на каждый элемент должен быть достаточно весомым, чтобы перевесить издержки, связанные с обработкой элемента.

В этой статье я рассмотрю типы шаблонов параллельной обработки данных, которые можно эффективно распараллеливать с помощью PLINQ.

Проекция

Проекция (projection, mapping), оператор Select — все эти термины относятся к одному и тому же понятию операции параллельной обработки данных. В проекции у вас есть функция, которая принимает один аргумент и вычисляет ответ, и вам нужно выполнять эту функцию применительно к набору входных элементов.

Проекция естественным образом поддерживает параллельные данные, поскольку функцию проекции можно выполнять применительно к разным входным элементам одновременно. Если эта функция оказывается дорогостоящей с точки зрения потребления вычислительных ресурсов, PLINQ сможет ускорить расчеты, распределив работу, связанную с выполнением функции, между несколькими ядрами процессора.

Например, в следующем запросе PLINQ выполняет вызовы ExpensiveFunc параллельно (по крайней мере на многоядерных процессорах):

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel()
             .Select(x => ExpensiveFunc(x));

foreach(var x in query)
{
      Console.WriteLine(x);
}

Этот блок кода выводит в консольное окно значения ExpensiveFunc(0), ExpensiveFunc(1) и так до ExpensiveFunc(99). Однако значения не обязательно будут выводиться в ожидаемом порядке. По умолчанию PLINQ обрабатывает последовательности как неупорядоченные, поэтому значения будут появляться в неопределенном порядке.

Чтобы PLINQ обрабатывал массив src как упорядоченную последовательность, используйте оператор AsOrdered:

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel().AsOrdered()
             .Select(x => ExpensiveFunc(x));

foreach(var x in query)
{
      Console.WriteLine(x);

Теперь результаты отображаются на экране в ожидаемом порядке — от ExpensiveFunc(0) до ExpensiveFunc(99). Упорядочивание приводит к некоторым дополнительным издержкам для каждого входного элемента, но, как правило, эти издержки не слишком высоки.

В рассмотренных случаях в PLINQ-запросе всегда применялся цикл for. В таких вариантах PLINQ настраивает асинхронные рабочие потоки (workers), которые вычисляют результаты в фоне, а цикл for ожидает готовности любого следующего результата. Однако это не единственный способ использования PLINQ-запроса. В качестве альтернативы можно выполнять запрос с помощью таких операторов, как ToArray, ToList и ToDictionary:

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel().AsOrdered()
             .Select(x => ExpensiveFunc(x));

int[] results = query.ToArray(); // The query runs here

И вновь PLINQ инициирует параллельные вызовы ExpensiveFunc, ускоряя выполнение запроса. Но на этот раз выполнение осуществляется синхронно: весь запрос обрабатывается одной строкой кода (см. комментарий в предыдущем примере).

Вместо преобразования результатов в массив можно вычислять сумму результатов, а также минимальное, максимальное и среднее значения или использовать собственную агрегацию результатов:

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel()
             .Select(x => ExpensiveFunc(x));

int resultSum = query.Sum(); // The query runs here

Кроме того, можно выполнять какую-либо операцию для каждого создаваемого элемента:

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel()
             .Select(x => ExpensiveFunc(x));

int resultSum = query.ForAll(
      x => Console.WriteLine(x)
);

Между первым примером в разделе с циклом for и данным примером, где используется ForAll, есть важное различие. В примере с ForAll операции выполняются в рабочих потоках PLINQ. В примере с циклом for тело цикла выполняется в потоке, который создал PLINQ-запрос.

Наконец, при написании запросов для параллельных проекций можно столкнуться с одной проблемой, о которой стоит поговорить подробнее. PLINQ обеспечивает параллельное выполнение, разделяя входную последовательность на множество разделов, которые потом обрабатывает параллельно. Этап разделения входной последовательности называют разбиением (partitioning), и выбор алгоритма разбиения сильно влияет на производительность обработки запросов.

PLINQ обычно выбирает наиболее подходящий алгоритм для разбиения входной последовательности, но есть один случай, в котором вы, вероятно, предпочтете переопределить выбор PLINQ: входная последовательность представляет собой массив (или другой тип, реализующий IList). В таком случае PLINQ по умолчанию разбивает массив статически на столько разделов, сколько имеется ядер в процессорах, установленных в компьютере. Но если издержки, связанные с каждым элементом проекции, варьируются, все "дорогостоящие" элементы в итоге могут оказаться в одном разделе.

Чтобы PLINQ использовал алгоритм разбиения с балансировкой нагрузки для массивов (или других IList-типов), можно вызывать метод Partitioner.Create с передачей true для аргумента loadBalancing:

int[] src = Enumerable.Range(0, 100).ToArray();
var query = Partitioner.Create(src, true).AsParallel()
             .Select(x => ExpensiveFunc(x));

foreach(var x in query)
{
      Console.WriteLine(x);
}

Фильтрация

Небольшая вариация шаблона проекции — фильтрация. В этом случае вместо функции проекции, вычисляющей результат для каждого входного элемента, используется функция фильтрации, которая решает, надо ли включать конкретный элемент в результат.

Для максимальной производительности функция фильтрации должна быть должна быть достаточно ресурсоемкой при вычислении. В некоторых случаях фильтрация дает выигрыш даже при использовании функции, не требующей особых ресурсов — особенно если функция фильтрации отбрасывает большую часть входных элементов. В следующем примере PLINQ выводит числа в диапазоне 0-99, для которых ExpensiveFilter возвращает true:

int[] src = Enumerable.Range(0, 100).ToArray();
var query = src.AsParallel()
             .Where(x => ExpensiveFilter(x));

foreach(var x in query)
{
      Console.WriteLine(x);
}

В первом примере проекции результаты выводились в неупорядоченном виде. И здесь то же самое: для упорядочения вывода просто ставьте AsOrdered после AsParallel. По сути, все остальные операции, о которых я говорил применительно к проекция, в полной мере относятся и фильтрации. То есть результаты обработки запроса можно получать с помощью foreach, ToArray/ToList/ToDictionary, агрегации или ForAll. Кроме того, вы, вероятно, захотите переопределить схему разбиения по умолчанию, если входные данные представлены массивом, так как статическое разбиение в этом случае может привести к несбалансированному распределению нагрузки. (Все сказанное в целом применимо и к другим шаблонам, которые будут рассмотрены в оставшейся части статьи.)

Независимые операции

В шаблонах проекции и фильтрации "дорогостоящая" часть вычислений преобразует входную последовательность в выходную. Более простой шаблон — операция с высокими издержками, которую нужно выполнять над каждым элементом последовательности. Операции не требуется возвращать какое-либо значение; она просто выполняет некое дорогостоящее с точки зрения вычислительных ресурсов и безопасное в многопоточной среде действие:

int[] src = Enumerable.Range(0, 100).ToArray();
src.AsParallel()
.ForAll(
     x => { ExpensiveAction(x); }
);

Для параллельной обработки PLINQ выполняет ExpensiveAction в рабочих потоках. Это подразумевает, что ExpensiveAction должна быть дорогостоящей с точки зрения использования вычислительных ресурсов и, что еще важнее, безопасной в многопоточной среде. Так как ExpensiveAction вызывается в разных потоках, ни о каком порядке между вызовами не может быть и речи.

Однако этот шаблон настолько прост, что вам даже не нужен PLINQ и вы можете использовать метод Parallel.ForEach, доступный в Task Parallel Library (в .NET Framework 4). Тем не менее ForAll из PLINQ зачастую удобнее, когда требуются какие-то другие операторы PLINQ:

int[] src = Enumerable.Range(0, 100).ToArray();
src.AsParallel()
.Where(x => x%2 == 0)
.ForAll(
     x => { ExpensiveAction(x); }
);

Сжатие последовательности

Сжатие последовательности (sequence zipping) — шаблон, аналогичный проекции с тем исключением, что здесь используются две входные последовательности, а не одна. Вместо функции, преобразующей один входной элемент в один выходной, вы получаете функцию, которая преобразует по одному элементу из каждой входной последовательности в единый выходной элемент.

Этот шаблон поддерживается оператором Zip (LINQ-to-Objects), введенным в .NET 4.0. Вы можете использовать его и в PLINQ. Для максимальной производительности входные последовательности должны быть массивами или наборами, реализующими IList:

int[] arr1 = ..., arr2 = ...;
int[] results =
      arr1.AsParallel().AsOrdered()
      .Zip(
           arr2.AsParallel().AsOrdered(),
           (arr1Elem, arr2Elem) => ExpensiveFunc(arr1Elem, arr2Elem))
      .ToArray();

Вероятно, вы заметили: если входные последовательности находятся в массивах, оператор Zip может быть легко заменен проекцией:

int[] arr1 = ..., arr2 = ...;
int length = Math.Min(arr1.Length, arr2.Length);
int[] results = 
      ParallelEnumerable.Range(0, length).AsOrdered()
      .Select(index => ExpensiveFunc(arr1[index], arr2[index]))
      .ToArray();

Независимо от выбранной реализации обработку этого типа рабочей нагрузки можно легко ускорить с помощью PLINQ.

Редукция

Редукция (reduction), также известная как агрегация (aggregation) или свертывание (folding), — операция, при которой элементы последовательности комбинируются до тех пор, пока не остается единственный результат. Наиболее распространенные виды редукции — суммирование, получение среднего, минимального и максимального значений. Поскольку такие операции используются часто, они напрямую поддерживаются операторами PLINQ (Sum, Average, Min и Max). Однако эти операторы выполняют слишком мало работы в расчете на элемент, поэтому в PLINQ они обычно применяются в запросах, содержащих что-то, что требует интенсивных вычислений, например проекцию или фильтр. Возможное исключение из этого правила — операция нахождения минимального или максимального значения с "дорогостоящей" функцией сравнения. Если же функция редукции требует интенсивных вычислений, то она сама по себе может быть параллельной рабочей нагрузкой.

Существует несколько перегруженных версий Aggregate, но я не буду обсуждать их в этой статье из-за нехватки места. (Более подробное обсуждение редукций PLINQ см. по ссылкам blogs.msdn.com/pfxteam/archive/2008/01/22/7211660.aspx и blogs.msdn.com/pfxteam/archive/2008/06/05/8576194.aspx.) Сигнатура наиболее универсальной перегруженной версии Aggregate выглядит так:

public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this ParallelQuery<TSource> source,
    Func<TAccumulate> seedFactory,
    Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
    Func<TAccumulate, TAccumulate, TAccumulate> combineAccumulatorsFunc,
    Func<TAccumulate, TResult> resultSelector)

И вот как вы могли бы использовать ее для реализации параллельного оператора Average:

public static double Average(this IEnumerable<int> source)
{
      return source.Aggregate(
             () => new double[2],
             (acc, elem) => {
                   acc[0] += elem; acc[1]++; return acc;
             },
             (acc1, acc2) => {
                   acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1;
             },
             acc => acc[0] / acc[1]);
}

Каждый рабочий поток PLINQ инициализирует свой сумматор, используя seedFactory, поэтому он получит свой массив из двух значений двойной точности (типа double). Затем рабочий поток обрабатывает часть входной последовательности, обновляет свой сумматор каждым элементом с помощью updateAccumulatorFunc. Далее сумматоры разных рабочих потоков объединяются функцией combineAccumulatorsFunc, а полученный в итоге сумматор преобразуется в возвращаемое значение, для чего служит resultSelector.

Хотя этот пример параллельного оператора Average удобен для объяснения семантики оператора Aggregate, его объемы работы на каждый элемент (два сложения) скорее всего окажутся слишком малы для распараллеливания. Стоит отметить, что ситуации с более ресурсоемкой функцией редукции в реальной жизни встречаются достаточно часто.

Сортировка

LINQ поддерживает сортировку через оператор OrderBy, а PLINQ, естественно, реализует сортировку с применением параллельного алгоритма. Обычно алгоритм сортировки довольно прилично ускоряется в сравнении с сортировкой LINQ-to-Objects (до двух-трех раз на четырехъядерном процессоре). Но помните, что модель сортировки LINQ-to-Objects вводит весьма громоздкий интерфейс к OrderBy. Селектор ключа обязателен и передается как делегат, а не дерево выражения, поэтому PLINQ не игнорирует селектор ключа, даже если он является функцией идентификации, x => x. Соответственно PLINQ манипулирует парами manipulates "ключ-значение", даже когда ключи идентичны значениям. Кроме того, PLINQ из-за функциональной природы LINQ не может сортировать последовательность по месту даже в случае массива, так как это привело бы к уничтожению исходной последовательности.

Так что, если вы используете оператор LINQ-to-Objects OrderBy, вы сможете ускорить свой запрос с помощью PLINQ. Однако, если вам нужно сортировать лишь массив целых чисел, то сортировка по месту вроде Array.Sort будет скорее всего быстрее, чем при использовании OrderBy в PLINQ. Если вы хотите ускорить сортировку по месту, вам придется реализовать собственный алгоритм параллельной сортировки на основе библиотеки Task Parallel Library.

Преобразование "один ко многим"

Проекция преобразует каждый входной элемент в один выходной элемент. Используя фильтр, вы можете преобразовывать все входные элементы в один выходной элемент или вообще ничего не получить на выходе. Но как быть, если нужна возможность генерировать произвольное количество выходных элементов из каждого входного? PLINQ поддерживает и такой случай — через оператор SelectMany. Вот пример:

IEnumerable<int> inputSeq = ...
int[] results = 
      inputSeq.AsParallel()
      .SelectMany(input => ComputeResults(input))
      .ToArray();

Этот код вызывает ComputeResults для каждого элемента во входной последовательности. Каждый вызов ComputeResults возвращает IEnumerable-тип (например, массив), содержащий ноль, один или несколько результатов. Вывод запроса содержит все результаты, возвращенные вызовами ComputeResults.

Поскольку этот шаблон менее понятен на интуитивном уровне, чем остальные шаблоны, рассматриваемые в статье, давайте обсудим конкретный пример его применения. Шаблон "один ко многим" мог бы реализовать алгоритм поиска для такой известной задачи, как N-Queens (поиск всех позиций королев на шахматной доске, в которых любая из двух королев не могла бы атаковать другую). Входной будет последовательность шахматных досок с несколькими уже расставленными королевами. Тогда вы могли бы использовать запрос с оператором SelectMany для поиска всех решений задачи N-Queens, возможных, если начинать с любого из начальных состояний во входной последовательности:

IEnumerable<ChessboardState> initStates = GenerateInitialStates();
ChessboardState[] solutions = 
      initStates.AsParallel()
      .SelectMany(board => board.FindAllSolutions())
      .ToArray();

Более сложные запросы

Все шаблоны PLINQ, обсуждавшиеся в этой статье, представляют собой короткие фрагменты запроса, как правило, с одним или двумя операторами. Конечно, в одном запросе можно совместно использовать разные шаблоны. В следующем запросе применяется комбинация фильтра, проекции и независимых операций:

int[] src = Enumerable.Range(0, 100).ToArray();
src.AsParallel()
            .Where(x => ExpensiveFilter(x))
            .Select(x => ExpensiveFunc(x));
            .ForAll(x => { ExpensiveAction(x); });

PLINQ будет эффективно распараллеливать этот запрос независимо от того, все ли они требуют одинаково больших издержек с точки зрения вычислительных ресурсов или же доминирует только что-то одно из них. PLINQ — так же, как и LINQ-to-Objects, — не материализует набор результатов после каждого оператора. Поэтому PLINQ не станет исполнять Where для всей последовательности, сохранит отфильтрованную последовательность, выполнит оператор Select, а затем ForAll. Операции совмещаются в максимальной степени, и в простых запросах рабочий поток "проведет" элемент ввода через весь запрос и лишь затем перейдет к следующему.

Параллельные шаблоны можно комбинировать не только друг с другом, но и с любыми LINQ-операторами. В PLINQ стараются сохранить паритет с LINQ-to-Objects, так что доступны все LINQ-операторы. Но, хотя PLINQ будет выполнять практически любой запрос LINQ-to-Objects, он не обязательно будет выполняться быстрее. Некоторые операторы и формы запросов не слишком хорошо поддаются распараллеливанию, если вообще поддаются. В идеале, наиболее ресурсоемкая часть запроса должна быть представлена в форме одного из параллельных шаблонов, о которых говорилось в этой статье.

Следует также понимать, что в некоторых сложных запросах PLINQ может решить, что части запроса нужно выполнять последовательно, а не пытаться распараллеливать их с помощью крайне "дорогостоящих" алгоритмов. Вполне вероятно, что вам это не понравится, особенно если в вашем запросе содержится "дорогостоящий" делегат, на который все равно будет потрачена основная часть времени выполнения. В следующем примере кода PLINQ предпочтет последовательное выполнение делегатов ExpensiveFunc():

int[] src = Enumerable.Range(0, 100).ToArray();
int[] res = src.AsParallel()
             .Select(x => ExpensiveFunc(x))
             .TakeWhile(x => x % 2 == 0)
             .ToArray();

Эту проблему можно устранить двумя способами. Вы можете подсказать PLINQ, что запрос следует выполнять параллельно, даже если для этого нужно применять потенциально "дорогостоящие" алгоритмы:

int[] src = Enumerable.Range(0, 100).ToArray();
int[] res = src.AsParallel()
             .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
             .Select(x => ExpensiveFunc(x))
             .TakeWhile(x => x % 2 == 0)
             .ToArray();

Или разложить запрос так, чтобы PLINQ выполнял только "дорогостоящую" часть запроса, а остальное — LINQ-to-Objects. Для привязки последовательных операторов к LINQ-to-Objects можно использовать оператор AsSequential в PLINQ-запросе:

int[] src = Enumerable.Range(0, 100).ToArray();
int[] res = src.AsParallel()
             .Select(x => ExpensiveFunc(x))
             .AsSequential()
             .TakeWhile(x => x % 2 == 0)
             .ToArray();

Заключение

Написание приложений для компьютеров с многоядерными процессорами может оказаться делом очень трудным, но отнюдь не всегда. PLINQ — полезное средство в вашем инструментарии, позволяющее при необходимости ускорять параллельные вычисления над данными. Запомните соответствующие шаблоны и используйте их в своих программах должным образом.

Игорь Островский (Igor Ostrovsky) — инженер по разработке ПО в группе Microsoft Parallel Computing Platform. Является основным разработчиком PLINQ.

Выражаю благодарность за рецензирование данной статьи эксперту Майклу Блоуму (Michael Blome).

Вопросы и комментарии (на английском языке) присылайте по адресу mmsync@microsoft.com.