onErrorContinue()
可能没有按照你想象的那样做——它让上游运营商recover如果它们碰巧支持这样做,则可能会避免它们内部可能发生的错误。这是一个相当专业的操作员。
在这种情况下map()
实际上支持onErrorContinue
,但 map 实际上并没有产生错误 - 错误已经插入到流中(通过concat()
和明确的Flux.error()
调用。)换句话说,根本没有操作符产生错误,因此它无法恢复,因为提供的元素是错误的。
如果您更改了流,以便map()
实际上caused错误,那么它将按预期工作:
Flux.just(1, 2,3,4,5)
.map(x -> {
if(x==5) {
throw new RuntimeException();
}
return x*2;
})
.onErrorContinue((e,i)->{
System.out.println("Error For Item +" + i );
})
.subscribe(System.out::println);
生产:
2
4
6
8
Error For Item +5
根据实际用例,另一种选择可能是使用onErrorResume()
在可能错误的元素(或元素源)之后:
Flux.just(1, 2, 3, 4, 5)
.concatWith(Flux.error(new RuntimeException()))
.onErrorResume(e -> {
System.out.println("Error " + e + ", ignoring");
return Mono.empty();
})
.concatWith(Flux.just(6))
.map(i -> i * 2)
.subscribe(System.out::println);
一般来说,使用另一个“onError”运算符(例如onErrorResume()
)通常是更常用、更推荐的方法,因为onErrorContinue()
取决于运营商的支持并影响上游,而不是下游运营商(这是不寻常的。)