我正在创建一个以无限循环运行的消费者,以从队列中读取消息。我正在寻找有关如何在无限循环中恢复 abd continue 的建议/示例代码,即使存在网络中断也是如此。消费者必须保持运行,因为它将作为 WindowsService 安装。
1)有人可以解释一下如何正确使用这些设置吗?它们之间有什么区别?
NetworkRecoveryInterval
AutomaticRecoveryEnabled
RequestedHeartbeat
2)请参阅我当前为消费者提供的示例代码。我正在使用 .Net RabbitMQ 客户端 v3.5.6。
上述设置将如何为我进行“恢复”?
例如Consumer.Queue.Dequeue 会阻塞直到恢复吗?
这似乎不对
所以...
我必须手动为此编写代码吗?例如Consumer.Queue.Dequeue 是否会抛出异常,我必须检测并手动重新创建我的连接、通道和消费者?或者只是消费者,因为“自动恢复”将为我恢复频道?
这是否意味着我应该将消费者创建移到 while 循环内?频道创建怎么样?以及连接的创建?
3)假设我必须手动执行一些恢复代码,是否有事件回调(以及如何注册它们)来告诉我存在网络问题?
Thanks!
public void StartConsumer(string queue)
{
using (IModel channel = this.Connection.CreateModel())
{
var consumer = new QueueingBasicConsumer(channel);
const bool noAck = false;
channel.BasicConsume(queue, noAck, consumer);
// do I need these conditions? or should I just do while(true)???
while (channel.IsOpen &&
Connection.IsOpen &&
consumer.IsRunning)
{
try
{
BasicDeliverEventArgs item;
if (consumer.Queue.Dequeue(Timeout, out item))
{
string message = System.Text.Encoding.UTF8.GetString(item.Body);
DoSomethingMethod(message);
channel.BasicAck(item.DeliveryTag, false);
}
}
catch (EndOfStreamException ex)
{
// this is likely due to some connection issue -- what am I to do?
}
catch (Exception ex)
{
// should never happen, but lets say my DoSomethingMethod(message); throws an exception
// presumably, I'll just log the error and keep on going
}
}
}
}
public IConnection Connection
{
get
{
if (_connection == null) // _connection defined in class -- private static IConnection _connection;
{
_connection = CreateConnection();
}
return _connection;
}
}
private IConnection CreateConnection()
{
ConnectionFactory factory = new ConnectionFactory()
{
HostName = "RabbitMqHostName",
UserName = "RabbitMqUserName",
Password = "RabbitMqPassword",
};
// why do we need to set this explicitly? shouldn't this be the default?
factory.AutomaticRecoveryEnabled = true;
// what is a good value to use?
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(5);
// what is a good value to use? How is this different from NetworkRecoveryInterval?
factory.RequestedHeartbeat = 5;
IConnection connection = factory.CreateConnection();
return connection;
}