ASP.NET Core:registra i messaggi nel database in background

ASP.NET Core:registra i messaggi nel database in background

Stavo leggendo sull'accesso ad ASP.NET quando mi sono imbattuto in questa affermazione sull'accesso al database:

In questo articolo, mostrerò come implementare questa idea di registrazione del database in background. Per prima cosa, inizierò mostrando il design, quindi mostrerò il codice.

Progettazione

Possiamo implementare l'idea del logger di database in background di Microsoft utilizzando il modello consumatore/produttore con un servizio in background. Per assicurarci che si adatti bene, inseriremo in blocco i messaggi di log nel database.

Entrerò nei dettagli di questo design, ma prima ecco il diagramma del design:

Con il modello consumatore/produttore, uno o più produttori accodano i messaggi a una coda condivisa. Hai uno o più consumatori che eliminano i messaggi dalla coda condivisa e li elaborano. Nel nostro caso, avremo più produttori (tutto ciò che registra, per lo più controller) e un singolo consumatore.

In ASP.NET è possibile aggiungere servizi ospitati eseguiti in background. Questi sono indicati come servizi in background. Utilizzeremo un servizio in background per due scopi:conterrà la coda condivisa e fungerà da consumatore.

I produttori devono solo essere esposti a un metodo Log(). Non hanno bisogno di sapere che stanno utilizzando un servizio in background o che sta registrando nel database. Questo è il motivo per cui utilizziamo l'interfaccia ILoggerService.

Poiché possono esserci più produttori, possono esserci più messaggi di registro in arrivo contemporaneamente. L'esecuzione di molte singole istruzioni INSERT può ridurre le prestazioni del sistema. Invece, inseriremo in blocco i messaggi. Per eseguire l'inserimento in blocco, il consumatore dovrà essere in grado di leggere in batch dalla coda.

Codice

In questa sezione, mostrerò il codice per il design mostrato sopra. Lo realizzerò dall'esterno verso l'interno e per ultimo il servizio in background.

In tutto questo codice, userò Console.WriteLine(). Sto eseguendo il servizio con un'interfaccia console in modo da poter vedere facilmente cosa sta succedendo.

ILoggerService e un controller che lo utilizza

Per prima cosa, dobbiamo aggiungere ILoggerService. È consigliabile codificare in base alle interfacce anziché alle implementazioni. I produttori devono solo avere accesso al metodo Log(). Non hanno bisogno di sapere nulla dell'attuazione concreta.

public interface ILoggerService
{
	void Log(LogLevel logLevel, string message);
}
Code language: C# (cs)

Nota:non sto utilizzando ILogger integrato. Ha un'enorme quantità di metodi e non volevo doverli implementare nel servizio in background.

Ecco un esempio di controller che registra i messaggi. È necessario inserire la dipendenza ILoggerService.

[Route("[controller]")]
[ApiController]
public class RecipesController : ControllerBase
{
	private readonly ILoggerService Logger;
	public RecipesController(ILoggerService logger)
	{
		Logger = logger;
	}

	[HttpGet("{id}")]
	public string Get(int id)
	{
		Logger.Log(LogLevel.Debug, $"GET /Recipes/{id}");
		return "recipe";
	}
}
Code language: C# (cs)

Repository log per inserimento in blocco

Vogliamo inserire in blocco i messaggi di registro. Ogni volta che interagisci con un database, è una buona idea implementare il modello di repository. Con questo modello, incapsula la logica di interazione del database in una classe di repository.

Innanzitutto, dobbiamo aggiungere la classe del modello LogMessage:

public class LogMessage
{
	public int ThreadId { get; set; }
	public string Message { get; set; }
	public DateTimeOffset Timestamp { get; set; }
}
Code language: C# (cs)

Quindi, poiché vogliamo usare SqlBulkCopy e stiamo usando .NET Core, dovremo installare il pacchetto nuget System.Data.SqlClient. Fallo eseguendo il comando seguente (Nota:questo sta usando Visualizza> Altre finestre> Console di gestione pacchetti) :

Install-Package System.Data.SqlClient
Code language: PowerShell (powershell)

Ora possiamo implementare la classe LogRepository. Eseguirà un inserimento in blocco utilizzando SqlBulkCopy.

Ogni volta che stai interagendo con una dipendenza esterna, come un database, è una buona idea renderlo tollerante ai guasti. In questo caso, proveremo a renderlo resiliente rilevando alcune eccezioni SQL transitorie e riprovando l'inserimento in blocco alcune volte.

Ecco la classe LogRepository:

using System.Data;
using System.Data.SqlClient;

