2

我在一个相当复杂的场景中使用了 Spring 状态机(Spring Statemachine)。下面我用状态机(SM)中最简单的一部分来说明我的问题,请参考下图。这是我的主状态机(图:主状态机,红色标记处为所选的子状态机)。

红圈中的状态指向下面的子状态机(图:子状态机)。

所以,如你所见,我有 3 个动作。sendBasicTemplate、timeoutLogAction 和 processBasicTemplateReply。我将在下面提供相关的代码段和我的配置。

我在这个过程中观察到,由工厂创建的状态机实例始终驻留在内存中,似乎有一些我找不到来源的引用在持有它们。是状态机没有被停止,还是我哪里做错了?下面是我的代码。

配置类

/**
 * Spring configuration that enables a state machine factory for String-typed states and events.
 *
 * <p>The machine model is loaded from the UML file at {@code classpath:stm/model.uml}. The
 * configured machine gets the id {@code "cambodia"}, has auto-startup switched on, and carries a
 * listener that prints every state change to standard output.
 */
@Configuration
@EnableStateMachineFactory
public class CambodiaStateMachine extends StateMachineConfigurerAdapter<String, String> {

    /** Resource location of the UML model handed to the model factory. */
    private static final String MODEL_LOCATION = "classpath:stm/model.uml";

    /** Machine id set on the configuration. */
    private static final String MACHINE_ID = "cambodia";

    /** Supplies the UML-backed model factory used by {@link #configure(StateMachineModelConfigurer)}. */
    @Bean
    public StateMachineModelFactory<String, String> modelFactory() {
        return new UmlStateMachineModelFactory(MODEL_LOCATION);
    }

    /** Supplies the listener that prints the id of each state the machine changes to. */
    @Bean
    public StateMachineListener<String, String> listener() {
        return new StateChangePrinter();
    }

    /** Wires the UML model factory into the machine model. */
    @Override
    public void configure(StateMachineModelConfigurer<String, String> model) throws Exception {
        model.withModel().factory(modelFactory());
    }

    /** Sets the machine id, enables auto-startup and registers the state-change listener. */
    @Override
    public void configure(StateMachineConfigurationConfigurer<String, String> config) throws Exception {
        config.withConfiguration()
                .machineId(MACHINE_ID)
                .autoStartup(true)
                .listener(listener());
    }

    /** Listener that writes the target state id of every state change to standard output. */
    private static final class StateChangePrinter extends StateMachineListenerAdapter<String, String> {

        @Override
        public void stateChanged(State<String, String> from, State<String, String> to) {
            System.out.println("State change to " + to.getId());
        }
    }
}

方法:1. 下面是把事件提供给状态机的方式,也是创建新的状态机实例的地方。事件是我从队列中取出的。

/**
 * Handles one event message received from the {@code sims.events.mq} queue.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>read the {@code imei} header, which is the suffix of the persistence key
 *       {@code testprefixSw:<imei>}; messages without it are logged and dropped;</li>
 *   <li>obtain a machine from {@code factory} and attach a listener that persists the machine on
 *       state-context callbacks once the {@code imei} extended-state variable is set;</li>
 *   <li>restore the machine from the store when the key exists, otherwise start it;</li>
 *   <li>copy the IMEI, the current state id, the payload and the optional {@code templates} /
 *       {@code ttl} headers into extended-state variables;</li>
 *   <li>hand the message to {@code feedMachine}, which sends it as an event and persists again.</li>
 * </ol>
 *
 * <p>Exceptions are logged and not rethrown.
 *
 * @param message event message; its payload is stored as the {@code eventName} variable
 */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "sims.events.mq", durable = "true"), exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "sims.events.mq.xch", ignoreDeclarationExceptions = "true", durable = "true"), key = "events"))
