Benutzerdefinierte Partitionierer für PLINQ und TPL

Um einen Vorgang in einer Datenquelle zu parallelisieren, ist einer der wesentlichen Schritte das Partitionieren der Quelle in mehrere Abschnitte, auf die gleichzeitig von mehreren Threads zugegriffen werden kann. PLINQ und die Task Parallel Library (TPL) bieten standardmäßige Partitionierer, die transparent funktionieren, wenn Sie eine parallele Abfrage oder ForEach-Schleife schreiben. Für komplexere Szenarien können Sie Ihren eigenen Partitionierer verwenden.

Arten der Partitionierung

Es gibt viele Möglichkeiten, eine Datenquelle zu partitionieren. Bei den effizientesten Methoden kooperieren mehrere Threads, um die ursprüngliche Quellsequenz zu verarbeiten, anstatt die Quelle physisch in mehrere Untersequenzen zu trennen. Für Arrays und andere indizierte Quellen wie z. B. IList-Auflistungen, in denen die Länge im Voraus bekannt ist, ist die Bereichspartitionierung die einfachste Art der Partitionierung. Jeder Thread empfängt eindeutige Anfangs- und Endindizes, damit der Bereich der Quelle verarbeitet werden kann, ohne dass dieser bzw. ein anderer Thread überschrieben wird. Der einzige durch Bereichspartitionierung entstehende Mehraufwand ist die ursprüngliche Erstellung von Bereichen. Danach ist keine zusätzliche Synchronisierung erforderlich. Daher kann eine gute Leistung erzielt werden, solange die Arbeitsauslastung gleichmäßig geteilt wird. Ein Nachteil der Bereichspartitionierung ist, dass bei frühzeitigem Beenden eines Threads für die anderen Threads keine Unterstützung bei deren Arbeit mehr zur Verfügung steht.

Für verknüpfte Listen oder andere Auflistungen, deren Länge nicht bekannt ist, können Sie Blockpartitionierung verwenden. Bei der Blockpartitionierung verwendet jeder Thread oder jede Aufgabe in einer parallelen Schleife oder Abfrage eine gewisse Anzahl von Quellelementen in einem Block, verarbeitet diese und ruft anschließend zusätzliche Elemente ab. Der Partitionierer stellt sicher, dass alle Elemente verteilt werden, und dass keine Duplikate vorhanden sind. Ein Block kann jede beliebige Größe besitzen. Der Partitionierer, der in Gewusst wie: Implementieren von dynamischen Partitionen veranschaulicht wird, erstellt z. B. Ausschnitte, die nur ein Element enthalten. Solange die Blöcke nicht zu groß sind, sorgt diese Art von Partitionierung grundsätzlich für Lastenausgleich, da die Zuweisung von Elementen zu Threads nicht vorgegeben ist. Allerdings tritt beim Partitionierer ein Synchronisierungsmehraufwand auf, wenn der Thread einen weiteren Block abrufen muss. Der Synchronisierungsaufwand in diesen Fällen ist umgekehrt proportional zur Größe der Blöcke.

Im Allgemeinen ist die Bereichspartitionierung nur dann schneller, wenn die Ausführungszeit des Delegaten kurz oder von mittlerer Länge ist, die Quelle eine große Anzahl von Elementen enthält und die gesamte Arbeitslast jeder Partition ungefähr gleich ist. Blockpartitionierung ist daher im Allgemeinen in den meisten Fällen schneller. In Quellen mit einer geringen Anzahl von Elementen oder längeren Ausführungszeiten für den Delegaten ist die Leistung der Block- und Bereichspartitionierung ungefähr gleich.

Die TPL-Partitionierer unterstützen auch eine dynamische Anzahl von Partitionen. Dies bedeutet, dass Partitionen direkt erstellt werden können, z. B., wenn die ForEach-Schleife eine neue Aufgabe erzeugt. Diese Funktion ermöglicht dem Partitionierer eine gemeinsame Skalierung mit der Schleife. Dynamische Partitionierer sorgen zudem grundsätzlich für Lastenausgleich. Wenn Sie einen benutzerdefinierten Partitionierer erstellen, müssen Sie dynamische Partitionierung unterstützen, um eine Verwendung von einer ForEach-Schleife zu gewährleisten.

Konfigurieren von Lastenausgleichspartitionierern für PLINQ

