.NET basado en eventos:productor/consumidor simultáneo mediante BlockingCollection

 C Programming >> Programación C >  >> Tags >> .NET
.NET basado en eventos:productor/consumidor simultáneo mediante BlockingCollection

Con el patrón Productor/Consumidor, tiene uno o más subprocesos que producen trabajo nuevo y lo ponen en cola, y uno o más subprocesos consumen ese trabajo quitándolo de la cola y procesándolo. Los consumidores y productores comparten el acceso a la cola de trabajo. Piense en ello como la oficina de correos. Tiene una o más personas (productores) que dejan cartas en un buzón, y uno o más trabajadores postales (consumidores) toman estas cartas y las procesan.

Hay varias formas de implementar el patrón Producer/Consumer en .NET. Debe tomar dos decisiones de diseño:

  • Cómo buscar nuevos datos en la cola del consumidor
  • Cómo manejar la simultaneidad de una manera segura para subprocesos. Los subprocesos de consumidor y productor tienen un recurso compartido:la cola de trabajo. Por lo tanto, el acceso a la cola debe manejarse de manera segura para subprocesos.

La mejor manera de implementar esto es usando la clase BlockingCollection. Para mostrar por qué este es el mejor, aquí hay una comparación de los cuatro enfoques diferentes:

Como puede ver en la matriz de comparación anterior, BlockingCollection proporciona un enfoque concurrente basado en eventos que abstrae los detalles de subprocesos de bajo nivel.

Aquí hay un ejemplo simple de cómo usar BlockingCollection.

1 – Cree un Consumidor que use BlockingCollection

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

namespace ProducerConsumer
{
    public class StringReverser
    {
        private readonly BlockingCollection<string> messageQueue;
        public StringReverser(BlockingCollection<string> messageQueue)
        {
            this.messageQueue = messageQueue;
        }
        public void StartProcessing()
        {
            while (true)
            {
                var message = messageQueue.Take(); //Blocks until a new message is available
                var reversedString = new string(message.Reverse().ToArray());

                Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} reverse({message})=>{reversedString}");
            }
        }
        public void QueueForProcessing(string Message)
        {
            messageQueue.Add(Message);
        }
    }
}
Code language: C# (cs)

La parte importante aquí es la llamada a Take(). Esto bloquea hasta que haya un mensaje disponible en la cola de mensajes.

2 – Inicie el Consumidor y comience a producir mensajes

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ProducerConsumer
{
    class Program
    {
        public static void Main(string[] args)
        {
            var messageQueue = new BlockingCollection<string>();
            var messageReverser = new StringReverser(messageQueue);

            Task.Run(() => 
            {
                messageReverser.StartProcessing();
            });
            
            while (true)
            {
                Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} Write a sentence and see each word reversed: ");
                var msg = Console.ReadLine();
                Console.WriteLine("");

                foreach(var s in msg.Split())
                {
                    messageQueue.Add(s);
                }
                
            }
        }
    }
}
Code language: C# (cs)

Hay dos partes importantes aquí:

  1. Iniciando el Consumidor en otro hilo. Esto es importante porque la llamada a Take() es una llamada de bloqueo:bloquea el hilo en el que se encuentra para que no haga nada más.
  2. Produzca nuevos mensajes agregándolos a BlockingCollection.

Esto es lo que parece cuando ejecuto esta aplicación de consola:

Observe que el Consumidor (StringReverser) se está ejecutando en un subproceso diferente.