您可以分享源序列,应用distinctUntilChanged
到一条路径,然后将驱动buffer
使用的运算符Observable
指示边界:
@Test
@SuppressWarnings("unchecked")
public void test() {
Observable.fromArray(1,1,1,2,2,2,3,3,1,1,5,5)
.compose(bufferUntilChanged(v -> v))
.test()
.assertResult(
Arrays.asList(1, 1, 1),
Arrays.asList(2, 2, 2),
Arrays.asList(3, 3),
Arrays.asList(1, 1),
Arrays.asList(5, 5)
);
}
static final <T, K> ObservableTransformer<T, List<T>> bufferUntilChanged(
Function<T, K> keySelector) {
return o -> o.publish(q -> q.buffer(q.distinctUntilChanged(keySelector).skip(1)));
}
The skip(1)
是因为第一个项目通过distinctUntilChanged
将触发一个新的缓冲区,使第一个缓冲区为空。