嗨,我在我的项目中广泛使用 spring 集成,在当前情况下,使用 spring 动态流注册动态创建我的 ftp、sftp 适配器。此外,为了提供会话工厂,我根据每个唯一连接的持久配置动态创建它们。
这很好用,但有时我需要动态修改现有会话配置,在这种情况下,我确实需要会话工厂用新的会话配置刷新。这可能是由于动态更改信用而发生的。
为了做同样的事情,我正在寻找两种方法
- 通过 flowcontext.remove(flowid) 删除动态流。但这并没有以某种方式杀死流程,我仍然看到旧的会话工厂和流程正在运行。
- 如果有一种方法可以动态地将正在运行的适配器与新的 Sessionfactory 相关联,这也将起作用。但是仍然没有找到方法来完成这个。
请帮忙
更新
下面是我的动态注册码
CachingSessionFactory<FTPFile> csf = cache.get(feed.getConnectionId());
IntegrationFlow flow = IntegrationFlows
.from(inboundAdapter(csf).preserveTimestamp(true)//
.remoteDirectory(feed.getRemoteDirectory())//
.regexFilter(feed.getRegexFilter())//
.deleteRemoteFiles(feed.getDeleteRemoteFiles())
.autoCreateLocalDirectory(feed.getAutoCreateLocalDirectory())
.localFilenameExpression(feed.getLocalFilenameExpression())//
.localFilter(localFileFilter)//
.localDirectory(new File(feed.getLocalDirectory())),
e -> e.id(inboundAdapter.get(feed.getId())).autoStartup(false)
.poller(Pollers//
.cron(feed.getPollingFreq())//
.maxMessagesPerPoll(1)//
.advice(retryAdvice)))
.enrichHeaders(s -> s.header(HEADER.feed.name(), feed))//
.filter(selector)//
.handle(fcHandler)//
.handle(fileValidationHandler)//
.channel(ftbSubscriber)//
.get();
this.flowContext.registration(flow).addBean(csf).//
id(inboundFlow.get(feed.getId())).//
autoStartup(false).register();
我正在尝试通过删除相同的
flowContext.remove(flowId);
在删除轮询器和适配器时,它们看起来仍然处于活动状态
java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:275)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:200)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:62)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:65)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy188.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
*POST Gary 评论 *更改了链的顺序并删除了他的示例中定义的自动启动,现在轮询适配器看起来像被删除了。
更改顺序以匹配来自 Gary 的顺序,并从 flowcontext 链中删除自动启动。如果 autstrtup 为 true ,虽然看起来 bug 仍然存在。
this.flowContext.registration(flow).//
id(inboundFlow.get(feed.getId()))//
.addBean(sessionFactory.get(feed.getId()), csf)//
.register();
* 研究更多 *standardIntegrationFlow.start
无论自动启动状态如何,都会启动流程中的所有组件
。我想我们确实需要检查这些 isAutostartup 并且只有在启动 IntegrationFlow 时自动启动为 True 时才启动它们。standardIF 下面的现有代码。我有办法覆盖这个或者这需要 PR 或 fix 。
if (!this.running) {
ListIterator<Object> iterator = this.integrationComponents.listIterator(this.integrationComponents.size());
this.lifecycles.clear();
while (iterator.hasPrevious()) {
Object component = iterator.previous();
if (component instanceof SmartLifecycle) {
this.lifecycles.add((SmartLifecycle) component);
((SmartLifecycle) component).start();
}
}
this.running = true;
}