ASP.NET Core:registra mensajes en la base de datos en segundo plano

ASP.NET Core:registra mensajes en la base de datos en segundo plano

Estaba leyendo sobre cómo iniciar sesión en ASP.NET cuando encontré esta declaración sobre cómo iniciar sesión en la base de datos:

En este artículo, mostraré cómo implementar esta idea de registrador de base de datos en segundo plano. Primero, comenzaré mostrando el diseño y luego mostraré el código.

Diseño

Podemos implementar la idea del registrador de bases de datos en segundo plano de Microsoft utilizando el patrón consumidor/productor con un servicio en segundo plano. Para asegurarnos de que esto escala bien, insertaremos mensajes de registro de forma masiva en la base de datos.

Entraré en detalles sobre este diseño, pero primero aquí está el diagrama de diseño:

Con el patrón de consumidor/productor, tiene uno o más productores que envían mensajes a una cola compartida. Tiene uno o más consumidores sacando mensajes de la cola compartida y procesándolos. En nuestro caso, tendremos varios productores (cualquier cosa que registre, en su mayoría controladores) y un solo consumidor.

En ASP.NET, puede agregar servicios alojados que se ejecutan en segundo plano. Estos se conocen como servicios en segundo plano. Usaremos un servicio en segundo plano con dos propósitos:contendrá la cola compartida y actuará como consumidor.

Los productores solo necesitan estar expuestos a un método Log(). No necesitan saber que están utilizando un servicio en segundo plano o que está iniciando sesión en la base de datos. Es por eso que estamos usando la interfaz ILoggerService.

Dado que puede haber varios productores, pueden llegar varios mensajes de registro al mismo tiempo. Ejecutar muchas instrucciones INSERT individuales puede degradar el rendimiento del sistema. En su lugar, insertaremos los mensajes de forma masiva. Para realizar una inserción masiva, el consumidor deberá poder leer por lotes desde la cola.

Código

En esta sección, mostraré el código para el diseño que se muestra arriba. Construiré esto de afuera hacia adentro y construiré el servicio en segundo plano al final.

A lo largo de este código, usaré Console.WriteLine(). Estoy ejecutando el servicio con una interfaz de consola para poder ver fácilmente lo que está pasando.

ILoggerService y un controlador que lo usa

Lo primero es lo primero, necesitamos agregar ILoggerService. Es una buena práctica codificar contra interfaces en lugar de implementaciones. Los productores solo necesitan tener acceso al método Log(). No necesitan saber nada sobre la implementación concreta.

public interface ILoggerService
{
	void Log(LogLevel logLevel, string message);
}
Code language: C# (cs)

Nota:No estoy usando el ILogger integrado. Tiene una gran cantidad de métodos y no quería tener que implementarlos en el servicio en segundo plano.

Aquí hay un ejemplo de un controlador que registra mensajes. Necesita que se inyecte la dependencia de ILoggerService.

[Route("[controller]")]
[ApiController]
public class RecipesController : ControllerBase
{
	private readonly ILoggerService Logger;
	public RecipesController(ILoggerService logger)
	{
		Logger = logger;
	}

	[HttpGet("{id}")]
	public string Get(int id)
	{
		Logger.Log(LogLevel.Debug, $"GET /Recipes/{id}");
		return "recipe";
	}
}
Code language: C# (cs)

Repositorio de registros para inserción masiva

Queremos insertar masivamente los mensajes de registro. Siempre que esté interactuando con una base de datos, es una buena idea implementar el patrón de repositorio. Con este patrón, encapsula la lógica de interacción de la base de datos en una clase de repositorio.

Primero, necesitamos agregar la clase de modelo LogMessage:

public class LogMessage
{
	public int ThreadId { get; set; }
	public string Message { get; set; }
	public DateTimeOffset Timestamp { get; set; }
}
Code language: C# (cs)

A continuación, dado que queremos usar SqlBulkCopy y estamos usando .NET Core, necesitaremos instalar el paquete nuget System.Data.SqlClient. Hágalo ejecutando el siguiente comando (Nota:esto es usando Ver> Otras ventanas> Consola del administrador de paquetes) :

Install-Package System.Data.SqlClient
Code language: PowerShell (powershell)

Ahora podemos implementar la clase LogRepository. Hará una inserción masiva usando SqlBulkCopy.

Siempre que esté interactuando con una dependencia externa, como una base de datos, es una buena idea hacerla tolerante a fallas. En este caso, intentaremos que sea resistente al detectar algunas excepciones de SQL transitorias y volver a intentar la inserción masiva varias veces.

Aquí está la clase LogRepository:

using System.Data;
using System.Data.SqlClient;

