我有一个 WebAPI,它也应该从 RabbitMQ 接收消息。我用了this教程,因为我知道有时 IIS 喜欢终止长时间运行的任务(但尚未在服务器上测试它,也许它不起作用)。我有一个处理通过 RabbitMQ 接收的消息的服务。我遇到的第一个问题 - 我无法将其注入BackgroundService
类,所以我用了IServiceScopeFactory
。现在,我必须使用来自两个队列的消息,据我了解,最佳实践是为此使用两个通道。但处理是在一项服务中完成的。后台服务:
public class ConsumeRabbitMQHostedService : BackgroundService
{
private IConnection _connection;
private IModel _firstChannel;
private IModel _secondChannel;
private RabbitConfigSection _rabbitConfig;
public IServiceScopeFactory _serviceScopeFactory;
public ConsumeRabbitMQHostedService(IOptions<RabbitConfigSection> rabbitConfig, IServiceScopeFactory serviceScopeFactory)
{
_rabbitConfig = rabbitConfig.Value;
_serviceScopeFactory = serviceScopeFactory;
InitRabbitMQ();
}
private void InitRabbitMQ()
{
var factory = new ConnectionFactory { HostName = _rabbitConfig.HostName, UserName = _rabbitConfig.UserName, Password = _rabbitConfig.Password };
_connection = factory.CreateConnection();
_firstChannel = _connection.CreateModel();
_firstChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
_firstChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, true, false, false, null);
_firstChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
_firstChannel.BasicQos(0, 1, false);
_secondChannel = _connection.CreateModel();
_secondChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
_secondChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, true, false, false, null);
_secondChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
_secondChannel.BasicQos(0, 1, false);
_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();
var firstConsumer = new EventingBasicConsumer(_firstChannel);
var secondConsumer = new EventingBasicConsumer(_secondChannel);
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
firstConsumer.Received += (ch, ea) =>
{
// received message
var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());
// handle the received message
HandleFirstMessage(content, scoped);
_firstChannel.BasicAck(ea.DeliveryTag, false);
};
firstConsumer.Shutdown += OnConsumerShutdown;
firstConsumer.Registered += OnConsumerRegistered;
firstConsumer.Unregistered += OnConsumerUnregistered;
firstConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;
_firstChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, false, firstConsumer);
}
using (var scope = _serviceScopeFactory.CreateScope())
{
IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
secondConsumer.Received += (ch, ea) =>
{
// received message
var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());
// handle the received message
HandleSecondMessage(content, scoped);
_secondChannel.BasicAck(ea.DeliveryTag, false);
};
secondConsumer.Shutdown += OnConsumerShutdown;
secondConsumer.Registered += OnConsumerRegistered;
secondConsumer.Unregistered += OnConsumerUnregistered;
secondConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;
_secondChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, false, secondConsumer);
}
return Task.CompletedTask;
}
private void HandleFirstMessage(string content, IIntegrationService integrationService)
{
List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
integrationService.ImportFirst(dataToImport);
}
private void HandleSecondMessage(string content, IIntegrationService integrationService)
{
List<Import901Data> importData = JsonConvert.DeserializeObject<List<Import901Data>>(content);
integrationService.ImportSecond(importData);
}
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }
public override void Dispose()
{
_firstChannel.Close();
_connection.Close();
base.Dispose();
}
}
在服务中我得到
System.ObjectDisposeException:“无法访问已处置的上下文实例。导致此错误的一个常见原因是处置从依赖项注入解析的上下文实例,然后尝试在应用程序的其他位置使用相同的上下文实例。如果您在上下文实例上调用“Dispose”或将其包装在 using 语句中,则可能会发生这种情况。如果您使用依赖项注入,则应该让依赖项注入容器负责处理上下文实例。
对象名称:“IntegrationDbContext”。
DbContext
被注入IIntegrationService
。如果我了解发生了什么,服务的两个实例(甚至一个)共享DbContext
,当其中一个完成时,它会处置DbContext
。我尝试不创建两个实例(所有代码都在一个实例中)using
),尝试制作IIntegrationService
瞬态,尝试异步执行所有操作(这是初始版本,使其同步测试)-仍然是相同的错误。我应该在这里做什么?这是正确的方法吗?
更新1。 ConfigureServices
in Startup
:
public void ConfigureServices(IServiceCollection services)
{
var rabbitConfigSection =
Configuration.GetSection("Rabbit");
services.Configure<RabbitConfigSection>(rabbitConfigSection);
services.AddDbContext<SUNDbContext>(options =>
options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")));
services.AddCors();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo
{
Title = "My API",
Version = "v1"
});
});
services.AddRabbit(Configuration);
services.AddHostedService<ConsumeRabbitMQHostedService>();
services.AddControllers();
services.AddTransient<IIntegrationService, IntegrationService>();// it's transient now, same error with scoped
}