Datenflusskonstrukte der Task Parallel Library (TPL).

Datenflusskonstrukte der Task Parallel Library (TPL).

# Aktionsblock

(vorher)

Diese Klasse kann man sich logisch als einen Puffer für zu verarbeitende Daten in Kombination mit Aufgaben zur Verarbeitung dieser Daten vorstellen, wobei der „Datenflussblock“ beides verwaltet. In seiner grundlegendsten Verwendung können wir einen ActionBlock instanziieren und Daten an ihn „posten“. Der Delegierte, der bei der Erstellung des ActionBlocks bereitgestellt wird, wird asynchron für jedes gepostete Datenelement ausgeführt.

Synchrone Berechnung

var ab = new ActionBlock<TInput>(i => 
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);

Drosselung asynchroner Downloads auf höchstens 5 gleichzeitig

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Einführung in TPL Dataflow von Stephen Toub

# BroadcastBlock

(Kopieren Sie ein Element und senden Sie die Kopien an jeden Block, mit dem es verknüpft ist)

Im Gegensatz zu BufferBlock besteht die Lebensaufgabe von BroadcastBlock darin, allen mit dem Block verknüpften Zielen zu ermöglichen, eine Kopie jedes veröffentlichten Elements zu erhalten, wobei der „aktuelle“ Wert kontinuierlich mit den an ihn weitergegebenen überschrieben wird.

Darüber hinaus speichert BroadcastBlock im Gegensatz zu BufferBlock nicht unnötigerweise Daten. Nachdem ein bestimmtes Datum allen Zielen angeboten wurde, wird dieses Element von dem Datenteil überschrieben, der als nächstes in der Reihe steht (wie bei allen Datenflussblöcken werden Nachrichten in FIFO-Reihenfolge behandelt). Dieses Element wird allen Zielen angeboten und so weiter.

Asynchroner Producer/Consumer mit einem gedrosselten Producer

var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image), 
    new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);

Offenlegen des Status von einem Agenten

public class MyAgent
{
    public ISourceBlock<string> Status { get; private set; }
    
    public MyAgent()
    {
        Status = new BroadcastBlock<string>();
        Run();
    } 

    private void Run()
    {
        Status.Post("Starting");
        Status.Post("Doing cool stuff");
        …
        Status.Post("Done");
    }
}

Einführung in TPL Dataflow von Stephen Toub

# BufferBlock

(FIFO-Warteschlange:Die eingehenden Daten sind die ausgehenden Daten)

Kurz gesagt, BufferBlock bietet einen unbegrenzten oder begrenzten Puffer zum Speichern von Instanzen von T.
Sie können Instanzen von T an den Block „posten“, wodurch die Daten, die gesendet werden, vom Block in einer FIFO-Reihenfolge (First-in-First-out) gespeichert werden.
Sie können von dem Block „empfangen“, wodurch Sie synchron oder asynchron Instanzen von T abrufen können, die zuvor gespeichert oder in der Zukunft verfügbar sind (wiederum FIFO).

Asynchroner Producer/Consumer mit einem gedrosselten Producer

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await _Buffer.ReceiveAsync());
    } 
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Einführung in TPL Dataflow von Stephen Toub

# JoinBlock

(Sammelt 2-3 Eingaben und kombiniert sie zu einem Tupel)

Wie BatchBlock kann JoinBlock Daten aus mehreren Datenquellen gruppieren. Tatsächlich ist das der Hauptzweck von JoinBlock.

Beispielsweise ist ein JoinBlock ein ISourceBlock>.

Wie BatchBlock kann JoinBlock sowohl im Greedy- als auch im Non-Greedy-Modus betrieben werden.

  • Im standardmäßigen Greedy-Modus werden alle Daten akzeptiert, die Zielen angeboten werden, auch wenn das andere Ziel nicht über die erforderlichen Daten verfügt, um ein Tupel zu bilden.
  • Im Non-Greedy-Modus werden die Ziele des Blocks Daten verschieben, bis allen Zielen die erforderlichen Daten zum Erstellen eines Tupels angeboten wurden, woraufhin der Block ein zweiphasiges Commit-Protokoll durchführt, um alle erforderlichen Elemente atomar abzurufen die Quellen. Diese Verschiebung ermöglicht es einer anderen Entität, die Daten in der Zwischenzeit zu konsumieren, um dem Gesamtsystem zu ermöglichen, Fortschritte zu machen.

