4

我有一个 @KafkaListener 消费者，想为它编写集成测试。问题在于：消息被处理、数据库中的某些状态发生变化之后，很难确定 Consumer#consume 方法执行完成的确切时刻，因此不知道应该在什么时候执行断言。

/**
 * Kafka listener component that hands every received message to
 * {@link Service#process} and then acknowledges it manually.
 */
@Component
public class Consumer {

    private final Service service;

    /**
     * Constructor injection. Without a constructor the {@code final} field
     * above is never assigned and the class does not compile.
     *
     * @param service collaborator that performs the actual message processing
     */
    public Consumer(Service service) {
        this.service = service;
    }

    /**
     * Processes one message and acknowledges it afterwards.
     *
     * <p>{@code acknowledge()} is only reached when {@code service.process}
     * returns normally; if it throws, the message is not acknowledged here.
     *
     * @param message        payload of the consumed record
     * @param acknowledgment handle used to acknowledge the record manually
     */
    @KafkaListener(id = "id", groupId = "group", topics = "topic", containerFactory = "factory")
    public void consume(@Payload Message message, Acknowledgment acknowledgment) {
        service.process(message);
        acknowledgment.acknowledge();
    }

}

测试

@SpringBootTest
@EmbeddedKafka
void class Testing {
    // some useful beans 

    @SpyBean
    private Consumer consumer;

    @Test
    void shoudConsume() throws Exception {
        Message message = new Message();
        String topic = "topic";
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        new KafkaProducer<String, String>(senderProps).send(new ProducerRecord<>(topic, message))
                .get(1L, TimeUnit.SECONDS);

        Mockito.verify(consumer, Mockito.timeout(1_000L)).consume(any(Message.class), any(Acknowledgment.class));
        // perform some asserts
    }

实际情况是：如果我加上 Thread.sleep(1000L)，消费者会处理完消息，一切都正常；但使用 Mockito 时却不行，因为所有断言都在 Consumer#consume 方法执行完成之前就执行了。

对于 @KafkaListener，是否有办法（例如使用监听器等）捕捉到消费者确认/完成消息处理的时刻，以便在数据库处于正确状态时执行断言？我需要这个集成测试来确保端到端功能正常工作。

我也尝试过对 @SpyBean private Service service 的 Service#process 方法使用 #verify 进行检查，但同样不起作用。

4

3 回答 3

1

如果你想做一些更简单的事情,你可以检查这些选项:

  1. 使用Awaitility(当你只需要检查队列处理的效果时);
  /**
   * Publishes a "payload" message through the producer's output channel and
   * waits at most five seconds until the consumer reports that same value as
   * its received message.
   */
  @Test
  public void testMessageSendReceive_Awaitility() {
    producer.getMysource().output().send(
        MessageBuilder.withPayload("payload")
            .setHeader("type", "string")
            .build());

    // The assertion is retried by Awaitility until it passes or the limit elapses.
    waitAtMost(5, TimeUnit.SECONDS).untilAsserted(
        () -> then("payload").isEqualTo(EmbeddedKafkaAwaitilityTest.this.consumer.getReceivedMessage()));
  }
  • 编辑：如果您尝试这种方法，在不同的环境中可能会出现问题。可以看看这条评论：https://stackoverflow.com/a/65480474/10746857 ，可能会有所帮助。
  2. 使用 CountDownLatch（例如，当您无法访问注入的侦听器时——比如在 @SpringBootTest 中没有用 @Autowired 注入该类；不过这可能不是个好主意）;
  /**
   * Publishes a "payload" message and blocks on the latch until the consumer
   * signals that it has handled a message, then checks the received value.
   *
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  @Test
  public void testMessageSendReceive() throws InterruptedException {
    producer.getMysource()
      .output()
      .send(MessageBuilder.withPayload("payload")
        .setHeader("type", "string")
        .build());

    // Bounded wait: an unbounded latch.await() would hang the whole test run
    // forever if the message is never consumed. Fail explicitly on timeout.
    assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    assertThat(consumer.getReceivedMessage()).isEqualTo("payload");
  }
  3. 您也可以创建一个 BlockingQueue（但我认为这不是一个好的选择）。
// Queue of consumed records. Nothing in this snippet adds to it; presumably a
// message listener elsewhere does -- confirm against the full example.
BlockingQueue<ConsumerRecord<String, String>> consumerRecords = new LinkedBlockingQueue<>();
// Waits up to 10 seconds for a record to become available; the result is not used here.
consumerRecords.poll(10, TimeUnit.SECONDS);

参考:

另一种方法(我没有测试):

于 2021-06-01T14:22:24.837 回答
0

我有同样的问题并通过创建以下类来解决它

/**
 * Test-only subclass of [MyConsumer] that exposes a [CountDownLatch] so a test
 * can wait until a message has been handled.
 *
 * Registered as `@Primary` so it is preferred over other candidate beans of
 * the same type.
 */
@Primary
@Service
class MyCustomConsumerForTest(
    // Services
) : MyConsumer(/* Services */) {

    /** Counted down once per successfully handled message; starts at 1. */
    val latch = CountDownLatch(1)

    /**
     * Delegates to the real listener and then releases [latch].
     *
     * The latch is only counted down when `super.listen` returns normally.
     */
    override fun listen(message: String) {
        super.listen(message)
        latch.countDown()
    }
}

和我的消费者

/**
 * Kafka consumer for "topic" that forwards work to [Service].
 *
 * Declared `open` (class and [listen]) because a subclass in this file
 * overrides [listen]; Kotlin classes and functions are final by default.
 */
@Service
open class MyConsumer(
    private val service: Service
) {

    /**
     * Handles one message received from "topic".
     *
     * @param message raw record value
     */
    @KafkaListener(
        topics = ["topic"])
    open fun listen(message: String) {
        //my process
        // NOTE(review): `foo` is not defined in the visible snippet; presumably it
        // is derived from `message` in the elided processing -- confirm.
        service.somefunction(foo)
        // ... (remaining processing elided in the original example)
    }
}

和我的测试

/**
 * Integration test that publishes a record to an embedded Kafka broker and
 * checks both that the record reaches the topic and that the application
 * listener ([MyCustomConsumerForTest]) has processed it.
 *
 * The embedded broker is pinned to `localhost:9092` via `brokerProperties`.
 */
@EnableKafka
@SpringBootTest(classes = [MyCustomConsumerForTest::class,
    KafkaConfig::class])
@EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    brokerProperties = [
        "listeners=PLAINTEXT://localhost:9092",
        "port=9092"
    ])
