Benutzerdefinierte Partitionierer für PLINQ und TPL

Einer der wesentlichen Schritte, um einen Vorgang für eine Datenquelle zu parallelisieren, ist das Partitionieren der Quelle in mehrere Abschnitte, auf die mehrere Threads gleichzeitig zugreifen können. PLINQ und die Task Parallel Library (TPL) stellen standardmäßige Partitionierer bereit, die transparent arbeiten, wenn Sie eine parallele Abfrage oder ForEach-Schleife schreiben. Für erweiterte Szenarien können Sie Ihren eigenen Partitionierer einbeziehen.

Arten der Partitionierung

Es gibt viele Möglichkeiten, eine Datenquelle zu partitionieren. Bei den effizientesten Ansätzen wird die Quelle nicht physisch in mehrere Untersequenzen unterteilt, sondern mehrere Threads kooperieren beim Verarbeiten der ursprünglichen Quellsequenz. Für Arrays und andere indizierte Quellen wie IList-Sammlungen, bei denen die Länge im Voraus bekannt ist, ist die Bereichspartitionierung die einfachste Art der Partitionierung. Jeder Thread empfängt eindeutige Indizes für Anfang und Ende, sodass er seinen Bereich der Quelle verarbeiten kann, ohne einen anderen Thread zu überschreiben oder selbst überschrieben zu werden. Der einzige Mehraufwand bei der Bereichspartitionierung ist die ursprüngliche Erstellung von Bereichen; danach ist keine zusätzliche Synchronisierung erforderlich. Aus diesem Grund liefert sie gute Leistung, solange die Arbeitsauslastung gleichmäßig verteilt wird. Ein Nachteil der Bereichspartitionierung ist, dass ein Thread, der seine Arbeit frühzeitig beendet, den anderen Threads keine Arbeit abnehmen kann.

Für verknüpfte Listen oder andere Sammlungen, deren Länge nicht bekannt ist, können Sie die Blockpartitionierung verwenden. Bei der Blockpartitionierung nutzt jeder Thread oder jede Aufgabe in einer parallelen Schleife oder Abfrage eine Anzahl von Quellelementen in einem Block, verarbeitet sie und kehrt dann zurück, um zusätzliche Elemente abzurufen. Mit dem Partitionierer wird sichergestellt, dass alle Elemente verteilt werden und keine Duplikate vorhanden sind. Ein Block kann eine beliebige Größe haben. Der in Gewusst wie: Implementieren von dynamischen Partitionen gezeigte Partitionierer erstellt z.B. Blöcke, die nur ein Element enthalten. Solange die Blöcke nicht zu groß sind, bewirkt diese Art der Partitionierung auch grundsätzlich einen Lastenausgleich, da die Zuweisung von Elementen zu Threads nicht vorgegeben ist. Allerdings macht sich der Synchronisierungsmehraufwand für den Partitionierer immer dann bemerkbar, wenn der Thread einen anderen Block abrufen muss. Das Ausmaß der Synchronisierung ist in diesen Fällen umgekehrt proportional zur Größe der Blöcke.

Im Allgemeinen ist die Bereichspartitionierung nur schneller, wenn die Ausführungszeit des Delegaten kurz bis mäßig ist, die Quelle über eine große Anzahl von Elementen verfügt und die gesamte Arbeit in jeder Partition ungefähr gleich ist. Die Blockpartitionierung ist daher im Allgemeinen in den meisten Fällen schneller. Bei Datenquellen mit einer kleinen Anzahl von Elementen oder längeren Ausführungszeiten für den Delegaten sind Leistung des Blocks und Bereichspartitionierung ungefähr gleich.

Die TPL-Partitionierer unterstützen auch eine dynamische Anzahl von Partitionen. Dies bedeutet, dass sie Partitionen während des Betriebs erstellen können, z.B. wenn die ForEach-Schleife eine neue Aufgabe erzeugt. Dieses Feature ermöglicht dem Partitionierer, zusammen mit der Schleife selbst zu skalieren. Dynamische Partitionierer bewirken auch grundsätzlich einen Lastenausgleich. Wenn Sie einen benutzerdefinierten Partitionierer erstellen, müssen Sie die Nutzung der dynamischen Partitionierung von einer ForEach-Schleife aus unterstützen.

