Ereignisgesteuertes .NET:Concurrent Producer/Consumer mit einer nicht blockierenden, asynchronen Warteschlange

 C Programming >> C-Programmierung >  >> Tags >> .NET
Ereignisgesteuertes .NET:Concurrent Producer/Consumer mit einer nicht blockierenden, asynchronen Warteschlange

In einem früheren Artikel habe ich darüber geschrieben, wie man Concurrent Producer/Consumer mit einer BlockingCollection implementiert. Dies ist ein Thread-sicherer, ereignisgesteuerter Ansatz, der gleichzeitige Konstrukte auf hoher Ebene verwendet. Der einzige Nachteil besteht darin, dass der Verbraucher einen blockierenden Aufruf verwendet, um Nachrichten aus der Warteschlange zu nehmen. Mit anderen Worten, es wird ein Thread verschwendet.

Gibt es eine Möglichkeit, dies mit einem nicht blockierenden Ansatz zu implementieren?

Ja, indem Sie den Kanal verwenden Klasse aus System.Threading.Channels. Dies ist im Wesentlichen eine asynchrone Warteschlange.

In diesem Artikel zeige ich, wie man einen Kanal verwendet gleichzeitigen Producer/Consumer nicht blockierend zu implementieren.

1 – Erstellen Sie einen Verbraucher, der einen Kanal als asynchrone Warteschlange verwendet

Hinweis:Dies verwendet System.Threading.Channels.

public class StringReverser
{
	private readonly Channel<string> messageQueue;
	public StringReverser(Channel<string> messageQueue)
	{
		this.messageQueue = messageQueue;
	}
	public async Task StartProcessing(CancellationToken cancelToken)
	{
		await foreach(var message in messageQueue.Reader.ReadAllAsync(cancelToken))
		{
			var reversedString = new string(message.Reverse().ToArray());

			Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} reverse({message})=>{reversedString}");
		}
	}
	public async Task QueueForProcessing(string Message, CancellationToken cancelToken)
	{
		await messageQueue.Writer.WriteAsync(Message, cancelToken);
	}
}
Code language: C# (cs)

2 – Starten Sie den Verbraucher und beginnen Sie mit der Erstellung von Nachrichten

public static async Task Main(string[] args)
{
	var messageQueue = Channel.CreateUnbounded<string>();
	var messageReverser = new StringReverser(messageQueue);

	CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();


	messageReverser.StartProcessing(cancellationTokenSource.Token);

	while (true)
	{
		Console.WriteLine($"Thread={Thread.CurrentThread.ManagedThreadId} Write a sentence and see each word reversed: ");
		var msg = Console.ReadLine();
		Console.WriteLine("");

		foreach (var s in msg.Split())
		{
			await messageReverser.QueueForProcessing(s, cancellationTokenSource.Token);
		}

	}
}
Code language: C# (cs)

3 – Endergebnisse – Ausführen der Konsolen-App

Wenn ich dies ausführe, können Sie sehen, dass der Verbraucher nicht blockiert, da ich sonst keine Nachrichten in die Konsole eingeben könnte.

Beachten Sie außerdem, dass zuerst Thread 4 verwendet wird und am Ende zu Thread 5 gewechselt wird. Es verwendet Threadpool-Threads und sitzt dort nicht und verschwendet keinen dedizierten Thread. Dies ist ein entscheidender Vorteil dieses asynchronen, nicht blockierenden Ansatzes.