public class LogRepository : ILogRepository
{
	private const string TABLE = "Log";
	private readonly string ConnectionString;
	private readonly HashSet<int> transientSqlErrors = new HashSet<int>()
	{
		-2, 258, 4060
	};
	private const int MAX_RETRIES = 3;
	private const int RETRY_SECONDS = 5;
	public LogRepository(string connectionString)
	{
		ConnectionString = connectionString;
	}
	public async Task Insert(List<LogMessage> logMessages)
	{
		DataTable table = new DataTable();
		table.TableName = TABLE;

		table.Columns.Add(nameof(LogMessage.Timestamp), typeof(DateTimeOffset));
		table.Columns.Add(nameof(LogMessage.Message), typeof(string));
		table.Columns.Add(nameof(LogMessage.ThreadId), typeof(int));
		foreach (var logMessage in logMessages)
		{
			var row = table.NewRow();

			row[nameof(LogMessage.Timestamp)] = logMessage.Timestamp;
			row[nameof(LogMessage.Message)] = logMessage.Message ?? (object)DBNull.Value;
			row[nameof(LogMessage.ThreadId)] = logMessage.ThreadId;

			table.Rows.Add(row);
		}

		await BulkInsertWithRetries(table);
	}

	private async Task BulkInsertWithRetries(DataTable table)
	{
		int attempts = 1;
		while (true)
		{
			try
			{
				using (var sqlBulkCopy = new SqlBulkCopy(ConnectionString))
				{
					sqlBulkCopy.DestinationTableName = table.TableName;
					await sqlBulkCopy.WriteToServerAsync(table);
					return;
				}
			}
			catch (SqlException sqlEx)
			when (transientSqlErrors.Contains(sqlEx.Number) && attempts <= MAX_RETRIES)
			{
				Console.WriteLine($"Transient SQL error. Retrying in {RETRY_SECONDS} seconds");
				await Task.Delay(TimeSpan.FromSeconds(RETRY_SECONDS));
				attempts++;
			}
		}
	}
}
Code language: C# (cs)

Nota:in questo scenario potremmo utilizzare un approccio con parametro con valori di tabella (TVP) invece di eseguire un inserimento collettivo. L'obiettivo principale qui è quello di adottare un approccio basato su set in modo da non inviare spam al sistema con molti singoli inserti. Entrambi gli approcci (TVP e inserimento collettivo) funzionano bene per questo.

Servizio in background con una coda di registro

Infine possiamo aggiungere la classe del servizio in background e chiamarla DatabaseLoggerService. Per farlo funzionare come servizio in background, dobbiamo implementare BackgroundService.

Implementeremo il modello consumatore/produttore aggiungendo una coda asincrona da System.Threading.Channels. Il metodo ILoggerService.Log() consentirà ai produttori di accodare i messaggi di registro. Implementeremo il ciclo del consumatore in ExecuteAsync() (un metodo di BackgroundService).

Ecco la classe DatabaseLoggerService:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading.Channels;

public class DatabaseLoggerService : BackgroundService, ILoggerService
{
	private readonly Channel<LogMessage> logMessageQueue;
	private readonly IHostApplicationLifetime HostApplicationLifetime;
	private const int MAX_BATCH_SIZE = 10;
	private readonly ILogRepository LogRepository;
	public DatabaseLoggerService(ILogRepository logRepository, IHostApplicationLifetime hostApplicationLifetime)
	{
		logMessageQueue = Channel.CreateUnbounded<LogMessage>();
		LogRepository = logRepository;
		HostApplicationLifetime = hostApplicationLifetime;
	}
	public async override Task StopAsync(CancellationToken cancellationToken)
	{
		await base.StopAsync(cancellationToken);
	}
	protected async override Task ExecuteAsync(CancellationToken stoppingToken)
	{
		while(!stoppingToken.IsCancellationRequested)
		{

			try
			{
				Console.WriteLine("Waiting for log messages");
				var batch = await GetBatch(stoppingToken);

				Console.WriteLine($"Got a batch with {batch.Count}(s) log messages. Bulk inserting them now.");

				//Let non-retryable errors from this bubble up and crash the service
				await LogRepository.Insert(batch);
			}
			catch (TaskCanceledException)
			{
				Console.WriteLine("Stopping token was canceled, which means the service is shutting down.");
				return;
			}
			catch (Exception ex)
			{
				Console.WriteLine($"Fatal exception in database logger. Crashing service. Error={ex}");
				HostApplicationLifetime.StopApplication();
				return;
			}
		}
	}
	public void Log(LogLevel logLevel, string message)
	{
		//The reason to use Writer.TryWrite() is because it's synchronous.
		//We want the logging to be as fast as possible for the client, so
		//we don't want the overhead of async

		//Note: We're using an unbounded Channel, so TryWrite() *should* only fail 
		//if we call writer.Complete().
		//Guard against it anyway


		var logMessage = new LogMessage()
		{
			Message = message,
			ThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId,
			Timestamp = DateTimeOffset.Now
		};

		if (!logMessageQueue.Writer.TryWrite(logMessage))
		{
			throw new InvalidOperationException("Failed to write the log message");
		}
	}
	private async Task<List<LogMessage>> GetBatch(CancellationToken cancellationToken)
	{
		await logMessageQueue.Reader.WaitToReadAsync(cancellationToken);

		var batch = new List<LogMessage>();

		while (batch.Count < MAX_BATCH_SIZE && logMessageQueue.Reader.TryRead(out LogMessage message))
		{
			batch.Add(message);
		}

		return batch;
	}
}
Code language: C# (cs)

