RabbitMQ MessageConsumer is not receiving the published message

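I am writing an integration test that should receive a message from a queue and process it. But the consumer never receives the message.

When I inject the dependencies manually, without a Spring context, everything works. But when I use the Spring context, the consumer does not receive the message.

SpringInfraConfig.class loads its values from environment variables. To "emulate" the environment I am using the EnvironmentVariables class from this library (system-rules). The env variables are loaded correctly; I checked this by running in debug mode.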
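The `getEnv(name, default)` helper that the configuration calls is not shown in the question. As a rough sketch of how such a lookup with a fallback might behave, the class below reads a value from an injected source and converts it on demand. `EnvLookup`, `Value`, and the injected lookup function are hypothetical names chosen for illustration, not the project's actual code.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: looks up a setting by name and falls back to a default. */
final class EnvLookup {

    /** Wraps the raw string so callers can convert it to the type they need. */
    static final class Value {
        private final String raw;

        Value(String raw) {
            this.raw = raw;
        }

        String asString() {
            return raw;
        }

        int asInt() {
            return Integer.parseInt(raw.trim());
        }

        boolean asBoolean() {
            return Boolean.parseBoolean(raw.trim());
        }
    }

    private final Function<String, String> source;

    /** The source is injected so a test can pass a map instead of the real environment. */
    EnvLookup(Function<String, String> source) {
        this.source = source;
    }

    /** Reads from the real process environment. */
    static EnvLookup fromSystem() {
        return new EnvLookup(System::getenv);
    }

    /** Returns the configured value, or the default when the variable is unset or empty. */
    Value getEnv(String name, String defaultValue) {
        String value = source.apply(name);
        return new Value(value == null || value.isEmpty() ? defaultValue : value);
    }

    public static void main(String[] args) {
        Map<String, String> fakeEnv = Map.of("rabbitmq_port", "5673");
        EnvLookup env = new EnvLookup(fakeEnv::get);
        System.out.println(env.getEnv("rabbitmq_port", "5672").asInt());
        System.out.println(env.getEnv("rabbitmq_host", "127.0.0.1").asString());
    }
}
```

Injecting the lookup function is a design choice of this sketch: it lets a test supply values without modifying the process environment through reflection.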

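Note that when I say it works fine without the Spring context, I was not using the environment library either.

To publish the message to the RabbitMQ queue, I do it "manually" in the test method. The message is published fine. Before calling my real class under test, I wrote some consuming test code: a simple raw consumer that overrides DefaultConsumer#handleDelivery with a sysout to print the incoming message. That works.

When I test with my real class under test, MessageConsumerServiceImpl.class, it just logs that it started consuming from the queue, and then the test ends.

When I debug and step into all the methods, something very strange happens: the message is received, but processing never gets through all the calls. The test just stops, and no error is thrown.

Another strange thing: with the RabbitMQ management plugin enabled, there are no queues, exchanges, channels, or even connections open. I checked this during a debug run, while stopped at a breakpoint.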

SpringConfig class

@Import({SpringCoreConfig.class})
@ComponentScan({"br.com.fulltime.fullarm.fullcam.integration.infra", "br.com.fulltime.fullarm.cross.cutting", "br.com.fulltime.fullarm.infrastructure.commons"})
@Configuration
public class SpringInfraConfig {

@Bean
public FInfraSettings getFInfraSettings() {
    Map<String, String> fInfraMap = new HashMap<>();
    fInfraMap.put("F_INFRA_RABBIT_HOST", "f_infra_rabbit_host");
    fInfraMap.put("F_INFRA_EXCHANGE", "f_infra_exchange");
    fInfraMap.put("F_INFRA_QUEUE", "f_infra_queue");
    fInfraMap.put("F_INFRA_PROCESS_ID", "f_infra_process_id");
    fInfraMap.put("F_INFRA_DESCRIPTION", "f_infra_description");
    fInfraMap.put("F_INFRA_TEXT", "f_infra_text");
    fInfraMap.put("F_INFRA_TAG", "f_infra_tag");
    fInfraMap.put("F_INFRA_WARNING_TIME", "f_infra_warning_time");
    fInfraMap.put("F_INFRA_CRITICAL_TIME", "f_infra_critical_time");

    return new FInfraSettings(
            getEnv("f_infra_run", "false").asBoolean(),
            getEnv("f_infra_ka_time", "1").asInt(),
            fInfraMap);
}

@Bean
public ApplicationSettings getApplicationSettings() {
    return new ApplicationSettings(
            getEnv("process_name", "FullArm-FullCam Integration").asString(),
            getEnv("process_version", "DEFAULT-1.0.0").asString());
}

@Bean
public PushoverSettings getPushoverSettings() {
    return new PushoverSettings(
            getEnv("pushover_api", "invalido").asString(),
            getEnv("pushover_user_id", "invalido").asString(),
            getEnv("pushover_run", "false").asBoolean());

}

@Bean
public RabbitMQSettings getRabbitMQSettings() {
    return new RabbitMQSettings(
            new RabbitConnectionInfo(
                    getEnv("rabbitmq_host", "127.0.0.1").asString(),
                    getEnv("rabbitmq_port", "5672").asInt(),
                    getEnv("rabbitmq_virtual_host", "/").asString(),
                    getEnv("rabbitmq_username", "guest").asString(),
                    getEnv("rabbitmq_password", "guest").asString()),
            new RabbitConnectionInfo(
                    getEnv("rabbitmq_fullcam_host", "127.0.0.1").asString(),
                    getEnv("rabbitmq_fullcam_port", "5672").asInt(),
                    getEnv("rabbitmq_fullcam_virtual_host", "/").asString(),
                    getEnv("rabbitmq_fullcam_username", "guest").asString(),
                    getEnv("rabbitmq_fullcam_password", "guest").asString()),
            new RabbitQueueInfo(
                    getEnv("rabbitmq_consumer_fullarm_queue", "fcomQueConsumerFullCam").asString(),
                    getEnv("rabbitmq_consumer_fullarm_exc", "fcomExcConsumer").asString(),
                    getEnv("rabbitmq_consumer_fullarm_rk", "fcomRKConsumerFullCam").asString()),
            new RabbitQueueInfo(
                    getEnv("rabbitmq_consumer_fullcam_queue", "foo").asString(),
                    getEnv("rabbitmq_consumer_fullcam_exc", "foo").asString(),
                    getEnv("rabbitmq_consumer_fullcam_rk", "foo").asString()),
            new RabbitQueueInfo(
                    getEnv("rabbitmq_publish_fullcam_queue", "fullcamRequest").asString(),
                    getEnv("rabbitmq_publish_fullcam_exc", "fullcamRequestExc").asString(),
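                    // Was "rabbitmq_consumer_fullarm_rk", which looks like a copy-paste slip:
                    // the test sets "rabbitmq_publish_fullcam_rk" for the publish queue.
                    getEnv("rabbitmq_publish_fullcam_rk", "fullcamRequestRK").asString()));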
}

@Bean
public RedisSettings getRedisSettings() {
    return new RedisSettings(
            getEnv("redis_host", "localhost").asString(),
            getEnv("redis_port", "6379").asInt(),
            getEnv("redis_password", "123456").asString());
}

@Bean
public Connection getConnection() {
    try {
        return RabbitConnectionFactory.create(getRabbitMQSettings().getConnectionInfo());
    } catch (IOException | TimeoutException e) {
        throw new ShutdownException(e);
    }
}

@Bean
public Logging getLogging() {
    return new DefaultLogger();
}
}

MessageConsumerServiceImpl class

@Component
public class MessageConsumerServiceImpl implements MessageConsumerService {

private final Connection rabbitMQConnection;
private final MessageConsumerFactory consumerFactory;
private final RabbitMQSettings mqSettings;
private final ShutdownService shutdownService;
private final Logging logger;

@Inject
public MessageConsumerServiceImpl(Connection rabbitMQConnection,
                                  MessageConsumerFactory consumerFactory,
                                  RabbitMQSettings mqSettings,
                                  ShutdownService shutdownService,
                                  Logging logger) {
    this.rabbitMQConnection = rabbitMQConnection;
    this.consumerFactory = consumerFactory;
    this.mqSettings = mqSettings;
    this.shutdownService = shutdownService;
    this.logger = logger;
}

@Override
public void startListening() {
    try {
        RabbitQueueInfo commandQueInfo = mqSettings.getRabbitMQFullArmConsumerQueue();
        final String queue = commandQueInfo.getQueue();

        Channel channel = rabbitMQConnection.createChannel();
        channel.queueDeclare(queue, true, false, false, null);
        MessageConsumer commandConsumer = consumerFactory.create(channel);

        logger.info("[MESSAGE-CONSUMER] - Consumindo da fila: {}", queue);
        channel.basicConsume(queue, commandConsumer);

    } catch (IOException e) {
        logger.error("[MESSAGE-CONSUMER] - ShutdownException", e);
        shutdownService.shutdown(e);
    }
}
}

Integration test class

public class MessageConsumerServiceImplIntegrationTest {

private static final Integer RABBITMQ_PORT = 5672;
private static final String RABBITMQ_EXC = "fcomExcConsumer";
private static final String RABBITMQ_QUEUE = "fcomQueFullcamIntegration";
private static final String RABBITMQ_RK = "fcomRKConsumerFullCam";
private static final String REDIS_PASSWORD = "123456";
private static final int REDIS_PORT = 6379;

public static RabbitMQContainer rabbitMqContainer;
public static GenericContainer redisContainer;

static {
    redisContainer = new GenericContainer<>("redis:5.0.3-alpine")
            .withExposedPorts(REDIS_PORT)
            .withCommand("redis-server --requirepass " + REDIS_PASSWORD)
            .waitingFor(Wait.forListeningPort());
    redisContainer.start();
}

static {
    rabbitMqContainer = new RabbitMQContainer()
            .withExposedPorts(RABBITMQ_PORT)
            .withExposedPorts(15672)
            .withUser("guest", "guest")
            .withVhost("/")
            .waitingFor(Wait.forListeningPort());
    rabbitMqContainer.start();
}

@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables()
        .set("rabbitmq_host", rabbitMqContainer.getContainerIpAddress())
        .set("rabbitmq_port", String.valueOf(rabbitMqContainer.getMappedPort(RABBITMQ_PORT)))
        .set("rabbitmq_virtual_host", "/")
        .set("rabbitmq_username", "guest")
        .set("rabbitmq_password", "guest")

        .set("rabbitmq_fullcam_host", rabbitMqContainer.getContainerIpAddress())
        .set("rabbitmq_fullcam_port", String.valueOf(rabbitMqContainer.getMappedPort(RABBITMQ_PORT)))
        .set("rabbitmq_fullcam_virtual_host", "/")
        .set("rabbitmq_fullcam_username", "guest")
        .set("rabbitmq_fullcam_password", "guest")

        .set("rabbitmq_publish_fullcam_queue", "Fullarm.Request")
        .set("rabbitmq_publish_fullcam_exc", "fcomExcFullcam")
        .set("rabbitmq_publish_fullcam_rk", "fcomRKFullcamRequest")

        .set("rabbitmq_consumer_fullarm_queue", RABBITMQ_QUEUE)
        .set("rabbitmq_consumer_fullarm_exc", RABBITMQ_EXC)
        .set("rabbitmq_consumer_fullarm_rk", RABBITMQ_RK)

        .set("rabbitmq_consumer_fullcam_queue", "Fullarm.Reponse")
        .set("rabbitmq_consumer_fullcam_exc", "fcomExcFullarm")
        .set("rabbitmq_consumer_fullcam_rk", "fcomRKFullarmFullcamIntegration")

        .set("f_infra_rabbit_host", "abobora")
        .set("f_infra_exchange", "abobora")
        .set("f_infra_queue", "abobora")
        .set("f_infra_process_id", "0")
        .set("f_infra_description", "abobora")
        .set("f_infra_text", "abobora")
        .set("f_infra_tag", "0")
        .set("f_infra_warning_time", "0")
        .set("f_infra_critical_time", "0")
        .set("f_infra_run", "false")
        .set("f_infra_ka_time", "1")

        .set("redis_host", redisContainer.getContainerIpAddress())
        .set("redis_port", String.valueOf(redisContainer.getMappedPort(REDIS_PORT)))
        .set("redis_password", REDIS_PASSWORD);

private MessageConsumerService instance;
private ApplicationContext context;

@Before
public void setUp() {
    context = new AnnotationConfigApplicationContext(SpringInfraConfig.class);
    instance = context.getBean(MessageConsumerService.class);
}

@Test
public void deveProcessarRequisicao() throws IOException, TimeoutException {
    String message = "{ \"tipoPacote\" : 3, \"descricao_painel\" : \"Casa Mauro Naves\", \"setor_disparado\" : \"Porta da Frente\", \"data_disparo\" : 1587151300000, \"cameras\" : [90851, 90853, 90854] }";

    ConnectionFactory factory = new ConnectionFactory();
    RabbitMQSettings settings = context.getBean(RabbitMQSettings.class);
    factory.setHost(settings.getConnectionInfo().getHost());
    factory.setPort(settings.getConnectionInfo().getPort());
    factory.setVirtualHost(settings.getConnectionInfo().getVirtualHost());
    factory.setAutomaticRecoveryEnabled(true);
    factory.setUsername(settings.getConnectionInfo().getUsername());
    factory.setPassword(settings.getConnectionInfo().getPassword());
    factory.setRequestedHeartbeat(50);
    Connection connection = factory.newConnection();

    RabbitQueueInfo commandQueInfo = settings.getRabbitMQFullArmConsumerQueue();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(commandQueInfo.getExchange(), "direct", true);
    channel.queueDeclare(commandQueInfo.getQueue(), true, false, false, null);
    channel.queueBind(commandQueInfo.getQueue(), commandQueInfo.getExchange(), commandQueInfo.getRoutingKey());
    channel.basicPublish(commandQueInfo.getExchange(), commandQueInfo.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, message.getBytes());
    channel.close();
    connection.close();

    instance.startListening();

}
}

Gradle dependencies

core build.gradle

dependencies {

   compile group: 'javax.inject', name: 'javax.inject', version: '1'
   compile group: 'org.springframework', name: 'spring-context', version: '5.2.5.RELEASE'

   compile 'com.fasterxml.jackson.core:jackson-core:2.7.1'
   compile 'com.fasterxml.jackson.core:jackson-databind:2.7.1-1'

   compile group: 'br.com.fulltime.fullarm', name: 'cross-cutting-commons', version: '1.13.0'
   compile group: 'br.com.fulltime.fullarm', name: 'constants', version: '1.110.0'
}

infra-build.gradle

dependencies {

   testCompile group: 'junit', name: 'junit', version: '4.12'
   testCompile "org.testcontainers:testcontainers:1.14.1"
   testCompile "org.testcontainers:rabbitmq:1.14.1"
   testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0'


   compile project(':core')

   compile group: 'br.com.fulltime.fullarm', name: 'infrastructure-commons', version: '1.6.0'
   compile group: 'br.com.fulltime.fullarm', name: 'FInfraJavaLibrary', version: '2.3.0'
   compile group: 'br.com.fulltime.fullarm', name: 'pushover-lib', version: '1.0.0'

   compile group: 'redis.clients', name: 'jedis', version: '3.3.0'
}

Test output

Testing started at 08:38 ...
Starting Gradle Daemon...
Gradle Daemon started in 815 ms
> Task :core:compileJava UP-TO-DATE
> Task :core:processResources NO-SOURCE
> Task :core:classes UP-TO-DATE
> Task :core:jar UP-TO-DATE
> Task :infra:compileJava UP-TO-DATE
> Task :infra:processResources NO-SOURCE
> Task :infra:classes UP-TO-DATE
> Task :infra:compileTestJava
> Task :infra:processTestResources NO-SOURCE
> Task :infra:testClasses
> Task :infra:test
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.junit.contrib.java.lang.system.EnvironmentVariables (file:/home/*omited*/.gradle/caches/modules-2/files-2.1/com.github.stefanbirkner/system-rules/1.19.0/d541c9a1cff0dda32e2436c74562e2e4aa6c88cd/system-rules-1.19.0.jar) to field java.util.Collections$UnmodifiableMap.m
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2020-05-14 08:38:35 INFO - [MESSAGE-CONSUMER] - Consumindo da fila: fcomQueFullcamIntegration
WARNING: Please consider reporting this to the maintainers of org.junit.contrib.java.lang.system.EnvironmentVariables
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access     operations
WARNING: All illegal access operations will be denied in a future release
Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.1/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 22s
5 actionable tasks: 2 executed, 3 up-to-date
08:38:36: Task execution finished ':infra:test --tests     "br.com.fulltime.fullarm.fullcam.integration.infra.consumer.MessageConsumerServiceImplIntegrationTest.deveProcessarRequisicao"'.

I have run out of ideas about what the problem could be. Any help is welcome. Thanks in advance.

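Update

I rewrote the test to make it simpler. I wrote one version with the Spring context and the env setup, and another without either. Neither worked.

So, for testing purposes, I added a simple Thread#sleep() call and, guess what, both tests worked!

I think the reason is that the RabbitMQ DefaultConsumer runs on a new thread for consuming messages, which frees the main test thread, and the test then finishes. Once the main thread has finished, all the other threads are stopped as well.

So I think what we have here is a synchronization problem in the test.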
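A fixed sleep is one way to keep the test thread alive, but the wait can also be tied to the delivery itself. The sketch below uses only the JDK: an executor stands in for the client library's consumer thread (an assumption of this example, not the RabbitMQ API), and a `CountDownLatch` blocks the caller until the callback has run or a timeout expires. `AwaitDelivery` and `deliverAndAwait` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch: block the calling (test) thread until an asynchronous callback has run. */
final class AwaitDelivery {

    private AwaitDelivery() {
    }

    /**
     * Simulates a delivery on a separate thread and waits for it.
     * Returns the delivered body, or null if nothing arrived within the timeout.
     */
    static String deliverAndAwait(String body, long timeoutMillis) {
        CountDownLatch delivered = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        // Stand-in for the consumer thread owned by the messaging client.
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            consumerThread.submit(() -> {
                // In a real consumer this would be the body of handleDelivery(...).
                received.set(body);
                delivered.countDown();
            });
            // Without this wait the caller would return before the callback runs.
            boolean arrived = delivered.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? received.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            consumerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverAndAwait("{\"tipoPacote\": 3}", 2000));
    }
}
```

In an actual test, the latch would be counted down from wherever message processing completes, and the test method would call `await` with a timeout after `startListening()` before making its assertions.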

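Could this also cause a test to fail if the test code checks a database value that should have been inserted during execution, but the message has not yet been processed at the time of the check?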
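When the effect to verify is a stored value rather than a callback, a common approach is to poll for the condition with a deadline instead of checking once. The following is a minimal sketch under stated assumptions: `Poll.until` is a hypothetical helper, and a `ConcurrentHashMap` stands in for the database that a background thread writes to.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

/** Sketch: retry a check until it passes or a deadline is reached. */
final class Poll {

    private Poll() {
    }

    /** Returns true as soon as the condition holds, false if the timeout expires first. */
    static boolean until(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a database table that the consumer writes to asynchronously.
        Map<String, String> fakeDb = new ConcurrentHashMap<>();
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(100); // simulated processing delay
            } catch (InterruptedException e) {
                return;
            }
            fakeDb.put("90851", "stored");
        });
        writer.start();

        boolean stored = until(() -> fakeDb.containsKey("90851"), 2000, 20);
        System.out.println("row visible before timeout: " + stored);
    }
}
```

A single immediate check can run before the asynchronous write has happened, so the assertion is made only after `until` returns. Libraries such as Awaitility package the same idea.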

1 Answer

First of all, you have not enabled any logging, so it is hard to tell what is actually happening.
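The test output in the question shows SLF4J falling back to its no-operation logger, so anything the libraries log through SLF4J is discarded. One possible way to surface it, assuming the project is on SLF4J 1.7.x (which the StaticLoggerBinder message suggests), is to put a binding on the test classpath. The choice of slf4j-simple and the version below are assumptions for illustration, not a requirement of the project.

```groovy
dependencies {
   // Assumed binding: prints SLF4J output to System.err during tests
   testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
}
```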

Is Spring Boot an option for you? It has built-in logging support. Or are you deliberately using only the *context library?

answered 2020-05-14T13:13:15.560