Spring Statemachine Factory - stays in memory

2023-12-21

I have used Spring Statemachine in quite a complex scenario. I will explain my problem using the simplest part of the state machine (SM). Refer to the image below: it shows my main state machine, with the selected sub-machine marked in red.

[Image: Main state machine and the picked sub-machine marked in red]

The state circled in red points to the following sub-machine.

[Image: Submachine]

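So, as you can see, I have 3 actions: sendBasicTemplate, timeoutLogAction and processBasicTemplateReply. I will provide the relevant code segments and my configuration below.

What I have observed during this process is that the state machines created by the factory always stay alive in memory. Something still holds a reference to them, and I cannot work out what. Is it that the SM does not stop, or am I doing something wrong? Here is my code.

Configuration class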

@Configuration
@EnableStateMachineFactory
public class CambodiaStateMachine extends StateMachineConfigurerAdapter<String, String> {

    @Override
    public void configure(StateMachineModelConfigurer<String, String> model) throws Exception {
        model           
            .withModel()
                .factory(modelFactory());
    }

    @Override
    public void configure(StateMachineConfigurationConfigurer<String, String> config) throws Exception {
        config
            .withConfiguration()
            .machineId("cambodia")
            .autoStartup(true)
            .listener(listener());
    }

    @Bean
    public StateMachineListener<String, String> listener() {
        return new StateMachineListenerAdapter<String, String>() {
            @Override
            public void stateChanged(State<String, String> from, State<String, String> to) {
                System.out.println("State change to " + to.getId());
            }
        };
    }

    @Bean
    public StateMachineModelFactory<String, String> modelFactory() {
        return new UmlStateMachineModelFactory("classpath:stm/model.uml");
    }
}

Methods: 1. This is how my events are fed to the machine and how a new SM instance is created. I take my events from a queue.

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "sims.events.mq", durable = "true"),
        exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "sims.events.mq.xch", ignoreDeclarationExceptions = "true", durable = "true"),
        key = "events"))
    public void process(GenericMessage<String> message) {

        try {

            String imei = (String) message.getHeaders().get("imei");
            Subscriber subscriber = subscriberService.findSubscriber(imei);


            // quickly create 'new' state machine
            StateMachine<String, String> stateMachine = factory.getStateMachine();

            stateMachine.addStateListener(new CompositeStateMachineListener<String, String>() {

                @Override
                public void stateContext(StateContext<String, String> arg0) {

                    String user = (String) arg0.getExtendedState().getVariables().get("imei");
                    if (user == null) {
                        return;
                    }

                    log.info(arg0.getStage().toString() + "**********" + stateMachine.getState());
                    try {
                        redisStateMachinePersister.persist(arg0.getStateMachine(), "testprefixSw:" + user);
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                }

            });

            // restore from the persistent store
            String user = (String) message.getHeaders().get("imei");
            log.info(user);

            // attempt restoring only if the key exists
            if (redisTemplate.hasKey("testprefixSw:" + user)) {
                System.out.println("************************  prefix exists...restoring");
                resetStateMachineFromStore(stateMachine, user);
            } else {
                stateMachine.start();
                System.out.println("************************  No prefix");

            }

            log.info("Payload == > " + message.getPayload());

            try {
                stateMachine.getExtendedState().getVariables().put("imei", user);
                stateMachine.getExtendedState().getVariables().put("fromState", stateMachine.getState().getId());
                stateMachine.getExtendedState().getVariables().put("eventName", message.getPayload());
                if(null!= message.getHeaders().get("templates"))
                    stateMachine.getExtendedState().getVariables().put("templates", message.getHeaders().get("templates"));

                if(null!= message.getHeaders().get("ttl"))
                    stateMachine.getExtendedState().getVariables().put("ttl", message.getHeaders().get("ttl"));
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }

            // check if state is properly restored...
            log.info("Current State " + stateMachine.getState().toString());

            feedMachine(stateMachine, user, message);

            log.info("handler exited");

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        // TODO: save persistent state..
    }


private void feedMachine(StateMachine<String, String> stateMachine, String user, GenericMessage<String> event)
        throws Exception {
    stateMachine.sendEvent(event);
    System.out.println("persist machine --- > state :" + stateMachine.getState().toString());
    redisStateMachinePersister.persist(stateMachine, "testprefixSw:" + user);
}

private StateMachine<String, String> resetStateMachineFromStore(StateMachine<String, String> stateMachine,
        String user) throws Exception {

    StateMachine<String, String> machine = redisStateMachinePersister.restore(stateMachine, "testprefixSw:" + user);
    System.out.println("restore machine --- > state :" + machine.getState().toString());
    return machine;
}
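
One thing that could be ruled out with the handler above is the machine lifecycle: each message obtains a machine from the factory and starts or restores it, but nothing in the listed code stops it again. The following is only a minimal sketch of a "use, then always stop" wrapper, not a confirmed fix for the memory growth. `Stoppable` and `PerMessageMachine` are hypothetical names introduced for illustration so that the sketch compiles without Spring on the classpath; in the real handler the `finally` block would call `stop()` on the `StateMachine` itself, as the counterpart of the `start()` call already used above.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for the single lifecycle call this sketch needs. */
interface Stoppable {
    void stop();
}

/** Runs the per-message work against a fresh machine and always stops it afterwards. */
final class PerMessageMachine {

    private PerMessageMachine() {
    }

    static <M extends Stoppable> void run(Supplier<M> factory, Consumer<M> work) {
        // Obtain the machine outside the try block: if creation fails there is nothing to stop.
        M machine = factory.get();
        try {
            // In the handler above this would be: restore or start, send the event, persist.
            work.accept(machine);
        } finally {
            // Runs on success and on failure, so no machine is left running.
            machine.stop();
        }
    }
}
```

Whether stopping the machine is enough for it to become unreachable depends on what else holds a reference to it (for example timers or schedulers), so a heap dump is still needed to confirm the actual retaining path.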

Actions

@Bean
    public Action<String, String> sendBasicTemplate() {

        // Action handler...
        return new Action<String, String>() {
            @Override
            public void execute(StateContext<String, String> context) {
                // MP: variables are the right way to do
                String imeiNo = (String) context.getExtendedState().getVariables().get("imei");
                String template = (String) context.getMessageHeader("template");

                log.info("sending basic template " + template + " to " + imeiNo);

                findTemplateNSend(context, template, imeiNo);
                xbossBalanceCheck(context, imeiNo, "Direct Query");
                setRiskyState(context, "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo, 0);
            }
        };
    }

    @Bean
    public Action<String, String> processBasicTemplateReply() {

        // Action handler...
        return new Action<String, String>() {
            @Override
            public void execute(StateContext<String, String> context) {

                log.info("Result for basic template processing started");
                log.info(context.getStateMachine().getState().getIds().toString());
                String imeiNo = (String) context.getExtendedState().getVariables().get("imei");

                saveDirectValues(context, imeiNo);

                String fromState = (String) context.getExtendedState().getVariables().get("fromState");
                String eventName = (String) context.getExtendedState().getVariables().get("eventName");
                long trId = (Long) context.getMessageHeader("processId") != null? (Long) context.getMessageHeader("processId") : 0;


                String key = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo;
                log.info("*Going to delete if exists key ==>" + key);

                if (clearRiskyStateIfSet(context, key)) {
                    log.info("------------------------------Jedis Exists");
                    sendSubscriberEventLog(imeiNo, fromState, context.getStateMachine().getState().getId(), trId, eventName, false, "Query Event Success");
                }

                // mark as success sent
                context.getStateMachine().sendEvent("SEQUENCE_COMPLETE");
            }
        };
    }


@Bean
    public Action<String, String> timeoutLogAction() {
        // Action handler...
        return new Action<String, String>() {
            @Override
            public void execute(StateContext<String, String> context) {
                // log.info("timeout log Action");

                String imeiNo = (String) context.getStateMachine().getExtendedState().getVariables().get("imei");

                // String imeiNo = (String)
                // context.getExtendedState().getVariables().get("imei");
                String fromState = (String) context.getExtendedState().getVariables().get("fromState");
                String eventName = (String) context.getExtendedState().getVariables().get("eventName");
                long trId = (Long) context.getMessageHeader("processId") != null ? (Long) context.getMessageHeader("processId") : 0;


                String key = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo;
                log.info("*Going to delete if exists key ==>" + key);

                if (clearRiskyStateIfSet(context, key)) {
                    log.info("------------------------------Jedis Exists at timeout. Event Failed");
                    sendSubscriberEventLog(imeiNo, fromState, context.getStateMachine().getId(), trId, eventName, true, "Direct Query Failed due to Timeout");
                    sendAlert(imeiNo, EventPriority.NORMAL, "Direct Query Failed due to Timeout");
                }

            }
        };
    }
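
The handler and the actions build the same Redis keys by inline string concatenation and read the "processId" header with a direct cast to Long. A small sketch of helpers that centralise both is shown here; `RedisKeys` and `Headers` are hypothetical names that do not appear in the original code, and the key layout is copied from the literals above. The sketch does not change behaviour for Long headers, but it also tolerates a missing header or another numeric type instead of throwing a ClassCastException.

```java
/** Builds the Redis keys that the handler and the actions currently concatenate inline. */
final class RedisKeys {

    private static final String PREFIX = "testprefixSw:";
    private static final String RISKY_TIMEOUT = "RISKY_StateBasic_WFT_Timeout";

    private RedisKeys() {
    }

    /** Key under which the state machine for one IMEI is persisted. */
    static String machineKey(String imei) {
        return PREFIX + imei;
    }

    /** Key that marks a pending basic-template timeout for one IMEI. */
    static String riskyTimeoutKey(String imei) {
        return PREFIX + RISKY_TIMEOUT + imei;
    }
}

/** Null-safe conversion of message header values. */
final class Headers {

    private Headers() {
    }

    /** Returns the header as a long, or the fallback if it is missing or not numeric. */
    static long asLong(Object header, long fallback) {
        return (header instanceof Number) ? ((Number) header).longValue() : fallback;
    }
}
```

With these in place, the transaction id lookup in the actions could read `long trId = Headers.asLong(context.getMessageHeader("processId"), 0L);`.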

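So, based on the above, is there anything I am missing that prevents the created state machines from being garbage collected? Or is there any other explanation for why memory is consumed on every request and never released?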