Konfigurieren von Lastenausgleichspartitionierern für PLINQ

Mit einigen Überladungen der Partitioner.Create-Methode können Sie einen Partitionierer für ein Array oder eine IList-Datenquelle erstellen und festlegen, ob er versuchen soll, die Arbeitsauslastung zwischen den Threads auszugleichen. Wenn der Partitionierer für den Lastenausgleich konfiguriert ist, wird die Blockpartitionierung verwendet, und die Elemente werden bei Anforderung jeder Partition in kleinen Blöcken übergeben. Dieser Ansatz stellt sicher, dass alle Partitionen über zu verarbeitende Elemente verfügen, bis die gesamte Schleife oder Abfrage abgeschlossen ist. Eine weitere Überladung kann zur Lastenausgleichspartitionierung einer beliebigen IEnumerable-Quelle dienen.

Lastenausgleich erfordert im Allgemeinen, dass die Partitionen relativ häufig Elemente vom Partitionierer anfordern. Dagegen kann ein Partitionierer, der statische Partitionierung durchführt, mithilfe der Bereichs- oder Blockpartitionierung alle Elemente jedem Partitionierer gleichzeitig zuweisen. Dies erfordert weniger Mehraufwand als der Lastenausgleich, aber die Ausführung dauert möglicherweise länger, wenn ein Thread am Ende deutlich mehr Arbeit bewältigen muss als die anderen. Wenn PLINQ eine IList oder ein Array übergeben wird, verwendet PLINQ standardmäßig immer die Bereichspartitionierung ohne Lastenausgleich. Um den Lastenausgleich für PLINQ zu aktivieren, verwenden Sie die Partitioner.Create-Methode, wie im folgenden Beispiel gezeigt.

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

Ob in einem gegebenen Szenario der Lastenausgleich verwendet werden sollte, bestimmen Sie am besten, indem Sie durch Experimentieren und Messen feststellen, wie lange die Ausführung von Vorgängen unter repräsentativen Lasten und Computerkonfigurationen dauert. Die statische Partitionierung könnte z.B. möglicherweise auf einem Mehrkerncomputer mit nur wenigen Kernen für erhebliche Beschleunigung sorgen, doch bei Computern mit relativ vielen Kernen zu Verlangsamungen führen.

In der folgenden Tabelle sind die verfügbaren Optionen der Create-Methode aufgelistet. Diese Partitionierer sind nicht auf die Verwendung mit PLINQ oder Task beschränkt. Sie können auch mit jedem benutzerdefinierten parallelen Konstrukt verwendet werden.

Überladung Verwendet Lastenausgleich
Create<TSource>(IEnumerable<TSource>) Always
Create<TSource>(TSource[], Boolean) Wenn das boolesche Argument als „true“ angegeben wird
Create<TSource>(IList<TSource>, Boolean) Wenn das boolesche Argument als „true“ angegeben wird
Create(Int32, Int32) Nie
Create(Int32, Int32, Int32) Nie
Create(Int64, Int64) Nie
Create(Int64, Int64, Int64) Nie

Konfigurieren von statischen Bereichspartitionierern für Parallel.ForEach

In einer For-Schleife wird der Inhalt der Schleife der Methode als Delegat bereitgestellt. Der Aufwand für den Aufruf dieses Delegaten ist mit dem des Aufrufs einer virtuellen Methode identisch. In einigen Szenarien könnte der Inhalt einer parallelen Schleife so klein sein, dass der Aufwand des Delegatenaufrufs in jeder Schleifeniteration signifikant wird. In solchen Situationen können Sie mithilfe einer der Create-Überladungen eine IEnumerable<T>-Schnittstelle von Bereichspartitionen über die Quellelemente erstellen. Dann übergeben Sie diese Sammlung von Bereichen an eine ForEach-Methode, deren Inhalt aus einer regulären for-Schleife besteht. Dieser Ansatz hat den Vorteil, dass der Aufwand für den Delegatenaufruf nur einmal pro Bereich und nicht einmal pro Element anfällt. Das Grundmuster wird im folgenden Beispiel veranschaulicht.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

