Construcciones de flujo de datos de la biblioteca paralela de tareas (TPL)

Construcciones de flujo de datos de la biblioteca paralela de tareas (TPL)

# Bloque de acción

(foreach)

Esta clase se puede considerar lógicamente como un búfer para procesar los datos combinados con tareas para procesar esos datos, con el "bloque de flujo de datos" administrando ambos. En su uso más básico, podemos instanciar un ActionBlock y "publicar" datos en él; el delegado proporcionado en la construcción de ActionBlock se ejecutará de forma asíncrona para cada dato publicado.

Cómputo síncrono

var ab = new ActionBlock<TInput>(i => 
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);

Limitación de las descargas asincrónicas a un máximo de 5 al mismo tiempo

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Introducción a TPL Dataflow por Stephen Toub

# BroadcastBlock

(Copie un elemento y envíe las copias a cada bloque al que está vinculado)

A diferencia de BufferBlock, la misión en la vida de BroadcastBlock es permitir que todos los objetivos vinculados desde el bloque obtengan una copia de cada elemento publicado, sobrescribiendo continuamente el valor "actual" con los que se propagan a él.

Además, a diferencia de BufferBlock, BroadcastBlock no retiene datos innecesariamente. Después de que se haya ofrecido un dato en particular a todos los objetivos, ese elemento se sobrescribirá con el siguiente dato en la línea (al igual que con todos los bloques de flujo de datos, los mensajes se manejan en orden FIFO). Ese elemento se ofrecerá a todos los objetivos, y así sucesivamente.

Productor/consumidor asíncrono con un productor limitado

var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image), 
    new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);

Exponer el estado de un agente

public class MyAgent
{
    public ISourceBlock<string> Status { get; private set; }
    
    public MyAgent()
    {
        Status = new BroadcastBlock<string>();
        Run();
    } 

    private void Run()
    {
        Status.Post("Starting");
        Status.Post("Doing cool stuff");
        …
        Status.Post("Done");
    }
}

Introducción a TPL Dataflow por Stephen Toub

# bloque de búfer

(Cola FIFO:Los datos que entran son los datos que salen)

En resumen, BufferBlock proporciona un búfer ilimitado o limitado para almacenar instancias de T.
Puede "publicar" instancias de T en el bloque, lo que hace que los datos que se publican se almacenen en un orden de primero en entrar, primero en salir (FIFO) por el bloque.
Puede "recibir" del bloque, lo que le permite obtener de forma sincrónica o asincrónica instancias de T previamente almacenadas o disponibles en el futuro (de nuevo, FIFO).

Productor/consumidor asíncrono con un productor limitado

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await _Buffer.ReceiveAsync());
    } 
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Introducción a TPL Dataflow por Stephen Toub

# JoinBlock

(Recopila 2-3 entradas y las combina en una Tupla)

Al igual que BatchBlock, JoinBlock puede agrupar datos de múltiples fuentes de datos. De hecho, ese es el objetivo principal de JoinBlock.

Por ejemplo, un JoinBlock es un ISourceBlock>.

Al igual que con BatchBlock, JoinBlock es capaz de operar tanto en modo voraz como no voraz.

  • En el modo voraz predeterminado, se aceptan todos los datos ofrecidos a los objetivos, incluso si el otro objetivo no tiene los datos necesarios para formar una tupla.
  • En el modo no codicioso, los objetivos del bloque pospondrán los datos hasta que se les haya ofrecido a todos los objetivos los datos necesarios para crear una tupla, momento en el cual el bloque se involucrará en un protocolo de confirmación de dos fases para recuperar atómicamente todos los elementos necesarios de las fuentes. Este aplazamiento hace posible que otra entidad consuma los datos mientras tanto para permitir que el sistema general avance.

Procesamiento de solicitudes con un número limitado de objetos agrupados

var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++) 
{
    requestProcessor.Target1.Post(new ExpensiveObject()); 
}

var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    
    request.ProcessWith(resource);
    
    return resource;
});

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);

Introducción a TPL Dataflow por Stephen Toub

# WriteOnceBlock

(Variable de solo lectura:memoriza su primer elemento de datos y distribuye copias de él como su salida. Ignora todos los demás elementos de datos)

Si BufferBlock es el bloque más fundamental en TPL Dataflow, WriteOnceBlock es el más simple.
Almacena como máximo un valor y, una vez que se ha establecido ese valor, nunca se reemplazará ni se sobrescribirá.

Puede pensar en WriteOnceBlock como similar a una variable miembro de solo lectura en C#, excepto que en lugar de que solo se pueda configurar en un constructor y luego sea inmutable, solo se puede configurar una vez y luego es inmutable.

Dividir los resultados potenciales de una tarea

public static async void SplitIntoBlocks(this Task<T> task,
    out IPropagatorBlock<T> result, 
    out IPropagatorBlock<Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    try 
    { 
        result.Post(await task); 
    }
    catch(Exception ex) 
    { 
        exception.Post(ex); 
    }
}

Introducción a TPL Dataflow por Stephen Toub

