ASP.NET – Endpoint SSE asincrono

ASP.NET – Endpoint SSE asincrono

Gli eventi inviati dal server (SSE) consentono a un client di iscriversi ai messaggi su un server. Il server invia i nuovi messaggi al client non appena si verificano. Questa è un'alternativa al client che esegue il polling del server per i nuovi messaggi.

In questo articolo mostrerò come implementare il sistema di messaggistica mostrato nel diagramma seguente. Questo utilizza un endpoint SSE asincrono per inoltrare i messaggi da una coda di messaggi al client.

Prima di passare all'implementazione, indicherò le basi di un endpoint SSE asincrono usando questo semplice esempio di seguito. Ci sono tre punti chiave (li metto come commenti nel codice qui sotto).

[ApiController]
public class MessagesController : ControllerBase
{
	[HttpGet]
	[Route("messages/sse/{id}")]
	public async Task SimpleSSE(string id)
	{
		//1. Set content type
		Response.ContentType = "text/event-stream";
		Response.StatusCode = 200;

		StreamWriter streamWriter = new StreamWriter(Response.Body);

		while(!HttpContext.RequestAborted.IsCancellationRequested)
		{
			//2. Await something that generates messages
			await Task.Delay(5000, HttpContext.RequestAborted);
			
			//3. Write to the Response.Body stream
			await streamWriter.WriteLineAsync($"{DateTime.Now} Looping");
			await streamWriter.FlushAsync();
			
		}
	}
}
Code language: C# (cs)

Ci sono tre punti chiave per configurare un endpoint SSE:

  1. Imposta Response.ContentType ="text/event-stream".
  2. Attendere qualcosa che generi messaggi in modo asincrono.
  3. Per inviare effettivamente il messaggio sullo stream SSE, usa semplicemente StreamWriter.WriteAsync + FlushAsync sullo stream Response.Body.

Ora, diamo un'occhiata a come implementare il sistema di messaggistica che utilizza un endpoint SSE asincrono.

1 – MessagesController – Aggiungi un controller con un endpoint SSE asincrono

Il codice seguente sta configurando un endpoint SSE e attende i messaggi da una coda di messaggi asincrona. Quando arrivano i messaggi, li scrive nel flusso Response.Body.

[ApiController]
public class MessagesController : ControllerBase
{
	private readonly IMessageQueue MessageQueue;
	public MessagesController(IMessageQueue messageQueue)
	{
		MessageQueue = messageQueue;
	}
	[HttpGet]
	[Route("messages/subscribe/{id}")]
	public async Task Subscribe(string id)
	{
		Response.ContentType = "text/event-stream";
		Response.StatusCode = 200;

		StreamWriter streamWriter = new StreamWriter(Response.Body);

		MessageQueue.Register(id);
		
		try
		{
			await MessageQueue.EnqueueAsync(id, $"Subscribed to id {id}", HttpContext.RequestAborted);

			await foreach (var message in MessageQueue.DequeueAsync(id, HttpContext.RequestAborted))
			{
				await streamWriter.WriteLineAsync($"{DateTime.Now} {message}");
				await streamWriter.FlushAsync();
			}
		}
		catch(OperationCanceledException)
		{
			//this is expected when the client disconnects the connection
		}
		catch(Exception)
		{
			Response.StatusCode = 400;
		}
		finally
		{
			MessageQueue.Unregister(id);
		}
	}
}
Code language: C# (cs)

2 – MessagesController – Aggiungi un endpoint per la pubblicazione di messaggi

In MessagesController aggiungi il seguente endpoint:

[HttpPost]
[Route("messages/{id}")]

public async Task<IActionResult> PostMessage(string id, string message)
{
	try
	{
		await MessageQueue.EnqueueAsync(id, message, HttpContext.RequestAborted);
		return Ok();
	}
	catch(Exception ex)
	{
		return BadRequest(ex.Message);
	}
}
Code language: C# (cs)

Un endpoint SSE è inutile se non hai qualcosa che genera nuovi messaggi. Per mantenere questa implementazione semplice, ma realistica, sto utilizzando un endpoint che consente di inviare messaggi alla coda dei messaggi. Poiché l'endpoint SSE è in attesa di messaggi da questa coda, non appena pubblichi il messaggio, rimuoverà il messaggio dalla coda e lo invierà al client SSE.

3 – MessageQueue – Crea una coda di messaggi asincrona

Aggiungi interfaccia

public interface IMessageQueue
{
	void Register(string id);
	void Unregister(string id);
	IAsyncEnumerable<string> DequeueAsync(string id, CancellationToken cancelToken);
	Task EnqueueAsync(string id, string message, CancellationToken cancelToken);

}
Code language: C# (cs)

Implementa MessageQueue asincrona

Sto usando System.Threading.Channels come coda simultanea asincrona. Fondamentalmente quando un abbonato si registra, creo un nuovo canale.

MessagesController.PostMessage(…) accoda i messaggi, mentre l'endpoint SSE li rimuove dalla coda.

public class MessageQueue : IMessageQueue
{
	private ConcurrentDictionary<string, Channel<string>> clientToChannelMap;
	public MessageQueue()
	{
		clientToChannelMap = new ConcurrentDictionary<string, Channel<string>>();
	}

	public IAsyncEnumerable<string> DequeueAsync(string id, CancellationToken cancelToken)
	{
		if (clientToChannelMap.TryGetValue(id, out Channel<string> channel))
		{
			return channel.Reader.ReadAllAsync(cancelToken);
		}
		else
		{
			throw new ArgumentException($"Id {id} isn't registered");
		}
	}

	public async Task EnqueueAsync(string id, string message, CancellationToken cancelToken)
	{
		if(clientToChannelMap.TryGetValue(id, out Channel<string> channel))
		{
			await channel.Writer.WriteAsync(message, cancelToken);
		}
	}

	public void Register(string id)
	{
		if(!clientToChannelMap.TryAdd(id, Channel.CreateUnbounded<string>()))
		{
			throw new ArgumentException($"Id {id} is already registered");
		}
	}

	public void Unregister(string id)
	{
		clientToChannelMap.TryRemove(id, out _);
	}

	private Channel<string> CreateChannel()
	{
		return Channel.CreateUnbounded<string>();
	}
}
Code language: C# (cs)

Registra IMessageQueue nella classe Startup

Per inserire la dipendenza IMessageQueue in MessagesController, devo registrarla in ConfigureServices(...) nella classe Startup.

public class Startup
{
	//other methods...

	public void ConfigureServices(IServiceCollection services)
	{
		services.AddSingleton<IMessageQueue, MessageQueue>();
		
		//other service registrations
	}
}
Code language: C# (cs)

Ora, quando un registro entra in MessagesController, passerà nel singleton IMessageQueue.

4 – Risultati finali:iscriviti utilizzando più client e invia loro messaggi

Fare riferimento al seguente articolo su come creare un'app della console SSE Client.

Ho avviato l'API Web in IISExpress, quindi ho lanciato cinque client SSE. Ogni cliente è iscritto a un ID diverso.

Sto usando l'interfaccia utente generata da Swagger per pubblicare messaggi.

Qui puoi vedere tutti i client che si connettono, ricevono il messaggio di iscrizione iniziale, quindi ricevono i messaggi che ho pubblicato.