Jeder Thread in der Schleife empfängt ein eigenes Tuple<T1,T2>, das Start- und Endindexwerte im angegebenen untergeordneten Bereich enthält. Die innere for-Schleife verwendet die fromInclusive- und toExclusive-Werte für einen direkten Schleifendurchlauf über das Array oder IList.

Mit einer der Create-Überladungen können Sie Größe und Anzahl der Partitionen angeben. Diese Überladung kann in Szenarien verwendet, wo die Arbeit pro Element so gering ist, dass auch ein einziger Aufruf einer virtuellen Methode pro Element eine merkliche Auswirkung auf die Leistung hat.

Benutzerdefinierte Partitionierer

In einigen Szenarien kann es empfehlenswert oder sogar erforderlich sein, dass Sie Ihren eigenen Partitionierer implementieren. Beispielsweise könnten Sie über eine benutzerdefinierte Auflistungsklasse verfügen, die Sie basierend auf Ihren Kenntnissen der internen Struktur der Klasse effizienter partitionieren können als der Standardpartitionierer. Vielleicht möchten Sie auch Bereichspartitionen mit unterschiedlichen Größen basierend auf Ihrer Kenntnis der Dauer der Verarbeitung von Elementen an verschiedenen Positionen in der Quellsammlung erstellen.

Leiten Sie zum Erstellen eines einfachen benutzerdefinierten Partitionierers eine Klasse von System.Collections.Concurrent.Partitioner<TSource> ab, und überschreiben Sie die virtuellen Methoden, wie in der folgenden Tabelle beschrieben.

Methode Beschreibung
GetPartitions Diese Methode wird einmal vom Hauptthread aufgerufen und gibt eine IList(IEnumerator(TSource)) zurück. Jeder Arbeitsthread in der Schleife oder Abfrage kann GetEnumerator in der Liste zum Abrufen eines IEnumerator<T> über eine unterschiedliche Partition aufrufen.
SupportsDynamicPartitions Geben Sie true zurück, wenn Sie GetDynamicPartitions implementieren, andernfalls false.
GetDynamicPartitions Wenn SupportsDynamicPartitionstrue ist, kann diese Methode optional anstelle von GetPartitions aufgerufen werden.

Wenn die Ergebnisse sortierbar sein müssen, oder Sie indizierten Zugriff auf die Elemente benötigen, leiten Sie von System.Collections.Concurrent.OrderablePartitioner<TSource> ab, und überschreiben Sie dessen virtuelle Methoden wie in der folgenden Tabelle beschrieben.

Methode Beschreibung
GetPartitions Diese Methode wird einmal vom Hauptthread aufgerufen und gibt eine IList(IEnumerator(TSource)) zurück. Jeder Arbeitsthread in der Schleife oder Abfrage kann GetEnumerator in der Liste zum Abrufen eines IEnumerator<T> über eine unterschiedliche Partition aufrufen.
SupportsDynamicPartitions Geben Sie true zurück, wenn Sie GetDynamicPartitions implementieren; andernfalls „false“.
GetDynamicPartitions In der Regel wird hiermit GetOrderableDynamicPartitions aufgerufen.
GetOrderableDynamicPartitions Wenn SupportsDynamicPartitionstrue ist, kann diese Methode optional anstelle von GetPartitions aufgerufen werden.

Die folgende Tabelle enthält weitere Details zur Implementierung der OrderablePartitioner<TSource>-Klasse durch die drei Arten von Lastenausgleichspartitionierern.

Methode/Eigenschaft IList/Array ohne Lastenausgleich IList/Array mit Lastenausgleich IEnumerable
GetOrderablePartitions Verwendet Bereichspartitionierung Verwendet optimierte Blockpartitionierung für Listen für die angegebene partitionCount Verwendet Blockpartitionierung durch Erstellen einer statischen Anzahl von Partitionen
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions Löst nicht unterstützte Ausnahme aus Verwendet optimierte Blockpartitionierung für Listen und dynamische Partitionen Verwendet Blockpartitionierung durch Erstellen einer dynamischen Anzahl von Partitionen.
KeysOrderedInEachPartition Gibt true zurück. Gibt true zurück. Gibt true zurück.
KeysOrderedAcrossPartitions Gibt true zurück. Gibt false zurück. Gibt false zurück.
KeysNormalized Gibt true zurück. Gibt true zurück. Gibt true zurück.
SupportsDynamicPartitions Gibt false zurück. Gibt true zurück. Gibt true zurück.

