我正在使用一个调用我实现的函数的框架。我希望这个函数的参数转换为可观察的,并通过一系列观察者发送。我以为我可以为此使用一个主题,但它的行为并不像我预期的那样。
为了澄清一下,我有类似以下代码的内容。我想Option 1
下面的方法可行,但到目前为止我已经满足了Option 2
,这看起来根本不符合习惯。
var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
function(event) {
console.log("got event", event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSubject onCompleted');
}
);
// The framework calls this method
function onEvent(eventArray) {
var eventSource = Rx.Observable.from(eventArray);
// Option 1: I thought this would work, but it doesn't
// eventSource.subscribe(eventSubject);
// Option 2: This does work, but its obviously clunky
eventSource.subscribe(
function(event) {
log.debug("sending to subject");
eventSubject.onNext(event);
},
function(e) {
log.error(e);
},
function() {
console.log('eventSource onCompleted');
}
);
}
正如 Brandon 已经解释的那样,将 eventSubject 订阅到另一个可观察对象意味着将 eventSubjects onNext、onError 和 onComplete 订阅到该可观察对象 onNext、onError 和 onComplete。从您的示例来看,您似乎只想订阅 onNext。
一旦第一个 eventSource 完成/错误,您的主题就会完成/错误 - 您的 eventSubject 正确地忽略后续 eventSource 对其触发的任何进一步的 onNext/onError 。
有多种方法可以只订阅任意 eventSource 的 onNext:
-
仅手动订阅onNext。
resultSource = eventSubject
.map(processEvent);
eventSource.subscribe(
function(event) {
eventSubject.onNext(event);
},
function(error) {
// don't subscribe to onError
},
function() {
// don't subscribe to onComplete
}
);
-
使用仅为您处理事件源 onNext/onError 订阅的运算符。这是布兰登的建议。请记住,这也订阅了 eventSources onError,在您的示例中您似乎不想要它。
resultSource = eventSubject
.mergeAll()
.map(processEvent);
eventSubject.onNext(eventSource);
-
使用不会为 eventSources onError/onComplete 调用 eventSubjects onError/onComplete 的观察者。您可以简单地覆盖 eventSubjects onComplete 作为肮脏的黑客,但最好创建一个新的观察者。
resultSource = eventSubject
.map(processEvent);
var eventObserver = Rx.Observer.create(
function (event) {
eventSubject.onNext(event);
}
);
eventSubject.subscribe(eventObserver);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)