Einige Überladungen der Partitioner.Create-Methode ermöglichen die Erstellung eines Partitioners für ein Array oder eine IList-Quelle und die Angabe, ob die Arbeitslast zwischen den Threads gleichmäßig verteilt werden soll. Wenn der Partitionierer für Lastenausgleich konfiguriert wird, wird Blockpartitionierung verwendet, und die Elemente werden gemäß Anforderung in kleinen Blöcken an jede Partition übergeben. Durch diese Methode wird gewährleistet, dass alle Partitionen zu verarbeitende Elemente enthalten, bis die gesamte Schleife oder Abfrage abgeschlossen ist. Mit einer zusätzlichen Überladung kann Lastenausgleichspartitionierung einer beliebigen IEnumerable-Quelle bereitgestellt werden.

Im Allgemeinen erfordert Lastenausgleich, dass die Partitionen relativ häufig Elemente vom Partitionierer anfordern. Im Gegensatz dazu kann ein Partitionierer, der eine statische Partitionierung vornimmt, jedem Partitionierer sofort alle Elemente zuweisen, wenn er entweder Bereichs- oder Blockpartitionierung verwendet. Dies erfordert weniger Aufwand als Lastenausgleich, doch die Ausführung kann mehr Zeit in Anspruch nehmen, wenn in einem Thread deutlich mehr Arbeit als in den anderen Threads anfällt. Wenn eine IList oder ein Array an PLINQ übergeben wird, wird immer Bereichspartitionierung ohne Lastenausgleich verwendet. Um Lastenausgleich für PLINQ zu aktivieren, verwenden Sie die Partitioner.Create-Methode (siehe folgendes Beispiel).

        ' Static number of partitions requires indexable source.
        Dim nums = Enumerable.Range(0, 100000000).ToArray()

        ' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
        Dim customPartitioner = Partitioner.Create(nums, True)

        ' The partitioner is the query's data source.
        Dim q = From x In customPartitioner.AsParallel()
                Select x * Math.PI

        q.ForAll(Sub(x) ProcessData(x))

            // Static partitioning requires indexable source. Load balancing
            // can use any IEnumerable.
            var nums = Enumerable.Range(0, 100000000).ToArray();

            // Create a load-balancing partitioner. Or specify false for static partitioning.
            Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

            // The partitioner is the query's data source.
            var q = from x in customPartitioner.AsParallel()
                    select x * Math.PI;

            q.ForAll((x) =>
            {
                ProcessData(x);
            });

Die beste Möglichkeit, zu bestimmen, ob in einem gegebenen Szenario Lastenausgleich verwendet werden soll, besteht darin durch Experimentieren zu ermitteln, wie viel Zeit Vorgänge bei repräsentativer Auslastung und mit entsprechenden Computerkonfigurationen in Anspruch nehmen. Statische Partitionierung kann z. B. für beträchtliche Geschwindigkeitssteigerungen auf einem Multikerncomputer sorgen, der nur einige Kerne besitzt, doch auf Computern mit verhältnismäßig vielen Kernen kann es zu einer Verlangsamung kommen.

In der folgenden Tabelle sind die verfügbaren Überladungen der Create-Methode aufgelistet. Für diese Partitionierer gilt keine Einschränkung, dass sie nur mit PLINQ oder ForEach verwendet werden dürfen. Sie können auch mit jedem benutzerdefinierten parallelen Konstrukt verwendet werden.

Überladung

Verwendet Lastenausgleich

Create<TSource>(IEnumerable<TSource>)

Immer

Create<TSource>(TSource[], Boolean)

Wenn das boolesche Argument als "true" (wahr) angegeben wird

Create<TSource>(IList<TSource>, Boolean)

Wenn das boolesche Argument als "true" (wahr) angegeben wird

Create(Int32, Int32)

Nie

Create(Int32, Int32, Int32)

Nie

Create(Int64, Int64)

Nie

Create(Int64, Int64, Int64)

Nie

Konfigurieren von statischen Bereichspartitionierern für "Parallel.ForEach"

In einer For-Schleife wird der Text der Schleife für die Methode als Delegat bereitgestellt. Der Aufwand für einen Delegatenaufruf entspricht ungefähr dem für einen virtuellen Methodenaufruf. In einigen Szenarien ist der Text einer parallelen Schleife möglicherweise so klein, dass der Aufwand für einen Delegataufruf in jeder Schleifeniteration ein beträchtliches Ausmaß erreicht. In solchen Situationen können Sie mit einer der Create-Überladungen ein IEnumerable<T>-Objekt von Bereichspartitionen über den Quellelementen erstellen. Anschließend können Sie diese Bereichsauflistung an eine ForEach-Methode übergeben, deren Text aus einer regulären for-Schleife besteht. Der Vorteil dieser Methode liegt darin, dass der Aufwand für einen Delegataufruf nur einmal pro Bereich und nicht einmal pro Element anfällt. Das grundlegende Muster wird im folgenden Beispiel veranschaulicht.

Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }           
        }
    }
}