# Bloque de unión por lotes

(Recopila una cierta cantidad de elementos totales de 2-3 entradas y los agrupa en una tupla de colecciones de elementos de datos)

BatchedJoinBlock es, en cierto sentido, una combinación de BatchBlock y JoinBlock.
Mientras que JoinBlock se usa para agregar una entrada de cada destino en una tupla, y BatchBlock se usa para agregar N entradas a una colección, BatchedJoinBlock se usa para recopilar N entradas de todo todos los objetivos en tuplas de colecciones.

Dispersión/Recopilación

Considere un problema de dispersión/recopilación en el que se inician N operaciones, algunas de las cuales pueden tener éxito y producir salidas de cadena, y otras pueden fallar y producir excepciones.

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i=0; i<10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch(Exception ex) { batchJoin.Target2.Post(ex); }
    });
}

var results = await batchedJoin.ReceiveAsync();

foreach(string s in results.Item1) 
{
    Console.WriteLine(s);
}

foreach(Exception e in results.Item2) 
{
    Console.WriteLine(e);
}

Introducción a TPL Dataflow por Stephen Toub

# TransformBlock

(Seleccionar, uno a uno)

Al igual que ActionBlock, TransformBlock permite la ejecución de un delegado para realizar alguna acción para cada dato de entrada; a diferencia de ActionBlock, este procesamiento tiene una salida. Este delegado puede ser Func, en cuyo caso el procesamiento de ese elemento se considera completado cuando el delegado regresa, o puede ser Func, en cuyo caso el procesamiento de ese elemento se considera completado no cuando el delegado regresa pero cuando se completa la tarea devuelta. Para aquellos familiarizados con LINQ, es algo similar a Select() en el sentido de que toma una entrada, transforma esa entrada de alguna manera y luego produce una salida.

De forma predeterminada, TransformBlock procesa sus datos secuencialmente con un MaxDegreeOfParallelism igual a 1. Además de recibir la entrada almacenada en el búfer y procesarla, este bloque tomará toda su salida procesada y también la almacenará en el búfer (datos que no han sido procesados ​​y los datos que han sido procesados).

Tiene 2 tareas:una para procesar los datos y otra para enviar datos al siguiente bloque.

Una canalización simultánea

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

compressor.LinkTo(Encryptor); 

Introducción a TPL Dataflow por Stephen Toub

# TransformManyBlock

(SelectMany, 1-m:los resultados de esta asignación se "aplanan", al igual que SelectMany de LINQ)

TransformManyBlock es muy similar a TransformBlock.
La diferencia clave es que, mientras que TransformBlock produce una y solo una salida para cada entrada, TransformManyBlock produce cualquier número (cero o más) de salidas para cada entrada. Al igual que con ActionBlock y TransformBlock, este procesamiento se puede especificar mediante delegados, tanto para el procesamiento sincrónico como asincrónico.

Func se usa para sincrónico y Func> para asincrónico. Al igual que con ActionBlock y TransformBlock, TransformManyBlock tiene como valor predeterminado el procesamiento secuencial, pero se puede configurar de otra manera.

El delegado de mapeo devuelve una colección de elementos, que se insertan individualmente en el búfer de salida.

Rastreador web asíncrono

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine(“Downloading “ + url);
    try 
    { 
        return ParseLinks(await DownloadContents(url)); 
    } 
    catch{}
    
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

Expansión de un enumerable en sus elementos constituyentes

var expanded = new TransformManyBlock<T[], T>(array => array);

Filtrar pasando de 1 a 0 o 1 elementos

public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new [] { item } : Enumerable.Empty<T>());
}

Introducción a TPL Dataflow por Stephen Toub

# Bloque por lotes

(Agrupa una cierta cantidad de elementos de datos secuenciales en colecciones de elementos de datos)

BatchBlock combina N elementos individuales en un elemento de lote, representado como una matriz de elementos. Se crea una instancia con un tamaño de lote específico, y el bloque luego crea un lote tan pronto como recibe esa cantidad de elementos, enviando el lote de forma asíncrona al búfer de salida.

BatchBlock es capaz de ejecutarse tanto en modo voraz como no voraz.

  • En el modo codicioso predeterminado, todos los mensajes ofrecidos al bloque desde cualquier número de fuentes se aceptan y almacenan en búfer para convertirlos en lotes.
    • En el modo no codicioso, todos los mensajes se posponen desde las fuentes hasta que suficientes fuentes hayan ofrecido mensajes al bloque para crear un lote. Por lo tanto, se puede usar un BatchBlock para recibir 1 elemento de cada una de las N fuentes, N elementos de 1 fuente y una miríada de opciones intermedias.

    Solicitudes por lotes en grupos de 100 para enviar a una base de datos

    var batchRequests = new BatchBlock<Request>(batchSize:100);
    var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
    
    batchRequests.LinkTo(sendToDb);
    
    

    Crear un lote una vez por segundo

    var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
    new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);
    
    

    Introducción a TPL Dataflow por Stephen Toub