Verarbeitung von Anfragen mit einer begrenzten Anzahl von gepoolten Objekten

var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++) 
{
    requestProcessor.Target1.Post(new ExpensiveObject()); 
}

var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    
    request.ProcessWith(resource);
    
    return resource;
});

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);

Einführung in TPL Dataflow von Stephen Toub

# WriteOnceBlock

(Schreibgeschützte Variable:Speichert ihr erstes Datenelement und gibt Kopien davon als Ausgabe aus. Ignoriert alle anderen Datenelemente)

Wenn BufferBlock der grundlegendste Block in TPL Dataflow ist, ist WriteOnceBlock der einfachste.
Es speichert höchstens einen Wert, und sobald dieser Wert festgelegt wurde, wird er niemals ersetzt oder überschrieben.

Sie können sich WriteOnceBlock ähnlich wie eine schreibgeschützte Mitgliedsvariable in C# vorstellen, außer dass sie nicht nur in einem Konstruktor einstellbar und dann unveränderlich ist, sondern nur einmal einstellbar und dann unveränderlich ist.

Potenzielle Ergebnisse einer Aufgabe aufteilen

public static async void SplitIntoBlocks(this Task<T> task,
    out IPropagatorBlock<T> result, 
    out IPropagatorBlock<Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    try 
    { 
        result.Post(await task); 
    }
    catch(Exception ex) 
    { 
        exception.Post(ex); 
    }
}

Einführung in TPL Dataflow von Stephen Toub

# BatchedJoinBlock

(Sammelt eine bestimmte Anzahl von Gesamtelementen aus 2-3 Eingaben und gruppiert sie in einem Tupel von Sammlungen von Datenelementen)

BatchedJoinBlock ist gewissermaßen eine Kombination aus BatchBlock und JoinBlock.
Während JoinBlock verwendet wird, um eine Eingabe von jedem Ziel in ein Tupel zu aggregieren, und BatchBlock verwendet wird, um N Eingaben in einer Sammlung zu aggregieren, wird BatchedJoinBlock verwendet, um N Eingaben von allen zu sammeln alle Ziele in Tupel von Sammlungen.

Streuen/Sammeln

Stellen Sie sich ein Streu/Sammel-Problem vor, bei dem N Operationen gestartet werden, von denen einige erfolgreich sein und String-Ausgaben erzeugen können, während andere fehlschlagen und Ausnahmen erzeugen können.

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i=0; i<10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch(Exception ex) { batchJoin.Target2.Post(ex); }
    });
}

var results = await batchedJoin.ReceiveAsync();

foreach(string s in results.Item1) 
{
    Console.WriteLine(s);
}

foreach(Exception e in results.Item2) 
{
    Console.WriteLine(e);
}

Einführung in TPL Dataflow von Stephen Toub

# TransformBlock

(Auswählen, eins zu eins)

Wie bei ActionBlock ermöglicht TransformBlock die Ausführung eines Delegaten, um eine Aktion für jedes Eingabedatum auszuführen; Im Gegensatz zu ActionBlock hat diese Verarbeitung eine Ausgabe. Dieser Delegat kann ein Func sein, in diesem Fall gilt die Verarbeitung dieses Elements als abgeschlossen, wenn der Delegat zurückkehrt, oder er kann ein Func sein, in diesem Fall gilt die Verarbeitung dieses Elements als nicht abgeschlossen wenn der Delegat zurückkehrt, aber wenn die zurückgegebene Aufgabe abgeschlossen ist. Für diejenigen, die mit LINQ vertraut sind, ist es Select() insofern ähnlich, als es eine Eingabe entgegennimmt, diese Eingabe auf irgendeine Weise umwandelt und dann eine Ausgabe erzeugt.