Jeder Thread in der Schleife empfängt ein eigenes Tuple<T1, T2>-Objekt, das die Anfangs- und Endindexwerte im angegebenen Teilbereich enthält. Die innere for-Schleife verwendet den fromInclusive-Wert und den toExclusive-Wert, um das Array oder das IList-Objekt direkt in einer Schleife zu durchlaufen.

Eine der Create-Überladungen ermöglicht die Angabe der Größe und der Anzahl der Partitionen. Diese Überladung kann in Szenarien verwendet werden, in denen die Arbeit pro Element so geringfügig ist, dass sogar ein einzelner virtueller Methodenaufruf pro Element beträchtliche Auswirkungen auf die Leistung hat.

Benutzerdefinierte Partitionierer

In einigen Szenarien kann es empfehlenswert oder sogar erforderlich sein, einen eigenen Partitionierer zu implementieren. Beispiel: Sie besitzen eine benutzerdefinierte Auflistungsklasse, die Sie aufgrund Ihrer Kenntnisse der internen Struktur der Klasse effizienter als Standardpartitionierer partitionieren können. Möglicherweise möchten Sie auch unter Berücksichtigung Ihrer Kenntnis der Verarbeitungsdauer von Elementen an verschiedenen Orten in der Quellauflistung unterschiedlich große Bereichspartitionen erstellen.

Zum Erstellen eines grundlegenden benutzerdefinierten Partitioners muss eine Klasse von System.Collections.Concurrent.Partitioner<TSource> abgeleitet werden, und die virtuellen Methoden müssen überschrieben werden. Eine entsprechende Beschreibung finden Sie in der folgenden Tabelle.

GetPartitions

Diese Methode wird einmal vom Hauptthread aufgerufen und gibt einen IList(IEnumerator(TSource)) zurück. Jeder Arbeitsthread in der Schleife oder Abfrage kann GetEnumerator in der Liste aufrufen, um über eine unterschiedliche Partition einen IEnumerator<T> abzurufen.

SupportsDynamicPartitions

Gibt true zurück, wenn Sie GetDynamicPartitionsimplementieren, andernfalls false.

GetDynamicPartitions

Wenn SupportsDynamicPartitions den Wert true ergibt, kann optional diese Methode anstelle von GetPartitions aufgerufen werden.

Wenn die Ergebnisse sortierbar sein müssen, oder wenn Sie indizierten Zugriff auf die Elemente benötigen, führen Sie eine Ableitung von System.Collections.Concurrent.OrderablePartitioner<TSource> aus, und überschreiben Sie die virtuellen Methoden entsprechend der Beschreibung in der folgenden Tabelle.

GetPartitions

Diese Methode wird einmal vom Hauptthread aufgerufen und gibt einen IList(IEnumerator(TSource)) zurück. Jeder Arbeitsthread in der Schleife oder Abfrage kann GetEnumerator in der Liste aufrufen, um über eine unterschiedliche Partition einen IEnumerator<T> abzurufen.

SupportsDynamicPartitions

Gibt true zurück, wenn Sie GetDynamicPartitionsimplementieren, andernfalls "false".

GetDynamicPartitions

In der Regel wird dabei nur GetOrderableDynamicPartitions aufgerufen.

GetOrderableDynamicPartitions

Wenn SupportsDynamicPartitions den Wert true ergibt, kann optional diese Methode anstelle von GetPartitions aufgerufen werden.

Die folgende Tabelle enthält weitere Details darüber, wie die drei Arten von Lastenausgleichspartitionierern die OrderablePartitioner<TSource>-Klasse implementieren.

Methode/Eigenschaft

IList/Array ohne Lastenausgleich

IList/Array mit Lastenausgleich

IEnumerable

GetOrderablePartitions

Verwendet Bereichspartitionierung

Verwendet Blockpartitionierung, die für Listen für die angegebene Partitionsanzahl (partitionCount) optimiert ist

Verwendet Blockpartitionierung durch Erstellen einer statischen Anzahl von Partitionen.

OrderablePartitioner<TSource>.GetOrderableDynamicPartitions

Löst eine nicht unterstützte Ausnahme aus

