大概您希望避免进行多次传递,因为管道阶段可能很昂贵。或者您希望避免收集中间值以便通过多个收集器运行它们,因为存储所有值的成本可能太高。
As 布莱恩·戈茨指出 https://stackoverflow.com/questions/29181682/performing-more-than-one-reduction-in-a-single-pass#comment46586618_29181682, Collectors.summarizingInt
将收集int
值并对它们执行多次归约,返回一个称为的聚合结构IntSummaryStatistics
。有类似的收藏家进行总结double
and long
values.
不幸的是,它们只执行一组固定的缩减,因此如果您想要执行与它们不同的缩减,则必须编写自己的收集器。
这是一种在单次传递中使用多个不相关的收集器的技术。我们可以用peek()
破解流中的每个值,使其不受干扰地通过。这peek()
操作需要一个Consumer
,所以我们需要一种方法来适应Collector
to a Consumer
. The Consumer
将成为收藏家的累加器功能。但我们还需要致电收藏家supplier函数并存储它创建的对象以传递给累加器功能。我们需要一种方法从收集器中获取结果。为此,我们将收集器包装在一个小帮助器类中:
public class PeekingCollector<T,A,R> {
final Collector<T,A,R> collector;
final A acc;
public PeekingCollector(Collector<T,A,R> collector) {
this.collector = collector;
this.acc = collector.supplier().get();
}
public Consumer<T> peek() {
if (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
return t -> collector.accumulator().accept(acc, t);
else
return t -> {
synchronized (this) {
collector.accumulator().accept(acc, t);
}
};
}
public synchronized R get() {
return collector.finisher().apply(acc);
}
}
要使用它,我们首先必须创建包装的收集器并挂载它。然后我们运行管道并调用peek
,传递包裹的收集器。最后我们打电话get
在包装的收集器上以获得其结果。这是一个简单的示例,它对一些单词进行过滤和排序,同时按首字母对它们进行分组:
List<String> input = Arrays.asList(
"aardvark", "crocodile", "antelope",
"buffalo", "bustard", "cockatoo",
"capybara", "bison", "alligator");
PeekingCollector<String,?,Map<String,List<String>>> grouper =
new PeekingCollector<>(groupingBy(s -> s.substring(0, 1)));
List<String> output = input.stream()
.filter(s -> s.length() > 5)
.peek(grouper.peek())
.sorted()
.collect(toList());
Map<String,List<String>> groups = grouper.get();
System.out.println(output);
System.out.println(groups);
输出是:
[aardvark, alligator, antelope, buffalo, bustard, capybara, cockatoo, crocodile]
{a=[aardvark, antelope, alligator], b=[buffalo, bustard], c=[crocodile, cockatoo, capybara]}
这有点麻烦,因为您必须为包装的收集器写出泛型类型(这有点不寻常;它们通常都是推断出来的)。但如果处理或存储流值的费用足够大,也许值得这么麻烦。
最后请注意peek()
如果流并行运行,则可以从多个线程调用。因此,非线程安全收集器必须受到保护synchronized
堵塞。如果收集器是线程安全的,我们不需要在调用它时进行同步。为了确定这一点,我们检查收集器的CONCURRENT
特征。如果您运行并行流,最好放置一个并发收集器(例如groupingByConcurrent
or toConcurrentMap
)内peek
操作,否则包装收集器内的同步可能会导致瓶颈并减慢整个流的速度。