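I guess you need the MessageVerifier bean to be an instance of the StreamStubMessages class. That bean is conditional: whether it gets instantiated depends on what is on the classpath. You can try to track the problem down by instantiating it explicitly: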
@Bean
MessageVerifier<Message<?>> activeMqContractVerifier(ApplicationContext applicationContext) {
    return new StreamStubMessages(applicationContext);
}
This should fail with a "no class def found" error (NoClassDefFoundError), which will point you to the missing dependency.
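If you would rather check the classpath directly before touching the configuration, something along these lines may help. This is only a rough sketch: ClasspathCheck and isPresent are names I made up, and the fully qualified names in main are my assumption of what is relevant here, so replace them with whatever classes your setup actually needs.

```java
import java.util.List;

// Minimal sketch: report which class names cannot be loaded from the classpath.
// ClasspathCheck and isPresent are hypothetical helper names, not part of any library.
class ClasspathCheck {

    // Returns true if the named class can be located by this class's class loader.
    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError raised by missing transitive classes.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed names for illustration only; adjust to the classes you expect.
        List<String> candidates = List.of(
                "org.springframework.messaging.Message",
                "org.springframework.cloud.contract.verifier.messaging.stream.StreamStubMessages");
        for (String name : candidates) {
            System.out.println(name + " -> " + (isPresent(name) ? "present" : "MISSING"));
        }
    }
}
```

Run it with the same classpath as your tests; any line marked MISSING is a candidate for the dependency you need to add.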
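I had a similar problem, but in my case it was ActiveMQ instead of Kafka (with neither Spring Integration nor Spring Cloud Stream), so none of the available MessageVerifier implementations seemed to fit. For me, the following minimal implementation of MessageVerifier did the job (it is not a general-purpose solution for ActiveMQ):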
import org.springframework.cloud.contract.verifier.messaging.MessageVerifier;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;

import javax.jms.JMSException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Receive-only MessageVerifier backed by JmsTemplate; sending is not supported.
public class ActiveMqStubMessages implements MessageVerifier<Message<?>> {

    private final ApplicationContext applicationContext;

    public ActiveMqStubMessages(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    public void send(Message<?> message, String destination) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <T> void send(T payload, Map<String, Object> headers, String destination) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Message<?> receive(String destination, long timeout, TimeUnit timeUnit) {
        final JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        final long originalReceiveTimeout = jmsTemplate.getReceiveTimeout();
        try {
            // Temporarily apply the requested timeout (JmsTemplate expects milliseconds).
            jmsTemplate.setReceiveTimeout(TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
            final javax.jms.Message jmsMessage = jmsTemplate.receive(destination);
            if (jmsMessage == null) {
                // Nothing arrived before the timeout.
                return null;
            }
            return (Message<?>) new MessagingMessageConverter().fromMessage(jmsMessage);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        } finally {
            // Restore the original timeout, since the JmsTemplate bean is shared.
            jmsTemplate.setReceiveTimeout(originalReceiveTimeout);
        }
    }

    @Override
    public Message<?> receive(String destination) {
        return receive(destination, 3, TimeUnit.SECONDS);
    }
}