RabbitMQ MessageConsumer 没有收到发布的消息
我正在编写一个集成测试,它应该从队列中接收消息并进行处理。但是消费者根本没有收到消息。
当我手动注入依赖项时 -没有 Spring context,一切正常。但是当我使用 SpringContext 时,消费者没有收到消息。
从SpringInfraConfig.class
环境变量加载值。为了'emule'我正在使用这个库EnvironmentVariables
中的类的环境。env 变量加载良好 - 检查运行调试。
注意当我提到没有 SpringContext 工作正常时,我也没有使用环境库。
要将消息发布到 RabbitMQ 队列中,我在测试方法上“手动”进行。消息发布得很好。在调用我的真实测试类之前,我编写了一个 consming 测试代码。DefaultConsumer#handleDelivery
这是一个简单的原始消费者,用 a覆盖sysout
来打印传入的消息。作品。
当我使用我的真实测试类进行测试时 -MessageConsumerServiceImpl.class
它只是记录从队列中开始消费并且测试结束。
当我调试并进入所有方法时,会发生一些非常奇怪的事情 - 收到消息并且在处理过程中它最终没有完成所有调用 -测试只是停止,也没有抛出错误。
另一个奇怪的事情是 - 启用 RabbitMQ 管理插件,没有队列、交换、通道甚至连接打开。我在调试运行中检查了这一点,同时停止进入断点。
SpringConfig 类
@Import({SpringCoreConfig.class})
@ComponentScan({"br.com.fulltime.fullarm.fullcam.integration.infra", "br.com.fulltime.fullarm.cross.cutting", "br.com.fulltime.fullarm.infrastructure.commons"})
@Configuration
public class SpringInfraConfig {
@Bean
public FInfraSettings getFInfraSettings() {
Map<String, String> fInfraMap = new HashMap<>();
fInfraMap.put("F_INFRA_RABBIT_HOST", "f_infra_rabbit_host");
fInfraMap.put("F_INFRA_EXCHANGE", "f_infra_exchange");
fInfraMap.put("F_INFRA_QUEUE", "f_infra_queue");
fInfraMap.put("F_INFRA_PROCESS_ID", "f_infra_process_id");
fInfraMap.put("F_INFRA_DESCRIPTION", "f_infra_description");
fInfraMap.put("F_INFRA_TEXT", "f_infra_text");
fInfraMap.put("F_INFRA_TAG", "f_infra_tag");
fInfraMap.put("F_INFRA_WARNING_TIME", "f_infra_warning_time");
fInfraMap.put("F_INFRA_CRITICAL_TIME", "f_infra_critical_time");
return new FInfraSettings(
getEnv("f_infra_run", "false").asBoolean(),
getEnv("f_infra_ka_time", "1").asInt(),
fInfraMap);
}
@Bean
public ApplicationSettings getApplicationSettings() {
return new ApplicationSettings(
getEnv("process_name", "FullArm-FullCam Integration").asString(),
getEnv("process_version", "DEFAULT-1.0.0").asString());
}
@Bean
public PushoverSettings getPushoverSettings() {
return new PushoverSettings(
getEnv("pushover_api", "invalido").asString(),
getEnv("pushover_user_id", "invalido").asString(),
getEnv("pushover_run", "false").asBoolean());
}
@Bean
public RabbitMQSettings getRabbitMQSettings() {
return new RabbitMQSettings(
new RabbitConnectionInfo(
getEnv("rabbitmq_host", "127.0.0.1").asString(),
getEnv("rabbitmq_port", "5672").asInt(),
getEnv("rabbitmq_virtual_host", "/").asString(),
getEnv("rabbitmq_username", "guest").asString(),
getEnv("rabbitmq_password", "guest").asString()),
new RabbitConnectionInfo(
getEnv("rabbitmq_fullcam_host", "127.0.0.1").asString(),
getEnv("rabbitmq_fullcam_port", "5672").asInt(),
getEnv("rabbitmq_fullcam_virtual_host", "/").asString(),
getEnv("rabbitmq_fullcam_username", "guest").asString(),
getEnv("rabbitmq_fullcam_password", "guest").asString()),
new RabbitQueueInfo(
getEnv("rabbitmq_consumer_fullarm_queue", "fcomQueConsumerFullCam").asString(),
getEnv("rabbitmq_consumer_fullarm_exc", "fcomExcConsumer").asString(),
getEnv("rabbitmq_consumer_fullarm_rk", "fcomRKConsumerFullCam").asString()),
new RabbitQueueInfo(
getEnv("rabbitmq_consumer_fullcam_queue", "foo").asString(),
getEnv("rabbitmq_consumer_fullcam_exc", "foo").asString(),
getEnv("rabbitmq_consumer_fullcam_rk", "foo").asString()),
new RabbitQueueInfo(
getEnv("rabbitmq_publish_fullcam_queue", "fullcamRequest").asString(),
getEnv("rabbitmq_publish_fullcam_exc", "fullcamRequestExc").asString(),
getEnv("rabbitmq_consumer_fullarm_rk", "fullcamRequestRK").asString()));
}
@Bean
public RedisSettings getRedisSettings() {
return new RedisSettings(
getEnv("redis_host", "localhost").asString(),
getEnv("redis_port", "6379").asInt(),
getEnv("redis_password", "123456").asString());
}
@Bean
public Connection getConnection() {
try {
return RabbitConnectionFactory.create(getRabbitMQSettings().getConnectionInfo());
} catch (IOException | TimeoutException e) {
throw new ShutdownException(e);
}
}
@Bean
public Logging getLogging() {
return new DefaultLogger();
}
MessageConsumerServiceImpl 类
@Component
public class MessageConsumerServiceImpl implements MessageConsumerService {
private final Connection rabbitMQConnection;
private final MessageConsumerFactory consumerFactory;
private final RabbitMQSettings mqSettings;
private final ShutdownService shutdownService;
private final Logging logger;
@Inject
public MessageConsumerServiceImpl(Connection rabbitMQConnection,
MessageConsumerFactory consumerFactory,
RabbitMQSettings mqSettings,
ShutdownService shutdownService,
Logging logger) {
this.rabbitMQConnection = rabbitMQConnection;
this.consumerFactory = consumerFactory;
this.mqSettings = mqSettings;
this.shutdownService = shutdownService;
this.logger = logger;
}
@Override
public void startListening() {
try {
RabbitQueueInfo commandQueInfo = mqSettings.getRabbitMQFullArmConsumerQueue();
final String queue = commandQueInfo.getQueue();
Channel channel = rabbitMQConnection.createChannel();
channel.queueDeclare(queue, true, false, false, null);
MessageConsumer commandConsumer = consumerFactory.create(channel);
logger.info("[MESSAGE-CONSUMER] - Consumindo da fila: {}", queue);
channel.basicConsume(queue, commandConsumer);
} catch (IOException e) {
logger.error("[MESSAGE-CONSUMER] - ShutdownException", e);
shutdownService.shutdown(e);
}
}
集成测试类
public class MessageConsumerServiceImplIntegrationTest {
private static final Integer RABBITMQ_PORT = 5672;
private static final String RABBITMQ_EXC = "fcomExcConsumer";
private static final String RABBITMQ_QUEUE = "fcomQueFullcamIntegration";
private static final String RABBITMQ_RK = "fcomRKConsumerFullCam";
private static final String REDIS_PASSWORD = "123456";
private static final int REDIS_PORT = 6379;
public static RabbitMQContainer rabbitMqContainer;
public static GenericContainer redisContainer;
static {
redisContainer = new GenericContainer<>("redis:5.0.3-alpine")
.withExposedPorts(REDIS_PORT)
.withCommand("redis-server --requirepass " + REDIS_PASSWORD)
.waitingFor(Wait.forListeningPort());
redisContainer.start();
}
static {
rabbitMqContainer = new RabbitMQContainer()
.withExposedPorts(RABBITMQ_PORT)
.withExposedPorts(15672)
.withUser("guest", "guest")
.withVhost("/")
.waitingFor(Wait.forListeningPort());
rabbitMqContainer.start();
}
@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables()
.set("rabbitmq_host", rabbitMqContainer.getContainerIpAddress())
.set("rabbitmq_port", String.valueOf(rabbitMqContainer.getMappedPort(RABBITMQ_PORT)))
.set("rabbitmq_virtual_host", "/")
.set("rabbitmq_username", "guest")
.set("rabbitmq_password", "guest")
.set("rabbitmq_fullcam_host", rabbitMqContainer.getContainerIpAddress())
.set("rabbitmq_fullcam_port", String.valueOf(rabbitMqContainer.getMappedPort(RABBITMQ_PORT)))
.set("rabbitmq_fullcam_virtual_host", "/")
.set("rabbitmq_fullcam_username", "guest")
.set("rabbitmq_fullcam_password", "guest")
.set("rabbitmq_publish_fullcam_queue", "Fullarm.Request")
.set("rabbitmq_publish_fullcam_exc", "fcomExcFullcam")
.set("rabbitmq_publish_fullcam_rk", "fcomRKFullcamRequest")
.set("rabbitmq_consumer_fullarm_queue", RABBITMQ_QUEUE)
.set("rabbitmq_consumer_fullarm_exc", RABBITMQ_EXC)
.set("rabbitmq_consumer_fullarm_rk", RABBITMQ_RK)
.set("rabbitmq_consumer_fullcam_queue", "Fullarm.Reponse")
.set("rabbitmq_consumer_fullcam_exc", "fcomExcFullarm")
.set("rabbitmq_consumer_fullcam_rk", "fcomRKFullarmFullcamIntegration")
.set("f_infra_rabbit_host", "abobora")
.set("f_infra_exchange", "abobora")
.set("f_infra_queue", "abobora")
.set("f_infra_process_id", "0")
.set("f_infra_description", "abobora")
.set("f_infra_text", "abobora")
.set("f_infra_tag", "0")
.set("f_infra_warning_time", "0")
.set("f_infra_critical_time", "0")
.set("f_infra_run", "false")
.set("f_infra_ka_time", "1")
.set("redis_host", redisContainer.getContainerIpAddress())
.set("redis_port", String.valueOf(redisContainer.getMappedPort(REDIS_PORT)))
.set("redis_password", REDIS_PASSWORD);
private MessageConsumerService instance;
private ApplicationContext context;
@Before
public void setUp() {
context = new AnnotationConfigApplicationContext(SpringInfraConfig.class);
instance = context.getBean(MessageConsumerService.class);
}
@Test
public void deveProcessarRequisicao() throws IOException, TimeoutException {
String message = "{ \"tipoPacote\" : 3, \"descricao_painel\" : \"Casa Mauro Naves\", \"setor_disparado\" : \"Porta da Frente\", \"data_disparo\" : 1587151300000, \"cameras\" : [90851, 90853, 90854] }";
ConnectionFactory factory = new ConnectionFactory();
RabbitMQSettings settings = context.getBean(RabbitMQSettings.class);
factory.setHost(settings.getConnectionInfo().getHost());
factory.setPort(settings.getConnectionInfo().getPort());
factory.setVirtualHost(settings.getConnectionInfo().getVirtualHost());
factory.setAutomaticRecoveryEnabled(true);
factory.setUsername(settings.getConnectionInfo().getUsername());
factory.setPassword(settings.getConnectionInfo().getPassword());
factory.setRequestedHeartbeat(50);
Connection connection = factory.newConnection();
RabbitQueueInfo commandQueInfo = settings.getRabbitMQFullArmConsumerQueue();
Channel channel = connection.createChannel();
channel.exchangeDeclare(commandQueInfo.getExchange(), "direct", true);
channel.queueDeclare(commandQueInfo.getQueue(), true, false, false, null);
channel.queueBind(commandQueInfo.getQueue(), commandQueInfo.getExchange(), commandQueInfo.getRoutingKey());
channel.basicPublish(commandQueInfo.getExchange(), commandQueInfo.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, message.getBytes());
channel.close();
connection.close();
instance.startListening();
}
Gradle 依赖项
核心构建.gradle
dependencies {
compile group: 'javax.inject', name: 'javax.inject', version: '1'
compile group: 'org.springframework', name: 'spring-context', version: '5.2.5.RELEASE'
compile 'com.fasterxml.jackson.core:jackson-core:2.7.1'
compile 'com.fasterxml.jackson.core:jackson-databind:2.7.1-1'
compile group: 'br.com.fulltime.fullarm', name: 'cross-cutting-commons', version: '1.13.0'
compile group: 'br.com.fulltime.fullarm', name: 'constants', version: '1.110.0'
}
infra-build.gradle
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile "org.testcontainers:testcontainers:1.14.1"
testCompile "org.testcontainers:rabbitmq:1.14.1"
testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0'
compile project(':core')
compile group: 'br.com.fulltime.fullarm', name: 'infrastructure-commons', version: '1.6.0'
compile group: 'br.com.fulltime.fullarm', name: 'FInfraJavaLibrary', version: '2.3.0'
compile group: 'br.com.fulltime.fullarm', name: 'pushover-lib', version: '1.0.0'
compile group: 'redis.clients', name: 'jedis', version: '3.3.0'
}
测试输出
Testing started at 08:38 ...
Starting Gradle Daemon...
Gradle Daemon started in 815 ms
> Task :core:compileJava UP-TO-DATE
> Task :core:processResources NO-SOURCE
> Task :core:classes UP-TO-DATE
> Task :core:jar UP-TO-DATE
> Task :infra:compileJava UP-TO-DATE
> Task :infra:processResources NO-SOURCE
> Task :infra:classes UP-TO-DATE
> Task :infra:compileTestJava
> Task :infra:processTestResources NO-SOURCE
> Task :infra:testClasses
> Task :infra:test
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.junit.contrib.java.lang.system.EnvironmentVariables (file:/home/*omited*/.gradle/caches/modules-2/files-2.1/com.github.stefanbirkner/system-rules/1.19.0/d541c9a1cff0dda32e2436c74562e2e4aa6c88cd/system-rules-1.19.0.jar) to field java.util.Collections$UnmodifiableMap.m
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2020-05-14 08:38:35 INFO - [MESSAGE-CONSUMER] - Consumindo da fila: fcomQueFullcamIntegration
WARNING: Please consider reporting this to the maintainers of org.junit.contrib.java.lang.system.EnvironmentVariables
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.1/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 22s
5 actionable tasks: 2 executed, 3 up-to-date
08:38:36: Task execution finished ':infra:test --tests "br.com.fulltime.fullarm.fullcam.integration.infra.consumer.MessageConsumerServiceImplIntegrationTest.deveProcessarRequisicao"'.
我对问题没有更多的想法。欢迎任何帮助。提前致谢
更新
我再次编写了测试以使其更简单。我用 Spring-context 和 env 的东西写了一个代码,而另一个没有 Spring-context 和 env 的东西。两者都没有工作。
所以,为了测试porpuse,我编写了一个简单的代码Thread#sleep()
,猜猜看,两个测试都有效!
我认为原因是 RabbitMQ DefaultConsumer 实例化了一个用于消费消息的新线程,释放了主要测试线程并且它被停止了。由于主线程已停止,所有其他线程也已停止。
所以我认为我们在这里遇到了同步测试问题。