@ActiveProfiles("test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class CampaignDataValidatorIntegrationTest {

    private val TOPIC_KAFKA = "topic"

    @Autowired
    private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker

    @Autowired
    private lateinit var listener: MyCustomConsumerForTest

    // Independent test-side container that mirrors every record of the topic into `records`.
    private lateinit var container: KafkaMessageListenerContainer<String, String>
    private lateinit var records: BlockingQueue<ConsumerRecord<String, String>>

    @SpyBean
    private lateinit var service: Service

    /** Starts the test-side listener container and waits for its partition assignment. */
    @BeforeAll
    fun setUp() {
        val configs = HashMap(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker))
        val consumerFactory = DefaultKafkaConsumerFactory(configs, StringDeserializer(), StringDeserializer())
        val containerProperties = ContainerProperties(TOPIC_KAFKA)
        container = KafkaMessageListenerContainer(consumerFactory, containerProperties)
        records = LinkedBlockingQueue()
        container.setupMessageListener(MessageListener<String, String> { records.add(it) })
        container.start()
        embeddedKafkaBroker.partitionsPerTopic.let { ContainerTestUtils.waitForAssignment(container, it) }
    }

    /** Stops the test-side listener container. */
    @AfterAll
    fun tearDown() {
        logger.info("Stop Listener")
        container.stop()
    }

    @Test
    fun kafkaSetup_withTopic_ensureSendMessageIsReceived() {
        // Arrange
        val configs = HashMap(KafkaTestUtils.producerProps(embeddedKafkaBroker))

        // Act
        // `use` closes the producer even if send/flush throws.
        // NOTE(review): `file` is not defined in the visible snippet -- confirm its source.
        DefaultKafkaProducerFactory(configs, StringSerializer(), StringSerializer())
            .createProducer()
            .use { producer ->
                producer.send(ProducerRecord<String, String>(TOPIC_KAFKA, file))
                producer.flush()
            }

        // Assert
        // A 1 ms poll is effectively a non-blocking check and is flaky; give the
        // broker a realistic amount of time to deliver the record.
        val singleRecord = records.poll(10, TimeUnit.SECONDS)
        Assertions.assertNotNull(singleRecord, "no record arrived on the topic in time")

        // Fail explicitly when the listener did not finish in time instead of
        // silently continuing to the verifications below.
        Assertions.assertTrue(
            listener.latch.await(1000, TimeUnit.MILLISECONDS),
            "listener did not finish processing in time"
        )

        verify(service, times(1)).validate(anyOrNull())

        argumentCaptor<Foo>().apply {
            verify(service, times(1)).somefunction(capture())

            Assertions.assertEquals(1, allValues.size)
            Assertions.assertEquals("text", firstValue.text)
        }
    }
}