Standardmäßig verarbeitet TransformBlock seine Daten sequenziell mit einem MaxDegreeOfParallelism gleich 1. Dieser Block empfängt nicht nur gepufferte Eingaben und verarbeitet sie, sondern nimmt auch seine gesamte verarbeitete Ausgabe und puffert diese ebenfalls (Daten, die noch nicht vorhanden waren verarbeitet und Daten, die verarbeitet wurden).

Es hat 2 Aufgaben:Eine, um die Daten zu verarbeiten, und eine, um Daten in den nächsten Block zu schieben.

Eine gleichzeitige Pipeline

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

compressor.LinkTo(Encryptor); 

Einführung in TPL Dataflow von Stephen Toub

# TransformManyBlock

(SelectMany, 1-m:Die Ergebnisse dieser Zuordnung werden „geglättet“, genau wie SelectMany von LINQ)

TransformManyBlock ist TransformBlock sehr ähnlich.
Der Hauptunterschied besteht darin, dass ein TransformBlock eine und nur eine Ausgabe für jede Eingabe erzeugt, TransformManyBlock eine beliebige Anzahl (null oder mehr) Ausgaben für jede Eingabe erzeugt. Wie bei ActionBlock und TransformBlock kann diese Verarbeitung sowohl für die synchrone als auch für die asynchrone Verarbeitung mithilfe von Delegaten angegeben werden.

Ein Func wird für synchron verwendet, und ein Func> wird für asynchron verwendet. Wie bei ActionBlock und TransformBlock ist TransformManyBlock standardmäßig auf sequenzielle Verarbeitung eingestellt, kann aber anders konfiguriert werden.

Der Mapping-Delegat sendet eine Sammlung von Elementen zurück, die einzeln in den Ausgabepuffer eingefügt werden.

Asynchroner Web-Crawler

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine(“Downloading “ + url);
    try 
    { 
        return ParseLinks(await DownloadContents(url)); 
    } 
    catch{}
    
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

Expandieren einer Aufzählung in ihre Bestandteile

var expanded = new TransformManyBlock<T[], T>(array => array);

Filtern von 1 bis 0 oder 1 Element

public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new [] { item } : Enumerable.Empty<T>());
}

Einführung in TPL Dataflow von Stephen Toub

# BatchBlock

(Gruppiert eine bestimmte Anzahl aufeinanderfolgender Datenelemente in Sammlungen von Datenelementen)

BatchBlock kombiniert N einzelne Elemente zu einem Stapelelement, das als Array von Elementen dargestellt wird. Eine Instanz wird mit einer bestimmten Stapelgröße erstellt, und der Block erstellt dann einen Stapel, sobald er diese Anzahl von Elementen empfangen hat, und gibt den Stapel asynchron an den Ausgabepuffer aus.

BatchBlock kann sowohl im Greedy- als auch im Non-Greedy-Modus ausgeführt werden.

  • Im Standard-Greedy-Modus werden alle Nachrichten, die dem Block von einer beliebigen Anzahl von Quellen angeboten werden, akzeptiert und gepuffert, um in Stapel umgewandelt zu werden.
    • Im Non-Greedy-Modus werden alle Nachrichten von Quellen verschoben, bis genügend Quellen dem Block Nachrichten angeboten haben, um einen Stapel zu erstellen. Somit kann ein BatchBlock verwendet werden, um 1 Element von jeder von N Quellen, N Elemente von 1 Quelle und eine Vielzahl von Optionen dazwischen zu empfangen.

    Bündelung von Anfragen in Gruppen von 100 zur Übermittlung an eine Datenbank

    var batchRequests = new BatchBlock<Request>(batchSize:100);
    var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
    
    batchRequests.LinkTo(sendToDb);
    
    

    Batch einmal pro Sekunde erstellen

    var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
    new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);
    
    

    Einführung in TPL Dataflow von Stephen Toub