public class LogRepository : ILogRepository
{
	private const string TABLE = "Log";
	private readonly string ConnectionString;
	private readonly HashSet<int> transientSqlErrors = new HashSet<int>()
	{
		-2, 258, 4060
	};
	private const int MAX_RETRIES = 3;
	private const int RETRY_SECONDS = 5;
	public LogRepository(string connectionString)
	{
		ConnectionString = connectionString;
	}
	public async Task Insert(List<LogMessage> logMessages)
	{
		DataTable table = new DataTable();
		table.TableName = TABLE;

		table.Columns.Add(nameof(LogMessage.Timestamp), typeof(DateTimeOffset));
		table.Columns.Add(nameof(LogMessage.Message), typeof(string));
		table.Columns.Add(nameof(LogMessage.ThreadId), typeof(int));
		foreach (var logMessage in logMessages)
		{
			var row = table.NewRow();

			row[nameof(LogMessage.Timestamp)] = logMessage.Timestamp;
			row[nameof(LogMessage.Message)] = logMessage.Message ?? (object)DBNull.Value;
			row[nameof(LogMessage.ThreadId)] = logMessage.ThreadId;

			table.Rows.Add(row);
		}

		await BulkInsertWithRetries(table);
	}

	private async Task BulkInsertWithRetries(DataTable table)
	{
		int attempts = 1;
		while (true)
		{
			try
			{
				using (var sqlBulkCopy = new SqlBulkCopy(ConnectionString))
				{
					sqlBulkCopy.DestinationTableName = table.TableName;
					await sqlBulkCopy.WriteToServerAsync(table);
					return;
				}
			}
			catch (SqlException sqlEx)
			when (transientSqlErrors.Contains(sqlEx.Number) && attempts <= MAX_RETRIES)
			{
				Console.WriteLine($"Transient SQL error. Retrying in {RETRY_SECONDS} seconds");
				await Task.Delay(TimeSpan.FromSeconds(RETRY_SECONDS));
				attempts++;
			}
		}
	}
}
Code language: C# (cs)

Nota:Podríamos usar un enfoque de parámetro con valores de tabla (TVP) en este escenario en lugar de hacer una inserción masiva. El objetivo principal aquí es hacer un enfoque basado en conjuntos para que no estemos enviando spam al sistema con muchas inserciones individuales. Ambos enfoques (TVP e inserción masiva) funcionan bien para esto.

Servicio en segundo plano con una cola de registro

Finalmente, podemos agregar la clase de servicio en segundo plano y llamarlo DatabaseLoggerService. Para hacer que esto se ejecute como un servicio en segundo plano, tenemos que implementar BackgroundService.

Implementaremos el patrón de consumidor/productor agregando una cola asíncrona de System.Threading.Channels. El método ILoggerService.Log() permitirá a los productores poner en cola los mensajes de registro. Implementaremos el ciclo del consumidor en ExecuteAsync() (un método de BackgroundService).

Aquí está la clase DatabaseLoggerService:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading.Channels;

public class DatabaseLoggerService : BackgroundService, ILoggerService
{
	private readonly Channel<LogMessage> logMessageQueue;
	private readonly IHostApplicationLifetime HostApplicationLifetime;
	private const int MAX_BATCH_SIZE = 10;
	private readonly ILogRepository LogRepository;
	public DatabaseLoggerService(ILogRepository logRepository, IHostApplicationLifetime hostApplicationLifetime)
	{
		logMessageQueue = Channel.CreateUnbounded<LogMessage>();
		LogRepository = logRepository;
		HostApplicationLifetime = hostApplicationLifetime;
	}
	public async override Task StopAsync(CancellationToken cancellationToken)
	{
		await base.StopAsync(cancellationToken);
	}
	protected async override Task ExecuteAsync(CancellationToken stoppingToken)
	{
		while(!stoppingToken.IsCancellationRequested)
		{

			try
			{
				Console.WriteLine("Waiting for log messages");
				var batch = await GetBatch(stoppingToken);

				Console.WriteLine($"Got a batch with {batch.Count}(s) log messages. Bulk inserting them now.");

				//Let non-retryable errors from this bubble up and crash the service
				await LogRepository.Insert(batch);
			}
			catch (TaskCanceledException)
			{
				Console.WriteLine("Stopping token was canceled, which means the service is shutting down.");
				return;
			}
			catch (Exception ex)
			{
				Console.WriteLine($"Fatal exception in database logger. Crashing service. Error={ex}");
				HostApplicationLifetime.StopApplication();
				return;
			}
		}
	}
	public void Log(LogLevel logLevel, string message)
	{
		//The reason to use Writer.TryWrite() is because it's synchronous.
		//We want the logging to be as fast as possible for the client, so
		//we don't want the overhead of async

		//Note: We're using an unbounded Channel, so TryWrite() *should* only fail 
		//if we call writer.Complete().
		//Guard against it anyway


		var logMessage = new LogMessage()
		{
			Message = message,
			ThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId,
			Timestamp = DateTimeOffset.Now
		};

		if (!logMessageQueue.Writer.TryWrite(logMessage))
		{
			throw new InvalidOperationException("Failed to write the log message");
		}
	}
	private async Task<List<LogMessage>> GetBatch(CancellationToken cancellationToken)
	{
		await logMessageQueue.Reader.WaitToReadAsync(cancellationToken);

		var batch = new List<LogMessage>();

		while (batch.Count < MAX_BATCH_SIZE && logMessageQueue.Reader.TryRead(out LogMessage message))
		{
			batch.Add(message);
		}

		return batch;
	}
}
Code language: C# (cs)

