将 Observables 与反馈合并,将 Observable 与其自身合并

2024-01-07

我需要创建 Observable,它将从不同的来源(其他 Observables)收集信息,每个来源都会对事件值产生影响,但该值仍然是基于先前的值(一种状态机)构建的。

我们有带有 int 值和操作代码的消息:

class Message{
    Integer value;
    String operation;

    public Message(Integer value, String operation) {
        this.value = value;
        this.operation = operation;
    }
}

以及此类值的一些来源,具有初始值:

Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));

现在有事件源了。这些源将发出值,新的dynamicMessage 值将基于该值以及dynamicMessage 的先前值而出现。实际上它的事件类型是:

class Changer1 {
    String operation;

    public Changer1(String operation) {
        this.operation = operation;
    }
}


class Changer2 {
    Integer value;

    public Changer2(Integer value) {
        this.value = value;
    }
}

变更者1负责变更操作。变更者对变更的价值负责。

以及这些值的来源:

static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
        .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
                .delay(2, TimeUnit.SECONDS));


static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
        .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
                .delay(2, TimeUnit.SECONDS));

Now I need change dynamicMessage Observable and take in respect messages from changers. here is a diagram: state machine

我还尝试编写一个程序,我如何看到解决方案,但它因 OutOfMemoryException 挂起,就在第一个值之后:
消息{值=1,操作='+'}
清单:

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

public class RxDilemma {


static class Message{
    Integer value;
    String operation;

    public Message(Integer value, String operation) {
        this.value = value;
        this.operation = operation;
    }

    @Override
    public String toString() {
        return "Message{" +
                "value=" + value +
                ", operation='" + operation + '\'' +
                '}';
    }
}

static class Changer1 {
    String operation;

    public Changer1(String operation) {
        this.operation = operation;
    }

    @Override
    public String toString() {
        return "Changer1{" +
                "operation='" + operation + '\'' +
                '}';
    }
}


static class Changer2 {
    Integer value;

    public Changer2(Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Changer2{" +
                "value=" + value +
                '}';
    }
}





static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
        .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
                .delay(2, TimeUnit.SECONDS));


static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
        .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
                .delay(2, TimeUnit.SECONDS));


static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
    @Override
    public Observable<Message> call(final Changer1 changer) {
        return dynamicMessage.last().map(new Func1<Message, Message>() {
            @Override
            public Message call(Message message) {
                message.operation = changer.operation;
                return message;
            }
        });
    }
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
    @Override
    public Observable<Message> call(final Changer2 changer2) {
        return dynamicMessage.last().map(new Func1<Message, Message>() {
            @Override
            public Message call(Message message) {
                if("+".equalsIgnoreCase(message.operation)){
                    message.value = message.value+changer2.value;
                } else if("-".equalsIgnoreCase(message.operation)){
                    message.value = message.value-changer2.value;
                }
                return message;
            }
        });
    }
}));

public static void main(String[] args) throws InterruptedException {

    dynamicMessage.subscribe(new Action1<Message>() {
        @Override
        public void call(Message message) {
            System.out.println(message);
        }
    });


    Thread.sleep(5000000);
}
}

所以我的预期输出是:

消息{值=1,操作='+'}
消息{值=1,操作='-'}
消息{值=-1,操作='-'}
消息{值=1,操作='+'}
消息{值=2,操作='+'}

我还考虑过用CombineLatest合并所有内容,但后来我发现CombineLatest没有提供更改了哪一个元素,如果没有这些信息,我将不知道需要对消息进行哪些更改。 有什么帮助吗?


In other words, you want to reduce scan your actions to reflect changes in a state. There is an operator, rightly named reduce scan, that does exactly what you are asking for. It takes every event that comes and lets you transform it with the addition of previous result of that transformation, which will be your state.

interface Changer {
    State apply(State state);
}

class ValueChanger implements Changer {
    ...
    State apply(State state){
        // implement your logic of changing state by this action and return a new one
    }
    ...
}


class OperationChanger implements Changer {
    ...
    State apply(State state){
        // implement your logic of changing state by this action and return a new one
    }
    ...
}

Observable<ValueChanger> valueChangers
Observable<OperationChanger> operationChangers

Observable.merge(valueChangers, operationChangers)
    .scan(initialState, (state, changer) -> changer.apply(state))
    .subscribe(state -> {
        // called every time a change happens
    });

请注意,我稍微更改了您的命名以使其更有意义。另外,我在这里使用 Java8 lambda 表达式。如果您无法在项目中使用它们,只需将它们替换为匿名类即可。

编辑:使用scan运算符而不是reduce因为它会传递整个过程中的每个结果,而不是仅发出最终结果

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 Observables 与反馈合并,将 Observable 与其自身合并 的相关文章

随机推荐