我有需要丰富的传入事件流,然后在它们到达时并行处理。
我以为 Project Reactor 是为这项工作定制的,但在我的测试中,所有处理似乎都是串行完成的。
这是一些测试代码:
ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
.map(i-> {
System.out.println("ReactorTests.test " + Thread.currentThread());
sleep(1000L); // simulate IO delay
return String.format("String %d", i);
})
.take(3)
// .subscribeOn(Schedulers.elastic());
// .subscribeOn(Schedulers.newParallel("test"));
// .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()),
System.out::println,
()-> System.out.println("Done"));
System.out.println("DONE AND DONE");
我尝试取消注释每个注释行,但是在每种情况下,输出都表明使用同一线程来处理所有事件
Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done
(唯一的区别是,如果没有调度程序,它们在订阅线程上运行,而对于任何执行程序,它们都在同一个线程中运行,该线程不是订阅线程。)
我缺少什么?
仅供参考,有一种“睡眠”方法:
public static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
System.out.println("Exiting");
}
}