Los productores llamarán a Log() sincrónicamente. Esto es rápido porque todo lo que hace es poner en cola el mensaje.

El ciclo del consumidor lee un lote de mensajes de la cola y luego espera la inserción masiva. No sugeriría disparar y olvidar la inserción masiva. Por un lado, esperar a que se complete actúa como un mecanismo de estrangulamiento. Solo estás haciendo una inserción masiva a la vez. En segundo lugar, simplifica el manejo de errores.

Tenga en cuenta que esto está llamando a StopApplication() si surge una excepción de LogRepository. Como se indicó en la sección LogRepository, se vuelve a intentar varias veces si hay excepciones de SQL transitorias. Para cualquier otro tipo de error, o si excede el número máximo de reintentos, se lanzará. Esto activará un apagado ordenado de todo el servicio, no solo bloqueará el servicio en segundo plano. Definitivamente, hay otras formas posibles de manejar esto, como iniciar sesión en un archivo de respaldo, pero decidí hacer el enfoque más simple con el supuesto de que este registro es crítico y el servicio debería detenerse si no se registra.

Nota:TryWrite() podría fallar, pero es muy poco probable (si no imposible) cuando se usa un canal ilimitado (y nada pone el canal en un estado completo).

Registro de los servicios

Debe registrar DatabaseLoggerService como singleton (para que se pueda inyectar dependencia a los controladores y cualquier otro productor), y también como un servicio alojado para que se ejecute como un servicio en segundo plano.

public class Startup
{
	//rest of class
	public void ConfigureServices(IServiceCollection services)
	{
		//rest of method

		services.AddSingleton<ILogRepository, LogRepository>(_ 
			=> new LogRepository(Configuration.GetConnectionString("Default")));

		services.AddSingleton<ILoggerService, DatabaseLoggerService>();

		services.AddHostedService(sp => sp.GetService<ILoggerService>() as DatabaseLoggerService);
	   
	}
}
Code language: C# (cs)

Cadena de conexión predeterminada en appsettings.json

Podemos agregar un marcador de posición para la cadena de conexión en appsettings.json y luego agregar la cadena de conexión real como un secreto de usuario.

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "ConnectionStrings": {
    "Default": "The connection string is defined in the user secrets file"
  }
}

Code language: JSON / JSON with Comments (json)

Ejecutar

Para probar esto y verlo en acción, ejecute el servicio ASP.NET y envíe solicitudes simultáneas. Verifique la tabla de registro en la base de datos para verificar que insertó los mensajes.

Para ver cómo funcionan las inserciones masivas, utilice el siguiente cliente de prueba que envía toneladas de solicitudes simultáneas:

static async Task Main(string[] args)
{
	var httpClient = new HttpClient();
	var go = new ManualResetEventSlim();


	for (int i = 1; i <= 77; i++)
	{
		var num = i; //capture for closure
		Task.Run(async () =>
		{
			Console.WriteLine($"Num {num} waiting");
			go.Wait();
			Console.WriteLine($"Num {num} going");
			var response = await httpClient.GetAsync($"https://localhost:12345/Recipes/{num}");
			response.EnsureSuccessStatusCode();
			
			Console.WriteLine($"Num {num} done");

		});
	}

	go.Set();

	Console.ReadLine();

}
Code language: C# (cs)

Nota:intenté enviar solicitudes simultáneas con Postman, pero fue demasiado lento.

Además de ver el funcionamiento de las inserciones masivas, puede inducir una excepción de SQL transitoria para ver cómo se realizan los reintentos. La forma más sencilla de hacerlo es configurar la base de datos fuera de línea manualmente, esperar a que informe el error transitorio y luego volver a configurar la base de datos en línea manualmente.

Aquí hay un ejemplo de cómo se ve cuando ejecutas esto:

CommandLoop starting
Waiting for log messages
Got a batch with 7(s) log messages. Bulk inserting them now.
Transient SQL error. Retrying in 5 seconds
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messages
Got a batch with 10(s) log messages. Bulk inserting them now.
Waiting for log messagesCode language: plaintext (plaintext)

El consumidor lee entre 1 y 10 mensajes de la cola. Si hay 10 mensajes disponibles, leerá los 10. De lo contrario, leerá tantos como pueda. En primera instancia, observe que solo leyó 7 mensajes. Eso es porque solo había 7 mensajes disponibles en la cola en ese momento. Después de eso, pudo leer 10 mensajes cada vez.

Observe también que detectó el error de SQL transitorio, esperó 5 segundos y volvió a intentarlo. Tuvo éxito cuando lo volvió a intentar.

Código fuente

El código fuente completo para el registrador de la base de datos en segundo plano que se muestra en este artículo está disponible aquí:https://github.com/makolyte/aspdotnet-background-dblogger