理解Filter
and Reduce
运营商
正如其他人在评论中提到的,您引用的文章使用的是具有不同语法的旧版本 rxjs。对于此示例,我将使用从版本 6 开始的较新语法。
在 rxjs 中,有各种运算符可用于转换通过流发出的值。通常这些是这样导入的:
import { filter, reduce } from 'rxjs/operators';
还有许多生成器函数可用于创建值流。interval
是这些函数之一,它将创建一个流,该流每隔n
毫秒。导入如:
import { interval } from 'rxjs';
让我们创建一个简单的流:
number$ = interval(1000); // emit number every 1 second
// output: 0, 1, 2, 3, 4, 5...
我们可以将运算符应用于此流来转换排放:
的用法filter
很简单。它只是发出通过给定真值测试的值(与Array.filter()
方法)。
numbersLessThan4$ = numbers$.pipe(
filter(number => number < 4)
);
// output: 0, 1, 2, 3
The reduce
运算符有点复杂,其行为就像Array.reduce()
方法。函数应用于每个发射值,并且能够存储在评估下一次发射时可以引用的值。
reduce
有两个参数。第一个是接收当前发射的函数(cur
) 和之前的累计结果 (acc
)并返回一个新的累计值。第二个是初始值acc
.
example:
sumOfNumbers$ = numbers$.pipe(
reduce((acc, cur) => acc + cur, 0)
);
那么,让我们看看什么reduce
何时做numbers$
发出前 3 个数字:
- 0
-
cur
接收当前排放值0
-
acc
从提供的默认值开始0
- 表达方式
acc + cur
回报0
- 1
-
cur
接收当前排放值1
-
acc
接收之前返回的值0
- 表达方式
acc + cur
回报1
- 2
-
cur
接收当前排放值2
-
acc
接收之前返回的值1
- 表达方式
acc + cur
回报3
所以这很酷。我们可以将相当多的逻辑放入一行简单的代码中。一件重要的事情reduce
是在源可观察对象完成之前它不会发出。现在,numbers$
永远不会完成(interval()
无限期地发出连续整数)。
我们可以使用take()
运算符在发出一定数量的值后完成流。
Example:
numbers$ = interval(1000).pipe(take(5)); // completes after 5 emissions
sumOfNumbers$ = numbers$.pipe(
// receives 5 values (0, 1, 2, 3, 4) and performs the logic described above.
reduce((acc, cur) => acc + cur, 0)
);
// output: 10
可以使用多个运营商来转换排放。只需在里面提供多个pipe()
:
sumOfNumbersLessThan4$ = numbers$.pipe(
filter(number => number < 4),
reduce((acc, cur) => acc + cur, 0)
);
// output: 6