和我的 kafkaconfig

/**
 * Spring configuration that wires the Kafka consumer factory and the listener
 * container factory from the `kafka.bootstrap-servers` and
 * `kafka.consumer.group-id` properties.
 */
@Configuration
@EnableKafka
class KafkaConfig {

    @Value("\${kafka.bootstrap-servers}")
    private lateinit var bootstrapAddress: String

    @Value("\${kafka.consumer.group-id}")
    private lateinit var groupId: String

    /** Consumer factory with String key/value deserializers and offset reset "latest". */
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> =
        DefaultKafkaConsumerFactory(
            hashMapOf<String, Any>(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapAddress,
                ConsumerConfig.GROUP_ID_CONFIG to groupId,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest"
            )
        )

    /** Listener container factory that uses [consumerFactory] and tolerates missing topics. */
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> =
        ConcurrentKafkaListenerContainerFactory<String, String>().also { listenerFactory ->
            listenerFactory.consumerFactory = consumerFactory()
            listenerFactory.containerProperties.isMissingTopicsFatal = false
        }
}
于 2020-08-06T23:52:30.633 回答
0

使用下面的方法，您可以在 N 秒的时间段内轮询来自 2 个主题的事件。必须为 fetchEventFromOutputTopic 的调用留出足够的时间。我把它和 Kafka Streams 一起使用，效果很好。

 /**
  * Polls the consumer {@code seconds} times and collects the values seen on
  * "topic-out-0" and "topic-out-1", keyed by topic name.
  *
  * <p>Each round polls for up to one second and then sleeps one more second.
  *
  * @param seconds number of poll rounds to run
  * @return map from topic name to the values received on that topic; both keys
  *         are always present, possibly with empty lists
  */
 private Map<String, List<Foo>> fetchEventFromOutputTopic(int seconds) throws Exception {
    String[] topics = {"topic-out-0", "topic-out-1"};
    Map<String, List<Foo>> eventsByTopic = new HashMap<>();
    for (String topic : topics) {
        eventsByTopic.put(topic, new ArrayList<>());
    }

    for (int round = 0; round < seconds; round++) {
        // NOTE(review): records are typed <String, Event> but are stored in
        // List<Foo>; presumably Event and Foo are meant to be the same type -- confirm.
        ConsumerRecords<String, Event> batch = consumer.poll(Duration.ofSeconds(1));
        for (String topic : topics) {
            List<Foo> bucket = eventsByTopic.get(topic);
            batch.records(topic).forEach(record -> bucket.add(record.value()));
        }
        Thread.sleep(1000);
    }
    return eventsByTopic;
}
于 2020-01-03T13:21:48.813 回答