public void process(GenericMessage<String> message) {

    try {

        // Read the header once: it feeds both the subscriber lookup and the persistence key.
        final String user = (String) message.getHeaders().get("imei");
        if (user == null) {
            // Without an IMEI every such message would share the key "testprefixSw:null" and the
            // actions would read a null "imei" variable, so reject the message up front.
            log.error("Dropping event without imei header. Payload == > " + message.getPayload());
            return;
        }

        // NOTE(review): the lookup result is not used anywhere in this method. The call is kept in
        // case it is relied on for validation or side effects; confirm and remove if it is not.
        subscriberService.findSubscriber(user);

        // quickly create 'new' state machine
        final StateMachine<String, String> stateMachine = factory.getStateMachine();

        stateMachine.addStateListener(new CompositeStateMachineListener<String, String>() {

            @Override
            public void stateContext(StateContext<String, String> arg0) {

                // Skip callbacks that arrive before the "imei" variable has been populated below.
                String imeiVariable = (String) arg0.getExtendedState().getVariables().get("imei");
                if (imeiVariable == null) {
                    return;
                }

                log.info(arg0.getStage().toString() + "**********" + stateMachine.getState());
                try {
                    redisStateMachinePersister.persist(arg0.getStateMachine(), "testprefixSw:" + imeiVariable);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }

        });

        // restore from persistent
        log.info(user);

        // Attempt restoring only if the key exists. Comparing against Boolean.TRUE avoids an
        // auto-unboxing NullPointerException should hasKey return null (presumably a nullable
        // Boolean, as in Spring Data Redis; verify against the template type in use).
        if (Boolean.TRUE.equals(redisTemplate.hasKey("testprefixSw:" + user))) {
            System.out.println("************************  prefix exists...restoring");
            resetStateMachineFromStore(stateMachine, user);
        } else {
            stateMachine.start();
            System.out.println("************************  No prefix");
        }

        log.info("Payload == > " + message.getPayload());

        try {
            stateMachine.getExtendedState().getVariables().put("imei", user);
            stateMachine.getExtendedState().getVariables().put("fromState", stateMachine.getState().getId());
            stateMachine.getExtendedState().getVariables().put("eventName", message.getPayload());

            // Optional headers: read each once and copy it only when present.
            final Object templates = message.getHeaders().get("templates");
            if (templates != null) {
                stateMachine.getExtendedState().getVariables().put("templates", templates);
            }

            final Object ttl = message.getHeaders().get("ttl");
            if (ttl != null) {
                stateMachine.getExtendedState().getVariables().put("ttl", ttl);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }

        // check if state is properly restored...
        log.info("Current State " + stateMachine.getState().toString());

        feedMachine(stateMachine, user, message);

        // NOTE(review): the machine obtained from the factory is never stopped in this method and
        // the listener added above is never removed. If the machine's timers/executor keep
        // references to it, every message retains one machine instance. Calling
        // stateMachine.stop() here may be the fix, but it could also cancel in-memory timeout
        // transitions; confirm the intended lifecycle before changing it.
        log.info("handler exited");

    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }
    // TODO: save persistent state..
}


/**
 * Sends {@code event} to the machine, prints the machine's current state and then persists the
 * machine under the key {@code testprefixSw:<user>}.
 *
 * @param stateMachine machine that receives the event and is persisted afterwards
 * @param user         suffix of the persistence key
 * @param event        message passed to {@code sendEvent}
 * @throws Exception whatever the state read or the persister throws
 */
private void feedMachine(StateMachine<String, String> stateMachine, String user, GenericMessage<String> event)
        throws Exception {
    stateMachine.sendEvent(event);

    final String currentState = stateMachine.getState().toString();
    System.out.println("persist machine --- > state :" + currentState);

    final String storeKey = "testprefixSw:" + user;
    redisStateMachinePersister.persist(stateMachine, storeKey);
}

/**
 * Restores {@code stateMachine} from the entry stored under {@code testprefixSw:<user>} and
 * prints the state of the machine returned by the persister.
 *
 * @param stateMachine machine handed to the persister's {@code restore} call
 * @param user         suffix of the persistence key
 * @return the machine returned by the persister
 * @throws Exception whatever the persister or the state read throws
 */
private StateMachine<String, String> resetStateMachineFromStore(StateMachine<String, String> stateMachine,
        String user) throws Exception {

    final String storeKey = "testprefixSw:" + user;
    final StateMachine<String, String> restored = redisStateMachinePersister.restore(stateMachine, storeKey);

    final String restoredState = restored.getState().toString();
    System.out.println("restore machine --- > state :" + restoredState);

    return restored;
}

动作(Action)

/**
 * Creates the action bean that sends the basic template.
 *
 * <p>When executed, the action reads the {@code imei} extended-state variable and the
 * {@code template} message header, logs both, and then calls {@code findTemplateNSend},
 * {@code xbossBalanceCheck} (with the label {@code "Direct Query"}) and {@code setRiskyState}
 * (with the key {@code testprefixSw:RISKY_StateBasic_WFT_Timeout<imei>} and the value 0), in
 * that order.
 *
 * @return the action instance
 */
@Bean
public Action<String, String> sendBasicTemplate() {

    final Action<String, String> action = new Action<String, String>() {
        @Override
        public void execute(StateContext<String, String> context) {
            // MP: variables are the right way to do
            final String subscriberImei = (String) context.getExtendedState().getVariables().get("imei");
            final String templateName = (String) context.getMessageHeader("template");

            log.info("sending basic template " + templateName + " to " + subscriberImei);

            findTemplateNSend(context, templateName, subscriberImei);
            xbossBalanceCheck(context, subscriberImei, "Direct Query");

            final String riskyKey = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + subscriberImei;
            setRiskyState(context, riskyKey, 0);
        }
    };
    return action;
}

    /**
     * Creates the action bean that processes the reply to the basic template.
     *
     * <p>When executed, the action saves the direct values for the subscriber, clears the risky
     * key {@code testprefixSw:RISKY_StateBasic_WFT_Timeout<imei>} and, if that key was set, writes
     * a success entry to the subscriber event log. It always finishes by sending the
     * {@code SEQUENCE_COMPLETE} event to the context's state machine.
     *
     * @return the action instance
     */
    @Bean
    public Action<String, String> processBasicTemplateReply() {

        // Action handler...
        return new Action<String, String>() {
            @Override
            public void execute(StateContext<String, String> context) {

                log.info("Result for basic template processing started");
                log.info(context.getStateMachine().getState().getIds().toString());
                String imeiNo = (String) context.getExtendedState().getVariables().get("imei");

                saveDirectValues(context, imeiNo);

                String fromState = (String) context.getExtendedState().getVariables().get("fromState");
                String eventName = (String) context.getExtendedState().getVariables().get("eventName");

                // Read the header once and accept any numeric type: a hard cast to Long throws
                // ClassCastException when the header is, for example, an Integer. Absent or
                // non-numeric headers fall back to 0.
                Object processIdHeader = context.getMessageHeader("processId");
                long trId = processIdHeader instanceof Number ? ((Number) processIdHeader).longValue() : 0L;

                String key = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo;
                log.info("*Going to delete if exists key ==>" + key);

                if (clearRiskyStateIfSet(context, key)) {
                    log.info("------------------------------Jedis Exists");
                    sendSubscriberEventLog(imeiNo, fromState, context.getStateMachine().getState().getId(), trId, eventName, false, "Query Event Success");
                }

                // mark as success sent
                context.getStateMachine().sendEvent("SEQUENCE_COMPLETE");
            }
        };
    }


/**
 * Creates the action bean that records a timed-out direct query.
 *
 * <p>When executed, the action clears the risky key
 * {@code testprefixSw:RISKY_StateBasic_WFT_Timeout<imei>} and, if that key was set, writes a
 * failure entry to the subscriber event log and raises a {@code NORMAL}-priority alert.
 *
 * @return the action instance
 */
@Bean
public Action<String, String> timeoutLogAction() {
    // Action handler...
    return new Action<String, String>() {
        @Override
        public void execute(StateContext<String, String> context) {

            String imeiNo = (String) context.getStateMachine().getExtendedState().getVariables().get("imei");

            String fromState = (String) context.getExtendedState().getVariables().get("fromState");
            String eventName = (String) context.getExtendedState().getVariables().get("eventName");

            // Read the header once and accept any numeric type: a hard cast to Long throws
            // ClassCastException when the header is, for example, an Integer. Absent or
            // non-numeric headers fall back to 0.
            Object processIdHeader = context.getMessageHeader("processId");
            long trId = processIdHeader instanceof Number ? ((Number) processIdHeader).longValue() : 0L;

            String key = "testprefixSw:RISKY_StateBasic_WFT_Timeout" + imeiNo;
            log.info("*Going to delete if exists key ==>" + key);

            if (clearRiskyStateIfSet(context, key)) {
                log.info("------------------------------Jedis Exists at timeout. Event Failed");
                // NOTE(review): the third argument here is the machine id (getId()), whereas
                // processBasicTemplateReply passes the current state id (getState().getId()) in
                // the same position. Confirm which one the event log expects.
                sendSubscriberEventLog(imeiNo, fromState, context.getStateMachine().getId(), trId, eventName, true, "Direct Query Failed due to Timeout");
                sendAlert(imeiNo, EventPriority.NORMAL, "Direct Query Failed due to Timeout");
            }

        }
    };
}

那么基于以上内容,我是否遗漏了什么,导致创建的状态机无法被垃圾回收?或者有没有其他原因可以解释为什么每个请求都会消耗内存并且永远不会被释放?

4

0 回答 0