我正在尝试使用 apache flink 有状态函数来实现消息传递场景。
按照设计,我需要从传入消息中计算一些统计数据并将它们存储在状态中。之后,场景函数将访问这些状态和消息并在它们上运行业务规则。但是我们每条消息可能有几十个场景,每个场景都应该只运行一次。
代码或多或少如下
@Override
public void configure(MatchBinder binder) {
binder
.predicate(Transaction.class,this::updateTransactionStatAndSendToScenatioManager)
}
private void updateTransactionStatAndSendToScenatioManager(Context context, Transaction transaction){
// state update
context.send(FnScenarioManager.TYPE, String.valueOf(transaction.id()) , transaction);
}
FnScenarioManager:
@Override
public void configure(MatchBinder binder) {
binder
.predicate(Transaction.class,this::runTransactionScenarios);
}
private void runTransactionScenarios(Context context, Transaction transaction){
context.send(Scenario1.TYPE,String.valueOf(transaction.id()),transaction);
context.send(Scenario2.TYPE,String.valueOf(transaction.id()),transaction);
context.send(Scenario3.TYPE,String.valueOf(transaction.id()),transaction);
...
context.send(ScenarioN.TYPE,String.valueOf(transaction.id()),transaction);
}
我的问题是如果集群在runTransactionScenarios中间崩溃会发生什么?
- 每个场景会只运行一次吗?如果不是,我怎么能确保呢?