Ereignisgesteuertes .NET:Concurrent Producer/Consumer mit BlockingCollection

 C Programming >> C-Programmierung >  >> Tags >> .NET
Ereignisgesteuertes .NET:Concurrent Producer/Consumer mit BlockingCollection

Beim Producer/Consumer-Muster erzeugen ein oder mehrere Threads neue Arbeit und stellen sie in die Warteschlange, und ein oder mehrere Threads verbrauchen diese Arbeit, indem sie sie aus der Warteschlange entfernen und verarbeiten. Die Konsumenten und Produzenten teilen sich den Zugriff auf die Arbeitswarteschlange. Stellen Sie es sich wie die Post vor. Sie haben eine oder mehrere Personen (Produzenten), die Briefe in einem Briefkasten abgeben, und einen oder mehrere Postangestellte (Verbraucher), die diese Briefe entgegennehmen und bearbeiten.

Es gibt mehrere Möglichkeiten, das Producer/Consumer-Muster in .NET zu implementieren. Sie müssen zwei Designentscheidungen treffen:

  • Überprüfen der Verbraucherwarteschlange auf neue Daten
  • Wie man Nebenläufigkeit auf Thread-sichere Weise handhabt. Die Consumer- und Producer-Threads haben eine gemeinsame Ressource – die Arbeitswarteschlange. Daher muss der Zugriff auf die Warteschlange Thread-sicher gehandhabt werden.

Der beste Weg, dies zu implementieren, ist die Verwendung der BlockingCollection-Klasse. Um zu zeigen, warum dies der beste ist, finden Sie hier einen Vergleich der vier verschiedenen Ansätze:

Wie Sie in der obigen Vergleichsmatrix sehen können, bietet die BlockingCollection einen ereignisgesteuerten, gleichzeitigen Ansatz, der die Low-Level-Threading-Details abstrahiert.

Hier ist ein einfaches Beispiel für die Verwendung der BlockingCollection.

1 – Erstellen Sie einen Consumer, der BlockingCollection verwendet

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

namespace ProducerConsumer
{
    public class StringReverser
    {
        private readonly BlockingCollection<string> messageQueue;
        public StringReverser(BlockingCollection<string> messageQueue)
        {
            this.messageQueue = messageQueue;
        }
        public void StartProcessing()
        {
            while (true)
            {
                var message = messageQueue.Take(); //Blocks until a new message is available
                var reversedString = new string(message.Reverse().ToArray());

                Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} reverse({message})=>{reversedString}");
            }
        }
        public void QueueForProcessing(string Message)
        {
            messageQueue.Add(Message);
        }
    }
}
Code language: C# (cs)

Der wichtige Teil hier ist der Aufruf von Take(). Dies blockiert, bis eine Nachricht in der messageQueue verfügbar ist.

2 – Starten Sie den Verbraucher und beginnen Sie mit der Erstellung von Nachrichten

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ProducerConsumer
{
    class Program
    {
        public static void Main(string[] args)
        {
            var messageQueue = new BlockingCollection<string>();
            var messageReverser = new StringReverser(messageQueue);

            Task.Run(() => 
            {
                messageReverser.StartProcessing();
            });
            
            while (true)
            {
                Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} Write a sentence and see each word reversed: ");
                var msg = Console.ReadLine();
                Console.WriteLine("");

                foreach(var s in msg.Split())
                {
                    messageQueue.Add(s);
                }
                
            }
        }
    }
}
Code language: C# (cs)

Hier gibt es zwei wichtige Teile:

  1. Starten des Verbrauchers in einem anderen Thread. Dies ist wichtig, da der Aufruf von Take() ein blockierender Aufruf ist – er blockiert den Thread, in dem er sich befindet, daran, irgendetwas anderes zu tun.
  2. Erstellen Sie neue Nachrichten, indem Sie sie der BlockingCollection hinzufügen.

So sieht es aus, wenn ich diese Konsolen-App ausführe:

Beachten Sie, dass der Consumer (StringReverser) auf einem anderen Thread läuft.