当您处理典型的处理“流”时,甚至可能是多线程/并发处理流,最好的方法是“邮槽”/消息泵。通过交换消息,您可以轻松协调应用程序的多个层,包括报告错误、通知下一个命令链等。我不是指 Windows 消息,而是指类似以下内容的内容:
public abstract class Message
{
public abstract void Process();
}
Then:
public class MessageQueue
{
private Queue m_Queue;
public void Post(Message msg) {....}
public void Process() {.....}
}
然后,您在应用程序的每个线程/处理层上分配一个 MessageQueue 并传递消息,如下所示:
GUIMessageQueue.Post(
new ErrorMessage("Internal async file reader ran out of buffer"));
在 GUI 线程上放置一个读取 GUI 队列并调用它的 Process() 的计时器。
现在,您可以创建许多消息派生的工作项来执行各种任务,这些任务可以在线程/逻辑层之间轻松编排。此外,消息可能包含与其相关的数据块的引用:
公共飞机着陆消息:消息{公共飞机飞机......}
这是我在大规模并行链处理系统中使用的一些真实代码:
/// <summary>
/// Defines a base for items executable by WorkQueue
/// </summary>
public interface IWorkItem<TContext> where TContext : class
{
/// <summary>
/// Invoked on an item to perform actual work.
/// For example: repaint grid from changed data source, refresh file, send email etc...
/// </summary>
void PerformWork(TContext context);
/// <summary>
/// Invoked after successfull work execution - when no exception happened
/// </summary>
void WorkSucceeded();
/// <summary>
/// Invoked when either work execution or work success method threw an exception and did not succeed
/// </summary>
/// <param name="workPerformed">When true indicates that PerformWork() worked without exception but exception happened later</param>
void WorkFailed(bool workPerformed, Exception error);
}
/// <summary>
/// Defines contract for work queue that work items can be posted to
/// </summary>
public interface IWorkQueue<TContext> where TContext : class
{
/// <summary>
/// Posts work item into the queue in natural queue order (at the end of the queue)
/// </summary>
void PostItem(IWorkItem<TContext> work);
long ProcessedSuccessCount{get;}
long ProcessedFailureCount{get;}
TContext Context { get; }
}