3

我有以下配置

<int:channel id="notificationChannel" datatype="com.mycompany.integration.NotificationMessage">
        <int:queue message-store="jdbc-message-store" capacity="1000" />
    </int:channel>

    <int:outbound-channel-adapter ref="notificationHandler"
        method="handle" channel="notificationChannel" >
        <int:poller max-messages-per-poll="100" fixed-delay="60000"
            time-unit="MILLISECONDS" >
            <int:transactional isolation="DEFAULT" />
        </int:poller>
    </int:outbound-channel-adapter>

现在我想对此进行单元测试,我需要等待消息在测试中被正确处理,我用拦截器尝试了它,但这不起作用,因为我只能同步消息传递,但不能成功处理信息。在处理完成后实现发送回复,但这意味着实现这个只是为了使我的单元测试工作,在生产中不会在消息头中设置回复通道。我如何在成功处理请求时实现同步而不在 messageHandler 中实现它?

4

1 回答 1

3

如果您使用的是 Spring Integration 2.2.x,则可以通过建议来执行此操作...

public class CompletionAdvice extends AbstractRequestHandlerAdvice {

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
        Object result = callback.execute();
        latch.countDown();
        return result;
    }

    public CountDownLatch getLatch() {
        return latch;
    }

}

在您的测试环境中,使用 bean factory 后处理器将建议添加到适配器的处理程序。

public class AddCompletionAdvice implements BeanFactoryPostProcessor {

    private final Collection<String> handlers;

    private final Collection<String> replyProducingHandlers;

    public AddCompletionAdvice(Collection<String> handlers, Collection<String> replyProducingHandlers) {
        this.handlers = handlers;
        this.replyProducingHandlers = replyProducingHandlers;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanName : handlers) {
            defineAdviceAndInject(beanFactory, beanName, beanName + "CompletionAdvice");
        }
        for (String beanName : replyProducingHandlers) {
            String handlerBeanName = beanFactory.getAliases(beanName + ".handler")[0];
            defineAdviceAndInject(beanFactory, handlerBeanName, beanName + "CompletionAdvice");
        }
    }

    private void defineAdviceAndInject(ConfigurableListableBeanFactory beanFactory, String beanName, String adviceBeanName) {
        BeanDefinition serviceHandler = beanFactory.getBeanDefinition(beanName);
        BeanDefinition advice = new RootBeanDefinition(CompletionAdvice.class);
        ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(adviceBeanName, advice);
        serviceHandler.getPropertyValues().add("adviceChain", new RuntimeBeanReference(adviceBeanName));
    }

}

将后处理器添加到配置<bean class="foo.AddCompletionAdvice" />中。

最后,将建议注入您的测试用例

@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class TestAdvice {

    @Autowired
    private CompletionAdvice fooCompletionAdvice;

    @Autowired
    private CompletionAdvice barCompletionAdvice;

    @Autowired
    private MessageChannel input;

    @Test
    public void test() throws Exception {
        Message<?> message = new GenericMessage<String>("Hello, world!");
        input.send(message);
        assertTrue(fooCompletionAdvice.getLatch().await(1, TimeUnit.SECONDS));
        assertTrue(barCompletionAdvice.getLatch().await(1, TimeUnit.SECONDS));
    }

}

并等待闩锁。

<int:publish-subscribe-channel id="input"/>

<int:outbound-channel-adapter id="foo" channel="input" ref="x" method="handle"/>

<int:service-activator id="bar" input-channel="input" ref="x"/>

<bean class="foo.AddCompletionAdvice">
    <constructor-arg name="handlers">
        <list>
            <value>foo</value>
        </list>
    </constructor-arg>
    <constructor-arg name="replyProducingHandlers">
        <list>
            <value>bar</value>
        </list>
    </constructor-arg>
</bean>

<bean id="x" class="foo.Foo" />

我将这些类添加到Gist

编辑:更新为最终消费者(无回复)和回复生产消费者提供一般案例。

于 2013-04-29T13:31:01.353 回答