# Parallel Aggregation

Chapter 2 shows how to use parallel techniques that apply the same independent operation to many input values. However, not all parallel loops have loop bodies that execute independently. For example, a loop that calculates a sum does not have independent steps. All the steps accumulate their results in a single variable that represents the sum calculated up to that point. This accumulated value is an aggregation.

Nonetheless, there is a way for an aggregation operation to use a parallel loop. This is the Parallel Aggregation pattern.

Although calculating a sum is an example of aggregation, the pattern is more general than that. It works for any binary operation that is associative. The .NET implementation of the Parallel Aggregation pattern also expects the operations to be commutative.
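To see why associativity matters, consider what a parallel implementation is free to do: it splits the input into partitions, reduces each partition independently, and then combines the partial results in whatever grouping the tasks happen to produce. The following sketch (ordinary C#, no parallel library needed; the class and method names are illustrative) contrasts addition, which tolerates regrouping, with subtraction, which does not. Note also that floating-point addition is only approximately associative, so parallel sums of doubles can differ slightly from run to run.

```csharp
using System;

static class AssociativityDemo
{
    // Addition is associative: grouping doesn't change the result,
    // so partial sums can be computed per task and merged safely.
    public static int AddLeft(int a, int b, int c) => (a + b) + c;
    public static int AddRight(int a, int b, int c) => a + (b + c);

    // Subtraction is not associative: (a - b) - c differs from
    // a - (b - c) in general, so it can't serve as a parallel
    // aggregation operator.
    public static int SubLeft(int a, int b, int c) => (a - b) - c;
    public static int SubRight(int a, int b, int c) => a - (b - c);
}
```

With the inputs 10, 5, 2, the two subtraction groupings give 3 and 7, which is exactly the kind of divergence a parallel aggregation must rule out.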

The Parallel Aggregation pattern uses unshared, local variables that are merged at the end of the computation to give the final result. Using unshared, local variables for partial, locally calculated results is how the steps of a loop can become independent of each other. Parallel aggregation demonstrates the principle that it's usually better to make changes to your algorithm than to add synchronization primitives to an existing algorithm. For more information about the algorithmic aspects of this pattern, see the section, "Design Notes," later in this chapter.

The Parallel Aggregation pattern is also known as the Parallel Reduction pattern because it combines multiple inputs into a single output.

# The Basics

The most familiar application of aggregation is calculating a sum. Here's a sequential version.

``````csharp
double[] sequence = ...
double sum = 0.0d;
for (int i = 0; i < sequence.Length; i++)
{
    sum += Normalize(sequence[i]);
}
return sum;
``````

This is a typical sequential for loop. In this example and the ones that follow, Normalize is a user-provided method that transforms the input values in some way, such as converting them to an appropriate scale. The result is the sum of the transformed values.

The Microsoft® .NET Framework Language Integrated Query (LINQ) provides a very simple way to express this kind of aggregation. Languages such as C#, F#, and Microsoft Visual Basic® provide special syntax for LINQ. The following LINQ expression calculates a sum.

``````csharp
double[] sequence = ...
return (from x in sequence select Normalize(x)).Sum();
``````

The LINQ expression is a sequential operation whose performance is comparable to the sequential for loop shown in the previous example.

Converting a LINQ-to-Objects expression into a parallel query is extremely easy. The following code gives an example.

``````csharp
double[] sequence = ...
return (from x in sequence.AsParallel()
        select Normalize(x)).Sum();
``````

If you invoke the AsParallel extension method, you're instructing the compiler to bind to PLINQ instead of to LINQ. The program will use the parallel versions of all further query operations within the expression. The Sum extension method executes the query and (behind the scenes and in parallel) calculates the sum of the selected, transformed values. For an introduction to PLINQ, see Chapter 2, "Parallel Loops."

This example uses addition as the underlying aggregation operator, but there are many others. For example, PLINQ has built-in standard query operators that count the number of elements and calculate the average, maximum, or minimum. PLINQ also has operators that create and combine sets (duplicate elimination, union, intersection, and difference), transform sequences (concatenation, filtering, and partitioning) and group (projection). These standard query operators are sufficient for many types of aggregation tasks, and with PLINQ they all can efficiently use the hardware resources of a multicore computer.
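The built-in operators can be applied to a parallel query directly. The following sketch shows a few of them; the wrapper method names are illustrative, not from this chapter's samples.

```csharp
using System;
using System.Linq;

static class BuiltInAggregations
{
    // PLINQ supplies parallel versions of the standard aggregation
    // operators; each one merges per-task partial results internally.
    public static double AverageOf(double[] values) =>
        values.AsParallel().Average();

    public static double MaxOf(double[] values) =>
        values.AsParallel().Max();

    public static int CountAbove(double[] values, double threshold) =>
        values.AsParallel().Count(v => v > threshold);
}
```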

If PLINQ's standard query operators aren't what you need, you can also use the Aggregate extension method to define your own aggregation operators. Here's an example.

``````csharp
double[] sequence = ...
return (from x in sequence.AsParallel() select Normalize(x))
    .Aggregate(1.0d, (y1, y2) => y1 * y2);
``````

This code shows one of the overloaded versions of the Aggregate extension method. It applies a user-provided transformation to each element of the input sequence and then returns the mathematical product of the transformed values.

PLINQ is usually the recommended approach whenever you need to apply the parallel aggregation pattern to .NET applications. Its declarative nature makes it less prone to error than other approaches, and its performance on multicore computers is competitive with them. Implementing parallel aggregation with PLINQ doesn't require adding locks in your code. Instead, all the synchronization occurs internally, within PLINQ.

If PLINQ doesn't meet your needs or if you prefer a less declarative style of coding, you can also use Parallel.For or Parallel.ForEach to implement the parallel aggregation pattern. The Parallel.For and Parallel.ForEach methods require more complex code than PLINQ. For example, the Parallel.ForEach method requires your code to include synchronization primitives to implement parallel aggregation. For examples and more information, see the section, "Using Parallel Loops for Aggregation," under "Variations" later in this chapter.

# An Example

Aggregation doesn't only apply to numeric values. It's a more general pattern that arises in many application contexts. The following example shows how a variation of parallel aggregation known as map/reduce is used to aggregate nonscalar data types.

The example is of a social network service, where subscribers can designate other subscribers as friends. The site recommends new friends to each subscriber by identifying other subscribers who are friends of friends. To limit the number of recommendations, the service only recommends the candidates who have the largest number of mutual friends. Candidates can be identified in independent parallel operations, and then candidates are ranked and selected in an aggregation operation.

Here's how the data structures and algorithms that are used by the recommendation service work. Subscribers are identified by integer ID numbers. A subscriber's friends are represented by the collection of their IDs. The collection is a set because each element (a friend's ID number) occurs only once and the order of the elements doesn't matter. For example, the subscriber whose ID is 0 has two friends whose IDs are 1 and 2. This can be written as:

0 -> { 1, 2 }

The social network repository stores an entry like this for every subscriber. In order to recommend friends to a subscriber, the recommendation service must consider a subscriber's entry, as well as the entries for all of that subscriber's friends. For example, to recommend friends for subscriber 0, the pertinent entries in the repository are:

0 -> { 1, 2 }

1 -> { 0, 2, 3 }

2 -> { 0, 1, 3, 4 }

You can see that the service should recommend subscribers 3 and 4 to subscriber 0 because they appear among the friends of subscribers 1 and 2, who are already friends of 0. In addition, the recommendation service should rank subscriber 3 higher than 4, because 3 is a friend of both of 0's friends, while 4 is a friend of only one of them. You can write the results like this:

{ 3(2), 4(1) }

This means that subscriber 3 shares two mutual friends with subscriber 0, and subscriber 4 shares one. This is an example of a type of collection known as a multiset. In a multiset, each element (3 and 4 in this example) is associated with a multiplicity, which is the number of times it occurs in the collection (2 and 1, respectively). So a multiset is a collection where each element only occurs once, yet it can represent duplicates (or larger multiplicities). The order of elements in a multiset doesn't matter.

The recommendation service uses map/reduce, which has several phases. In the first phase, which is the map phase, the service creates a collection of candidates that can contain duplicates — the same candidate's ID can occur several times in the list (once for each mutual friend). In the second phase, which is the reduce phase, the service aggregates this collection to create a multiset where each candidate's ID occurs only once, but is associated with its multiplicity in the first collection (the number of mutual friends). There is also a postprocessing phase where the service ranks candidates by sorting them according to their multiplicity and selects only the candidates with the largest multiplicities.

An important feature of map/reduce is that the result of the map stage is a collection of items that is compatible with the reduce stage. The reduce stage uses multisets; therefore, the map stage does not produce only a list of candidate IDs. Instead, it produces a collection of multisets, where each candidate in a multiset occurs with a multiplicity of one. In this example, the output of the map stage is a collection of two multisets, and the candidates are subscribers 3 and 4.

{ 3(1) }, { 3(1), 4(1) }

Here, the first multiset contains friends of subscriber 1, and the second multiset contains friends of subscriber 2.

Another important feature of map/reduce is that the aggregation in the reduce phase is performed by applying a binary operation to pairs of elements from the collection that is produced by the map phase. In this example, the operation is a multiset union, which combines two multisets by collecting the elements and adding their multiplicities. The result of applying the multiset union operation to the two multisets in the preceding collection is:

{ 3(2), 4(1) }

Now that there is only one multiset, the reduce phase is complete. By repeatedly applying the multiset union operation, the reduce phase can aggregate any collection of multisets, no matter how large, into one multiset.
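The IDMultiset type used in this chapter's samples isn't defined here. One plausible way to sketch a multiset and its union operation, assuming a multiset is represented as a dictionary from element to multiplicity, is:

```csharp
using System;
using System.Collections.Generic;

static class MultisetOps
{
    // Union collects the elements of both arguments and adds the
    // multiplicities of any element that appears in both. Because
    // the operation is associative and commutative, repeated
    // pair-wise unions can aggregate any number of multisets.
    public static Dictionary<int, int> Union(
        Dictionary<int, int> a, Dictionary<int, int> b)
    {
        var result = new Dictionary<int, int>(a);
        foreach (var kv in b)
        {
            result.TryGetValue(kv.Key, out int count); // 0 if absent
            result[kv.Key] = count + kv.Value;
        }
        return result;
    }
}
```

Applying `Union` to { 3(1) } and { 3(1), 4(1) } produces { 3(2), 4(1) }, matching the reduce phase described above.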

Here is the code for the sequential version.

``````csharp
public IDMultisetItemList PotentialFriendsSequential(
    SubscriberID id,
    int maxCandidates)
{
    // Map
    var foafsList = new List<IDMultiset>();
    foreach (SubscriberID friend in subscribers[id].Friends)
    {
        var foafs = subscribers[friend].FriendsCopy();
        foafs.RemoveWhere(foaf => foaf == id ||
            subscribers[id].Friends.Contains(foaf));
        // Convert the set of foafs to a multiset (each element has
        // multiplicity 1) and collect it for the reduce phase.
        foafsList.Add(Multiset.Create(foafs));
    }

    // Reduce
    IDMultiset candidates = new IDMultiset();
    foreach (IDMultiset foafs in foafsList)
    {
        candidates = Multiset.Union(foafs, candidates);
    }

    // Postprocess
    return Multiset.MostNumerous(candidates, maxCandidates);
}
``````

In the map phase, this code loops sequentially over the subscriber's friends and builds a collection of multisets of candidates (a foaf is a friend of a friend). In the reduce phase, the code loops sequentially over those multisets and aggregates them with the multiset union operation. If this code executes with the few subscribers in the example, the id argument is 0 and subscribers[id].Friends is { 1, 2 }. When the map phase completes, foafsList is { 3(1) }, { 3(1), 4(1) }, and when the reduce phase completes, candidates is { 3(2), 4(1) }.

Multiset union is commutative: the result does not depend on the order of its arguments. Multiset union is also associative: if you aggregate several multisets into one by successively forming unions in a pair-wise manner, the final result does not depend on the order of the union operations. If the aggregation function is not associative, it can't be done in parallel without potentially getting different results. If it's not commutative, the potential for parallelism is greatly reduced.

Here's how to use PLINQ to apply map/reduce to the social networking example.

``````csharp
public IDMultisetItemList PotentialFriendsPLinq(SubscriberID id,
                                                int maxCandidates)
{
    var candidates =
        subscribers[id].Friends.AsParallel()
            .SelectMany(friend => subscribers[friend].Friends)
            .Where(foaf => foaf != id &&
                   !(subscribers[id].Friends.Contains(foaf)))
            .GroupBy(foaf => foaf)
            .Select(foafGroup => new IDMultisetItem(foafGroup.Key,
                                                    foafGroup.Count()));
    return Multiset.MostNumerous(candidates, maxCandidates);
}
``````

Recall that in map/reduce, independent parallel operations (the map phase) are followed by aggregation (the reduce phase). In the map phase, the parallel operations iterate over all the friends of subscriber 0. The map phase is performed by the SelectMany method, which finds all the friends of each friend of the subscriber, and the Where method, which prevents redundant recommendations by removing the subscriber and the subscriber's own friends. The output of the map phase is a collection of candidate IDs, including duplicates. The reduce phase is performed by the GroupBy method, which collects duplicate candidate IDs into groups, and the Select method, which converts each group into a multiset item that associates the candidate ID with a multiplicity (or Count). The return statement performs the final postprocessing step that selects the candidates with the highest multiplicities.

When map/reduce is implemented with PLINQ, it need not be a line-by-line translation of the foreach version. In the PLINQ example, the output of the map stage is not a collection of multisets, but a collection with duplicates. The multiset is not formed until the reduce stage.

The online source code for this example also includes map/reduce implemented with the Parallel.ForEach method.
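That implementation isn't reproduced in this chapter, but a self-contained sketch of the idea follows. It substitutes Dictionary<int, int> for the sample's IDMultiset type and a hard-coded table for the subscriber repository, so the types and data here are illustrative assumptions rather than the online sample's actual code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class FriendRecommender
{
    // A stand-in for the repository: subscriber ID -> set of friends.
    // This is the same small network used in the text.
    static readonly Dictionary<int, HashSet<int>> Subscribers =
        new Dictionary<int, HashSet<int>>
        {
            { 0, new HashSet<int> { 1, 2 } },
            { 1, new HashSet<int> { 0, 2, 3 } },
            { 2, new HashSet<int> { 0, 1, 3, 4 } },
            { 3, new HashSet<int> { 1, 2 } },
            { 4, new HashSet<int> { 2 } }
        };

    public static Dictionary<int, int> PotentialFriends(int id)
    {
        var candidates = new Dictionary<int, int>();
        object lockObject = new object();

        Parallel.ForEach(
            Subscribers[id],                   // one iteration per friend
            () => new Dictionary<int, int>(),  // task-local multiset
            (friend, loopState, local) =>
            {
                // Map: friends of this friend, excluding the subscriber
                // and the subscriber's existing friends.
                foreach (int foaf in Subscribers[friend])
                {
                    if (foaf == id || Subscribers[id].Contains(foaf))
                        continue;
                    local.TryGetValue(foaf, out int n);
                    local[foaf] = n + 1;
                }
                return local;
            },
            local =>
            {
                // Reduce: merge each task-local multiset into the
                // shared result under a lock.
                lock (lockObject)
                {
                    foreach (var kv in local)
                    {
                        candidates.TryGetValue(kv.Key, out int n);
                        candidates[kv.Key] = n + kv.Value;
                    }
                }
            });
        return candidates;
    }
}
```

For subscriber 0, this produces the multiset { 3(2), 4(1) } from the running example; ranking and truncating the candidates would then be a sequential postprocessing step.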

# Variations

This section contains some common variations of the parallel aggregation pattern.

## Using Parallel Loops for Aggregation

The Parallel.ForEach and Parallel.For methods have overloaded versions that can implement the parallel aggregation pattern. Here's an example.

``````csharp
double[] sequence = ...
object lockObject = new object();
double sum = 0.0d;

Parallel.ForEach(
    // The values to be aggregated
    sequence,

    // The local initial partial result
    () => 0.0d,

    // The loop body
    (x, loopState, partialResult) =>
    {
        return Normalize(x) + partialResult;
    },

    // The final step of each local context
    (localPartialSum) =>
    {
        lock (lockObject)
        {
            sum += localPartialSum;
        }
    });
return sum;
``````

Parallel.ForEach partitions the input based on the desired degree of parallelism and creates parallel tasks to perform the work. Each parallel task has local state that isn't shared with other tasks. The loop body updates only the task-local state. In other words, the loop body accumulates its value first into a subtotal and not directly into the final total. When each task finishes, it adds its subtotal into the final sum. For an illustration of these steps, see Figure 1 in the section, "Design Notes," later in this chapter.

Here is the signature of the overloaded version of Parallel.ForEach that's used in this example.

``````csharp
Parallel.ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);
``````

There are four arguments. The first argument is the data values that will be aggregated. The second argument is a delegate that returns the initial value of the aggregation. The third argument is a delegate that combines one of the input data values with a partial sum from a previous iteration.

The Task Parallel Library (TPL) implementation of Parallel.ForEach creates tasks in order to execute the parallel loop. When the loop creates multiple tasks, there will be one partial result value for each task. The number of worker tasks is determined by the parallel loop implementation, based on heuristics. Generally, there will be at least one worker task per available core, but there could be more. As the loop progresses, TPL will sometimes retire tasks and create new ones.

When the parallel loop is ready to finish, it must merge the partial results from all of its worker tasks together to produce the final result. The fourth argument to Parallel.ForEach is a delegate that performs the merge step. The delegate is invoked once for each of the worker tasks. The argument to the delegate is the partial result that was calculated by the task. This delegate locks the shared result variable and combines the partial sum with the result.

This example uses the C# lock keyword to enforce interleaved, sequential access to variables that are shared by concurrent threads. There are other synchronization techniques that could be used in this situation, but they are outside the scope of this book. Be aware that locking is cooperative; that is, unless all threads that access the shared variable use locking consistently and correctly, serial access to the variable is not guaranteed.

The syntax for locking in C# is lock ( object ) { body }. The object uniquely identifies the lock. All cooperating threads must use the same synchronizing object, which must be a reference type such as Object and not a value type such as int or double. When you use lock with Parallel.For or Parallel.ForEach, create a dummy object and assign it to a captured local variable dedicated to this purpose. (A captured variable is a local variable from the enclosing scope that is referenced in the body of a lambda expression.) The lock's body is the region of code that the lock protects; it should take only a small amount of execution time. Which shared variables a given lock object protects is a convention that varies by application, and every programmer whose code accesses those variables must follow it consistently. In this example, the lock object ensures serial access to the sum variable.

Note

You should document the variables that are protected by a lock with a comment in the code. It's easy to make a mistake.

For comparison, here's the PLINQ version of the example used in this variation.

``````csharp
double[] sequence = ...
return sequence.AsParallel().Select(Normalize).Sum();
``````

## Using a Range Partitioner for Aggregation

When you have a loop body with a very small amount of work to do, and there are many iterations to perform, it's possible that the overhead of Parallel.ForEach is large compared to the cost of executing the loop body.

In this case, it's sometimes more efficient to use a Partitioner object for the loop. A Partitioner object allows you to embed a sequential for loop inside of your Parallel.ForEach loop and reduce the number of iterations of the Parallel.ForEach loop. Generally, you should profile the application in order to decide whether to use a Partitioner object.

Here's an example.

``````csharp
double[] sequence = ...
object lockObject = new object();
double sum = 0.0d;
var rangePartitioner = Partitioner.Create(0, sequence.Length);

Parallel.ForEach(
    // The input intervals
    rangePartitioner,

    // The local initial partial result
    () => 0.0,

    // The loop body for each interval
    (range, loopState, initialValue) =>
    {
        double partialSum = initialValue;
        for (int i = range.Item1; i < range.Item2; i++)
        {
            partialSum += Normalize(sequence[i]);
        }
        return partialSum;
    },

    // The final step of each local context
    (localPartialSum) =>
    {
        lock (lockObject)
        {
            sum += localPartialSum;
        }
    });
return sum;
``````

This code is very similar to the example in the section, "Using Parallel Loops for Aggregation," earlier in this chapter. The main difference is that the Parallel.ForEach loop uses a sequence of index intervals, which are generated by the Partitioner object, as its input instead of individual values. This can avoid some of the overhead involved in invoking delegate methods. You'll notice a benefit only when the amount of work in each step is small and there are many steps.

Here's the signature of the overloaded version of the Parallel.ForEach method that was used.

``````csharp
Parallel.ForEach<TSource, TLocal>(
    Partitioner<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally);
``````

## Using PLINQ Aggregation with Range Selection

The PLINQ Aggregate extension method includes an overloaded version that allows a very general application of the parallel aggregation pattern. Here's an example from an application that does financial simulation. The method performs repeated simulation trials and aggregates results into a histogram. There are two dependencies that must be handled by this code. They are the accumulation of partial results into the result histogram and the use of instances of the Random class. (An instance of Random cannot be shared across multiple threads.)

``````csharp
int[] histogram = MakeEmptyHistogram();

return ParallelEnumerable.Range(0, count).Aggregate(

    // 1- Create an empty local accumulator object
    //    that includes all task-local state.
    () => new Tuple<int[], Random>(
        MakeEmptyHistogram(),
        new Random(SampleUtilities.MakeRandomSeed())),

    // 2- Run the simulation, adding result to local accumulator.
    (localAccumulator, i) =>
    {
        // With each iteration, get the next random value.
        var sample = localAccumulator.Item2.NextDouble();

        if (sample > 0.0 && sample < 1.0)
        {
            // Perform a simulation trial for the sample value.
            var simulationResult =
                DoSimulation(sample, mean, stdDev);

            // Add result to the histogram of the local accumulator.
            int histogramBucket =
                (int)Math.Floor(simulationResult / BucketSize);
            if (0 <= histogramBucket && histogramBucket < TableSize)
                localAccumulator.Item1[histogramBucket] += 1;
        }
        return localAccumulator;
    },

    // 3- Combine local results pair-wise.
    (localAccumulator1, localAccumulator2) =>
    {
        return new Tuple<int[], Random>(
            CombineHistograms(localAccumulator1.Item1,
                              localAccumulator2.Item1),
            null);
    },

    // 4- Extract answer from final combination.
    finalAccumulator => finalAccumulator.Item1
); // Aggregate
``````

Note

For information about the statistical limitations of the Random class when used in parallel programs, see Chapter 3, "Parallel Tasks."

The data source in this example is a parallel query that's created with the Range static method of the ParallelEnumerable class. Here's the signature of the overloaded version of the Aggregate extension method that was used in this example.

``````csharp
Aggregate<TSource, TAccumulate, TResult>(
    this ParallelQuery<TSource> source,
    Func<TAccumulate> seedFactory,
    Func<TAccumulate, TSource, TAccumulate> updateAccumulatorFunc,
    Func<TAccumulate, TAccumulate, TAccumulate>
        combineAccumulatorsFunc,
    Func<TAccumulate, TResult> resultSelector);
``````

There are four arguments; each is a delegate.

The first argument is a delegate that establishes the local state of each worker task that the query creates. This is called once, at the beginning of the task. In this example, the delegate returns a tuple (unnamed record) instance that contains two fields. The first field is an empty histogram. This will accumulate the local partial results of the simulation in this task. The second field is an instance of the Random class. It's part of the local state to ensure that the simulation does not violate the requirements of the Random class by sharing instances across threads. Note that you can store virtually any kind of local state in the object you create.

The second argument is the loop body. This delegate is invoked once per data element in the partition. In this example, the loop body creates a new random sample and performs the simulation experiment. Then it classifies the result of the simulation into buckets used by the histogram and increments the appropriate bucket of the task's local histogram.

The third argument is invoked for pair-wise combinations of local partial results. The delegate merges the input histograms (it adds their corresponding bucket values and returns a new histogram with the sum). It returns a new tuple. The null argument reflects the fact that the random number generator is no longer needed. The combination delegate is invoked as many times as necessary to consolidate all the local partial results.

The fourth argument selects the result from the final, merged local state object.

This variation can be adapted to many situations that use the Parallel Aggregation pattern. Note that this implementation doesn't require locks.
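To see the roles of the four delegates in a smaller setting, here is the same Aggregate overload applied to a trivial aggregation, a parallel sum of squares. This sketch is not from the chapter's samples; it only isolates the shape of the overload.

```csharp
using System;
using System.Linq;

static class AggregateDemo
{
    // Sum of 1^2 + 2^2 + ... + n^2 using the four-delegate
    // Aggregate overload. Here the "local state" is just an int,
    // so no locks and no shared variables are needed.
    public static int SumOfSquares(int n) =>
        ParallelEnumerable.Range(1, n).Aggregate(
            // 1- Empty local accumulator for each worker task.
            () => 0,
            // 2- Fold one element into the task-local accumulator.
            (local, i) => local + i * i,
            // 3- Combine two task-local accumulators pair-wise.
            (a, b) => a + b,
            // 4- Extract the answer from the final accumulator.
            total => total);
}
```

Because integer addition is associative and commutative, the pair-wise combinations in step 3 can happen in any order and still give the same result.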

# Design Notes

If you compare the sequential and parallel versions of the aggregation pattern, you see that the design of the parallel version includes an additional step in the algorithm that merges partial results. Figure 1, which illustrates the Parallel.ForEach and Parallel.For methods, shows this.

Figure 1. Aggregation using Parallel.For and Parallel.ForEach

Figure 1 shows that instead of placing the accumulation in a single, shared result, the parallel loop uses unshared local storage for partial results (these are named subtotals in Figure 1). Each task processes a single partition of the input values. The number of partitions depends on the degree of parallelism that's needed to efficiently use the computer's available cores. After a task finishes accumulating the values in its assigned partition, it merges its local result into the final, global result. The final result is shared by all tasks. Locks are required to ensure that updates are consistent.

This approach is fast because it takes very few locks. In normal cases, the number of elements to be processed is many times larger than the number of tasks and partitions, so the cost of the lock operations can be amortized over many individual accumulation operations.

The algorithm used by PLINQ differs slightly from what's shown in Figure 1. Aggregation in PLINQ does not require the developer to use locks. Instead, the final merge step is expressed as a binary operator that combines any two partial result values (that is, two of the subtotals) into another partial result. Repeating this process on subsequent pairs of partial results eventually converges on a final result. One of the advantages of the PLINQ approach is that it requires less synchronization, so it's more scalable. The binary combinations do not require the system to lock the final result; they can be performed in parallel. (This is possible because the binary combination operation is associative, so the order of these operations doesn't affect the result.)

This discussion shows that the parallel aggregation pattern is a good example of why changes to your algorithm are often needed when moving from a sequential to parallel approach.

To make this point clear, here's an example of what parallel aggregation would look like if you simply added locks to the existing sequential algorithm. You just convert the sequential for loop to Parallel.For and add one lock statement.

``````csharp
// Do not copy this code. This version will run much slower
// than the sequential version. It's included here to
// illustrate what not to do.
double[] sequence = ...
object lockObject = new object();
double sum = 0.0d;

// BUG – don't do this
Parallel.For(0, sequence.Length, i =>
{
    // BUG – don't do this
    lock (lockObject)
    {
        sum += Normalize(sequence[i]);
    }
});
return sum;
``````

If you forget to add the lock statement, this code fails to calculate the correct sum on a multicore computer. Adding the lock statement makes this code example correct with respect to serialization. If you run this code, it produces the expected sum. However, it fails completely as an optimization. This code is many times slower than the sequential version it attempted to optimize! The reason for the poor performance is the cost of synchronization.

In contrast, the examples of the parallel aggregation pattern that you have seen elsewhere in this chapter will run much faster on multicore computers than the sequential equivalent, and their performance also improves in approximate proportion to the number of cores.

It might at first seem counterintuitive that adding additional steps can make an algorithm go faster, but it's true. If you introduce extra work and that work has the effect of preventing data dependencies between parallel tasks, you often benefit in terms of performance.

# Related Patterns

There's a group of patterns related to summarizing data in a collection. Aggregation (also known as Reduce) is one of them. The others include Scan and Pack. The Scan pattern occurs when each iteration of a loop depends on data computed in the previous iteration. The Pack pattern uses a parallel loop to select elements to retain or discard. The result of a pack operation is a subset of the original input. These patterns can be combined, as in the Fold and Scan pattern. For more information about these related patterns, see the section, "Further Reading," at the end of this chapter.
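The chapter doesn't give code for these related patterns, but a brief sketch may help fix the definitions. The Scan example below is the sequential form (parallel scan algorithms exist but are more involved than this snippet), and Pack is expressed with PLINQ's Where operator; the class and method names are illustrative.

```csharp
using System;
using System.Linq;

static class RelatedPatterns
{
    // Scan: each output element is the running total of the inputs
    // seen so far; iteration i depends on the result of iteration
    // i - 1, which is the dependency that distinguishes Scan from
    // plain aggregation.
    public static int[] InclusiveScan(int[] input)
    {
        var result = new int[input.Length];
        int running = 0;
        for (int i = 0; i < input.Length; i++)
        {
            running += input[i];
            result[i] = running;
        }
        return result;
    }

    // Pack: keep only the elements that satisfy a predicate; the
    // output is a subset of the input. AsOrdered preserves the
    // original element order in the packed result.
    public static int[] Pack(int[] input, Func<int, bool> keep) =>
        input.AsParallel().AsOrdered().Where(keep).ToArray();
}
```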

# Exercises

1. Consider the small social network example (with subscribers 0, 1, 2). What constraints exist in the data? How are these constraints observed in the sample code?
2. In the social network example, there's a separate postprocessing step where the multiset of candidates, which is an unordered collection, is transformed into a sequence that is sorted by the number of mutual friends, and then the top N candidates are selected. Could some or all of this postprocessing be incorporated into the reduction step? Provide answers for both the PLINQ and Parallel.ForEach versions.
3. In the standard reference on map/reduce (see the section, "Further Reading"), the map phase executes a map function that takes an input pair and produces a set of intermediate key/value pairs. All pairs for the same intermediate key are passed to the reduce phase. That reduce phase executes a reduce function that merges all the values for the same intermediate key to a possibly smaller set of values. The signatures of these functions can be expressed as: map (k1,v1) -> list(k2,v2) and reduce (k2,list(v2)) -> list(v2). In the social network example, what are the types of k1, v1, k2, and v2? What are the map and reduce functions? Provide answers for both the PLINQ and Parallel.ForEach versions.

# Further Reading

MSDN® describes the standard query operators of LINQ and PLINQ. A thorough treatment of synchronization techniques appears in the book by Duffy. The related patterns of Stencil, Scan, and Pack are discussed by McCool. The standard reference on map/reduce is the paper by Dean and Ghemawat. Other cases of algorithms that use parallel loops with some dependencies between steps are described by Toub. These include fold-and-scan and dynamic programming.

J. Dean and S. Ghemawat. "MapReduce: Simplified Data Processing on Large Clusters." In OSDI '04: Sixth Symposium on Operating System Design and Implementation, 137–150, 2004.

J. Duffy. Concurrent Programming on Windows, Addison-Wesley, 2008.

M. McCool. "Structured Patterns: An Overview." December 2009.
http://www.ddj.com/go-parallel/article/showArticle.jhtml?articleID=223101515

S. Toub. "Patterns of Parallel Programming: Understanding and Applying Parallel Patterns with the .NET Framework 4." 2009.