Dynamische Partitionen

Wenn Sie beabsichtigen, den Partitionierer in einer ForEach-Methode zu verwenden, müssen Sie eine dynamische Anzahl von Partitionen zurückgeben können. Dies bedeutet, dass der Partitionierer bei Bedarf jederzeit während der Schleifenausführung einen Enumerator für eine neue Partition bereitstellen kann. Im Wesentlichen wird immer dann, wenn die Schleife eine neue parallele Aufgabe hinzufügt, eine neue Partition für diese Aufgabe angefordert. Wenn die Daten sortierbar sein müssen, leiten Sie von System.Collections.Concurrent.OrderablePartitioner<TSource> ab, sodass jedem Element in jeder Partition ein eindeutiger Index zugewiesen wird.

Weitere Informationen und ein Beispiel finden Sie unter Gewusst wie: Implementieren von dynamischen Partitionen.

Vertrag für Partitionierer

Wenn Sie einen benutzerdefinierten Partitionierer implementieren, befolgen Sie diese Richtlinien, um ordnungsgemäße Interaktion mit PLINQ und ForEach in der TPL sicherzustellen:

  • Wenn GetPartitions mit einem Argument von 0 (null) oder weniger für partitionsCount aufgerufen wird, soll ArgumentOutOfRangeException ausgelöst werden. Obwohl PLINQ und TPL niemals eine partitionCount gleich 0 übergeben werden, sollten Sie dennoch einen Schutz vor dieser Möglichkeit einbauen.

  • GetPartitions und GetOrderablePartitions sollten stets eine partitionsCount Anzahl von Partitionen zurückgeben. Wenn dem Partitionierer keine Daten mehr zur Verfügung stehen, sodass er nicht mehr so viele Partitionen wie angefordert erstellen kann, sollte die Methode für jede der verbleibenden Partitionen einen leeren Enumerator zurückgeben. Andernfalls löst sowohl PLINQ als auch TPL eine InvalidOperationException aus.

  • GetPartitions, GetOrderablePartitions, GetDynamicPartitions und GetOrderableDynamicPartitions sollten niemals null (Nothing in Visual Basic) zurückgeben. Wenn dies doch der Fall ist, lösen PLINQ/TPL eine InvalidOperationException aus.

  • Partitionen zurückgebende Methoden sollten immer Partitionen zurückgeben, die die Datenquelle vollständig und eindeutig aufzählen können. Sofern nicht ausdrücklich vom Entwurf des Partitionierers gefordert, dürfen in der Datenquelle keine Duplikate oder übersprungenen Elemente enthalten sein. Wenn diese Regel nicht befolgt wird, kann die Ausgabereihenfolge durcheinander geraten.

  • Die folgenden booleschen Getter müssen immer genau die folgenden Werte zurückgeben, damit die Reihenfolge der Ausgabe nicht durcheinander gerät:

    • KeysOrderedInEachPartition: Jede Partition gibt Elemente mit zunehmenden Schlüsselindizes zurück.

    • KeysOrderedAcrossPartitions: Für alle zurückgegebenen Partitionen sind die Schlüsselindizes in Partition i höher als diejenigen in Partition i-1.

    • KeysNormalized: Alle Schlüsselindizes nehmen beginnend mit 0 (null) ohne Lücken monoton zu.

  • Alle Indizes müssen eindeutig sein. Es darf keine doppelten Indizes geben. Wenn diese Regel nicht befolgt wird, kann die Ausgabereihenfolge durcheinander geraten.

  • Alle Indizes dürfen nicht negativ sein. Wenn diese Regel nicht befolgt wird, kann PLINQ/TPL Ausnahmen auslösen.

Siehe auch