0

我们可以使用 Spring Integration 为文件配置目录轮询,例如 -

配置 2 台服务器后,轮询发生在 1 台服务器上,相应的处理在两台服务器上进行分布式处理。

另外,我们可以在运行时切换任一服务器上的轮询吗?

编辑 - 尝试配置 JBDC MetaStore 并分别运行两个实例,能够轮询和处理但间歇性地得到DeadLockLoserDataAccessException

下面的配置

@Bean
public MessageChannel fileInputChannel(){
return new DirectChannel();
}

@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller(){
PollerMetadata pollermetadata = new PollerMetadata();
pollermetadata.setMaxMessagesPerPoll(-1);
pollermetadata.setTrigger(new PeriodicTrigger(1000));
return pollermetadata;
}

@Bean
@InBoundChannelAdapter(value = "fileInputChannel"){
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory("Mylocalpath");
FileSystemPersistentAcceptOnceFileListFilter acceptOnce = new FileSystemPersistentAcceptOnceFileListFilter();
ChainFileListFilter<File> chainFilter = ChainFileListFilter(".*\\.txt"));
chainFilter.addFilter(acceptOnce);
source.setFilter(chainFilter);
source.setUseWatchService(true);
source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE,FileReadingMessageSource.WatchEventType.MODIFY);
return source;
}

@Bean
public IntegrationFlow processFileFlow(){
return IntegrationFlows.from("fileInputChannel")
.handle(service).get();
}
4

1 回答 1

0

轻松实现分布式解决方案确实是 Spring Integration 的特性之一。您只需将消息传递中间件添加到集群基础架构中,并将所有节点连接到某个目的地以进行发送和接收。一个很好的例子是SubscribableJmsChannel你可以简单地在你的应用程序上下文中声明它,你的集群的所有节点都将订阅这个通道,以便从 JMS 队列中循环消费。哪个节点产生到这个通道已经无关紧要了。

在文档中查看更多信息:https ://docs.spring.io/spring-integration/docs/current/reference/html/jms.html#jms-channel 。

另一个类似分布式通道的示例是:AMQP、Kafka、Redis、ZeroMQ。

您还可以拥有一个共享消息存储并在QueueChannel定义中使用它:https ://docs.spring.io/spring-integration/docs/current/reference/html/system-management.html#message-store

目前尚不清楚您对“运行时轮询器”的含义是什么,因此我建议您使用更多信息启动一个新的 SO 线程。

将规则视为指导:https ://stackoverflow.com/help/how-to-ask

于 2021-12-23T13:40:24.073 回答