1

我正在使用一个 Spring Integration 应用程序,它应该遍历目录的内容,处理其中的文件,然后退出。

我已经在下面设置了 XML 以每秒轮询目录,但这并不是我所追求的。如何更改它以读取目录中的所有文件,然后在消息完成流过系统后让我的程序退出?

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int-file="http://www.springframework.org/schema/integration/file"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/xml http://www.springframework.org/schema/integration/xml/spring-integration-xml-2.1.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.1.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file-2.1.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd >

    <int-file:inbound-channel-adapter
        directory="inputDir" 
        channel="inputChannel">
        <int:poller fixed-rate="1000"></int:poller>
    </int-file:inbound-channel-adapter>

  <!-- more components to read from inputChannel, write to output adapter -->
</beans>
4

2 回答 2

1

有点hacky,我过去曾使用过这种方法,并且效果很好。

方法是定义一个shutdown队列通道,从你的主线程等待消息到达这个通道,一旦可用就关闭应用程序上下文。

<int:channel id="shutdownChannel"><int:queue/></int:channel>

在你的主线程 - :

    PollableChannel pollableChannel = applicationContext.getBean("shutdownChannel",PollableChannel.class);
    pollableChannel.receive();
    applicationContext.close();

receive只有当消息进入关闭通道时,主代码才会通过调用。现在的问题是如何将消息发送到关闭通道。

您可以在文件消息的最终处理器中保留一些状态,例如自上次处理任何文件以来的时间,以下可能是稻草人:

public class FileContentProcessor {
    private long lastProcessedTime = System.currentTimeMillis();

    public void processContent(String content){
        this.lastProcessedTime = System.currentTimeMillis();
        System.out.println("Processed content: " + content);
    }

    public long msSinceLastProcessed(){
        return System.currentTimeMillis() - this.lastProcessedTime;
    }
}

根据这个状态定义一个入站通道适配器:

<int:inbound-channel-adapter ref="fileProcessor" method="msSinceLastProcessed" channel="shutdownFilterChannel">
    <int:poller fixed-rate="3000"/> 
</int:inbound-channel-adapter>

<int:filter input-channel="shutdownFilterChannel" output-channel="shutdownChannel" expression="payload>20000"></int:filter>

在这里,我基本上得到了自上次处理以来的时间,将其传递给一个过滤器,该过滤器检查自上次处理时间以来是否超过 20 秒,并将该消息传递给关闭通道。

于 2012-11-28T16:05:46.070 回答
0

假设您使用的是独立的 java 程序,那么当您在某个时间间隔内未收到消息时,您可能可以将消息发送到输出通道和 System.exit。

 final AbstractApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/*-context.xml");

// initiates poller to poll files
// do your processing and then send to a channel say output ..

PollableChannel output = (PollableChannel) context.getBean("output");
Object msg = null;
while((msg = output.receive(1000)) != null)
   {
        msg = output.receive().getPayload();        
        System.out.println("payload - " + msg);
    }
System.exit(0);

另一种选择是向服务激活器发送消息,然后启动 jvm 关闭。

请注意:EAI 模式是实际处理文件,然后移动到已处理的目录。

于 2012-11-28T14:39:18.633 回答