我正在尝试使用以 kafka 作为代理的 spring 云数据流从 oracle DB 进行更改数据捕获。我为此使用轮询机制。我定期使用基本选择查询轮询数据库以捕获任何更新的数据。为了更好的防故障系统,我将上次轮询时间保存在 oracle DB 中,并使用它来获取上次轮询后更新的数据。
public MessageSource<Object> jdbcMessageSource() {
JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
return jdbcPollingChannelAdapter;
}
@Bean
public IntegrationFlow pollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),spec -> spec.poller(Pollers.fixedDelay(3000)));
flowBuilder.channel(this.source.output());
flowBuilder.transform(trans,"transform");
return flowBuilder.get();
}
我在应用程序属性中的查询如下:
query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time)
update : UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP
这对我来说非常有用。我可以通过这种方法从数据库中获取 CDC。
我现在正在查看的问题如下:
仅仅为了维护轮询时间而创建一个表是一种负担。我正在寻找在 kafka 主题中维护最后一次投票时间,并在我进行下一次投票时从 kafka 主题中检索该时间。
我已经修改了jdbcMessageSource
如下方法来尝试:
public MessageSource<Object> jdbcMessageSource() {
String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'";
JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
new JdbcPollingChannelAdapter(this.dataSource, query);
return jdbcPollingChannelAdapter;
}
但是 Spring Data Flow 只实例化 pollingFlow()(请参见上面的代码)bean 一次。因此,首先运行的查询将保持不变。我想用每次轮询的新轮询时间来更新查询。
Integrationflow
有没有一种方法可以让我在每次进行民意调查时编写一个自定义来更新这个查询?
我已经尝试过IntegrationFlowContext
,但没有成功。
提前致谢 !!!