Verwendet für Listen und dynamische Partitionen optimierte Blockpartitionierung

Verwendet Blockpartitionierung durch Erstellen einer dynamischen Anzahl von Partitionen.

KeysOrderedInEachPartition

Gibt true zurück.

Gibt true zurück.

Gibt true zurück.

KeysOrderedAcrossPartitions

Gibt true zurück.

Gibt false zurück.

Gibt false zurück.

KeysNormalized

Gibt true zurück.

Gibt true zurück.

Gibt true zurück.

SupportsDynamicPartitions

Gibt false zurück.

Gibt true zurück.

Gibt true zurück.

Dynamische Partitionen

Wenn Sie den Partitionierer in einer ForEach-Methode verwenden möchten, muss eine dynamische Anzahl von Partitionen zurückgegeben werden können. Dies bedeutet, dass der Partitionierer jederzeit während der Schleifenausführung für eine neue Partition bedarfsabhängig einen Enumerator angeben kann. Grundsätzlich wird beim Hinzufügen einer neuen parallelen Aufgabe durch die Schleife eine neue Partition für diese Aufgabe angefordert. Wenn die Daten sortierbar sein müssen, nehmen Sie eine Ableitung von System.Collections.Concurrent.OrderablePartitioner<TSource> vor, damit jedem Element in jeder Partition ein eindeutiger Index zugewiesen wird.

Weitere Informationen und ein Beispiel finden Sie unter Gewusst wie: Implementieren von dynamischen Partitionen.

Vertrag für Partitionierer

Wenn Sie einen benutzerdefinierten Partitionierer implementieren, gehen Sie folgendermaßen vor, um eine ordnungsgemäße Interaktion mit PLINQ und ForEach in der TPL sicherzustellen:

  • Wenn GetPartitions mit einem Argument kleiner oder gleich 0 (null) für partitionsCount aufgerufen wird, wird ArgumentOutOfRangeException ausgelöst. Obwohl PLINQ und TPL niemals eine partitionCount gleich 0 übergeben, empfiehlt es sich trotzdem, sich davor zu schützen.

  • GetPartitions und GetOrderablePartitions geben immer die folgende Partitionsanzahl zurück: partitionsCount Wenn dem Partitionierer keine Daten mehr zur Verfügung stehen und nicht so viele Partitionen wie gewünscht erstellt werden können, wird von dieser Methode für jede der verbleibenden Partitionen ein leerer Enumerator zurückgegeben. Andernfalls lösen sowohl PLINQ als auch TPL eine InvalidOperationException aus.

  • GetPartitions, GetOrderablePartitions, GetDynamicPartitions und GetOrderableDynamicPartitions geben niemals null zurück (Nothing in Visual Basic). Wenn dies doch der Fall ist, löst PLINQ/TPL eine InvalidOperationException aus.

  • Methoden, die Partitionen zurückgeben, geben immer Partitionen zurück, die die Datenquelle vollständig und eindeutig auflisten können. Es darf keine doppelten Vorkommen in der Datenquelle oder übersprungene Elementen geben, sofern dies die Struktur des Partitionierers nicht ausdrücklich erfordert. Wenn diese Regel nicht befolgt wird, gerät die Ausgabereihenfolge möglicherweise durcheinander.

  • Die folgenden booleschen Getter müssen immer exakt die folgenden Werte zurückgeben, damit die Ausgabereihenfolge nicht durcheinander gerät:

    • KeysOrderedInEachPartition: Jede Partition gibt Elemente mit zunehmenden Schlüsselindizes zurück.

    • KeysOrderedAcrossPartitions: Für alle Partitionen, die zurückgegeben werden, sind die Schlüsselindizes in Partition i höher als die Schlüsselindizes in Partition i.

    • KeysNormalized: Alle Schlüsselindizes steigen monoton ohne Lücken an (ausgehend von 0 (null)).

  • Alle Indizes müssen eindeutig sein. Es darf keine doppelten Indizes geben. Wenn diese Regel nicht befolgt wird, gerät die Ausgabereihenfolge möglicherweise durcheinander.

  • Alle Indizes müssen nicht negativ sein. Wenn diese Regel nicht befolgt wird, löst PLINQ/TPL möglicherweise Ausnahmen aus.

Siehe auch

Aufgaben

Gewusst wie: Implementieren von dynamischen Partitionen

Gewusst wie: Implementieren eines Partitionierers mit einer statischen Anzahl von Partitionen

Konzepte

Parallele Programmierung in .NET Framework