I produttori chiameranno Log() in modo sincrono. Questo è veloce perché tutto ciò che sta facendo è mettere in coda il messaggio.

Il ciclo del consumatore legge un batch di messaggi dalla coda e quindi attende l'inserimento in blocco. Non suggerirei il fuoco e dimenticare l'inserimento di massa. Per uno, in attesa che si completi, agisce come un meccanismo di limitazione. Stai facendo un solo inserimento collettivo alla volta. In secondo luogo, semplifica la gestione degli errori.

Si noti che questo sta chiamando StopApplication() se viene visualizzata un'eccezione da LogRepository. Come notato nella sezione LogRepository, sta riprovando alcune volte se sono presenti eccezioni SQL transitorie. Per qualsiasi altro tipo di errore, o se supera il numero massimo di tentativi, verrà generato. Ciò attiverà un arresto regolare dell'intero servizio, non solo un arresto anomalo del servizio in background. Ci sono sicuramente altri modi possibili per gestirlo, come la registrazione in un file di fallback, ma ho deciso di adottare l'approccio più semplice partendo dal presupposto che questa registrazione sia fondamentale e che il servizio dovrebbe interrompersi se non riesce a registrarsi.

Nota:TryWrite() potrebbe non riuscire, ma è altamente improbabile (se non impossibile) quando si utilizza un canale illimitato (e nulla sta mettendo il canale in uno stato completato).

Registrazione dei servizi

Devi registrare DatabaseLoggerService come singleton (in modo che possa essere iniettata la dipendenza nei controller e in qualsiasi altro produttore) e anche come servizio ospitato in modo che venga eseguito come servizio in background.

public class Startup
{
	//rest of class
	public void ConfigureServices(IServiceCollection services)
	{
		//rest of method

		services.AddSingleton<ILogRepository, LogRepository>(_ 
			=> new LogRepository(Configuration.GetConnectionString("Default")));

		services.AddSingleton<ILoggerService, DatabaseLoggerService>();

		services.AddHostedService(sp => sp.GetService<ILoggerService>() as DatabaseLoggerService);
	   
	}
}
Code language: C# (cs)

Stringa di connessione predefinita in appsettings.json

Possiamo aggiungere un segnaposto per la stringa di connessione in appsettings.json, quindi aggiungere la stringa di connessione reale a un segreto utente.

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "ConnectionStrings": {
    "Default": "The connection string is defined in the user secrets file"
  }
}

Code language: JSON / JSON with Comments (json)

Eseguilo

Per testarlo e vederlo in azione, esegui il servizio ASP.NET e invia richieste simultanee. Controllare la tabella Log nel database per verificare che abbia inserito i messaggi.

Per vedere il funzionamento degli inserimenti in blocco, utilizza il seguente client di prova che invia tonnellate di richieste simultanee:

static async Task Main(string[] args)
{
	var httpClient = new HttpClient();
	var go = new ManualResetEventSlim();


	for (int i = 1; i <= 77; i++)
	{
		var num = i; //capture for closure
		Task.Run(async () =>
		{
			Console.WriteLine($"Num {num} waiting");
			go.Wait();
			Console.WriteLine($"Num {num} going");
			var response = await httpClient.GetAsync($"https://localhost:12345/Recipes/{num}");
			response.EnsureSuccessStatusCode();
			
			Console.WriteLine($"Num {num} done");

		});
	}

	go.Set();

	Console.ReadLine();

}
Code language: C# (cs)

Nota:ho provato a inviare richieste simultanee con Postman, ma è stato troppo lento.

Oltre a vedere il funzionamento degli inserimenti in blocco, puoi indurre un'eccezione SQL transitoria per vedere che esegue nuovi tentativi. Il modo più semplice per farlo è impostare manualmente il database offline, attendere che segnali l'errore transitorio e quindi ripristinare manualmente il database online.

Ecco un esempio di come appare quando esegui questo:

CommandLoop starting
Waiting for log messages
Got a batch with 7(s) log messages. Bulk inserting them now.
Transient SQL error. Retrying in 5 seconds
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messagesCode language: plaintext (plaintext)

Il consumatore legge da 1 a 10 messaggi dalla coda. Se sono disponibili 10 messaggi, ne leggerà tutti e 10. In caso contrario, ne leggerà il maggior numero possibile. In primo luogo, nota che legge solo 7 messaggi. Questo perché c'erano solo 7 messaggi disponibili in coda in quel momento. Dopodiché, è stato in grado di leggere 10 messaggi ogni volta.

Notare inoltre che ha rilevato l'errore SQL transitorio, ha atteso 5 secondi e ha riprovato. Ci è riuscito quando ha riprovato.

Codice sorgente

Il codice sorgente completo per il database logger in background mostrato in questo articolo è disponibile qui:https://github.com/makolyte/aspdotnet-background-dblogger