我正在寻找一个好的解决方案来在超出重试限制后立即记录失败消息,而无需处理错误队列。到目前为止我发现了什么:
- 我可以继承InMemory入站消息跟踪器并覆盖是否超出重试限制,但此时除了 id 之外,没有关于消息本身的信息。
- 我可以实施IInboundMessage拦截器并得到IConsumerContext in 发货前/发货后,但此时没有有关成功/失败的信息。
所以作为解决方案,我可以得到IConsumerContext in 预调度将其放入某种缓存中,然后将其从缓存中取出是否超出重试限制当超过重试限制时。
方法按以下顺序调用:IsRetryLimitExceeded -> PreDispatch -> PostDispatch
所以我找不到一个好地方来从缓存中删除成功处理的消息。
当然,我可以使用大小受限的缓存,但整个解决方案似乎很奇怪。
对此事的任何想法将不胜感激。
我已经找到了这个解决方案:
class MessageInterceptor: IInboundMessageInterceptor
{
public void PreDispatch(IConsumeContext context)
{
MessageTracker.Register(context);
}
public void PostDispatch(IConsumeContext context)
{}
}
class MessageTracker: InMemoryInboundMessageTracker
{
readonly Logger logger;
static readonly ConcurrentDictionary<string, IConsumeContext> DispatchingCache = new ConcurrentDictionary<string, IConsumeContext>();
public MessageTracker(int retryLimit, Logger logger)
: base(retryLimit)
{
this.logger = logger;
}
public static void Register(IConsumeContext context)
{
DispatchingCache.GetOrAdd(context.MessageId, context);
}
public override void MessageWasReceivedSuccessfully(string id)
{
base.MessageWasReceivedSuccessfully(id);
IConsumeContext value;
DispatchingCache.TryRemove(id, out value);
}
public override bool IsRetryLimitExceeded(string id, out Exception retryException, out IEnumerable<Action> faultActions)
{
var result = base.IsRetryLimitExceeded(id, out retryException, out faultActions);
IConsumeContext failed;
if (!result || !DispatchingCache.TryRemove(id, out failed))
return result;
// --> log failed IConsumeContext with exception
return true;
}
}
并将这些类插入
serviceBus = ServiceBusFactory.New(config =>
{
...
config.AddBusConfigurator(new PostCreateBusBuilderConfigurator(sb =>
{
var interceptorConfig = new InboundMessageInterceptorConfigurator(sb.InboundPipeline);
interceptorConfig.Create(new MessageInterceptor());
}));
config.SetDefaultInboundMessageTrackerFactory(retryLimit => new MessageTracker(retryLimit, LogManager.GetCurrentClassLogger()));
});
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)