1

我正在尝试使用 spring boot 对 kafka 的使用进行单元测试,但输入通道有问题。以下是我正在做的事情的摘录。

/**
 * Spring Cloud Stream binding contract declaring one inbound and one outbound channel.
 *
 * <p>Neither annotation names its channel explicitly; the listener in {@code MyService}
 * refers to the inbound one as {@code "consumeChannel"}, i.e. by method name.
 */
public interface MyCustomStreamBinding{
   /** Inbound channel from which messages are consumed. */
   @Input
   SubscribableChannel consumeChannel();

   /** Outbound channel to which messages are published. */
   @Output
   MessageChannel produceChannel();
}

/**
 * Enables the stream bindings declared by {@code Source} and {@code MyCustomStreamBinding}.
 *
 * <p>The class body is elided in the original snippet.
 */
@EnableBinding(value = { Source.class, MyCustomStreamBinding.class })
public class StreamConfiguration {
...
}

@Service
public class MyService {

  private final MyCustomStreamBinding streamBinding;
  public MyService(MyCustomStreamBinding streamBinding) {
    this.streamBinding = streamBinding;
  }

  public void sendMessage() {
    streamBinding.produceChannel().send(new SomeObject);
  }

  @StreamListener("consumeChannel")
  public void consumeChannel(SomeObject payload){
    // do processing of payload
  }
}

然后在我的测试用例中我有

/**
 * Spring Boot test checking that {@code MyService#sendMessage()} publishes a
 * {@code SomeObject} on the produce channel, observed through the {@code MessageCollector}.
 */
@SpringBootTest(classes = MyApp.class)
class MyServiceTest {

  @Autowired
  private MessageCollector messageCollector;

  @Autowired
  private MyCustomStreamBinding streamBinding;

  /** Service under test; built by hand around the autowired binding before each test. */
  private MyService myService;

  @BeforeEach
  public void setup() {
    this.myService = new MyService(this.streamBinding);
  }

  /** Sends one message and compares the payload captured on the produce channel. */
  @Test
  public void TestMessaging() {
    myService.sendMessage();

    // poll() is non-blocking; the message must already be queued when it is called.
    Object sentPayload = messageCollector
        .forChannel(streamBinding.produceChannel())
        .poll()
        .getPayload();

    assertThat(sentPayload, equalTo(new SomeObject()));
  }
}

如何测试 consumeChannel 以及它是否按预期实际执行了处理?

4

1 回答 1

0

这里有一个示例，它包含 2 个侦听器（绑定），分别用于消费数据和生成数据。您可以把 @SpringBootTest 写成 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"server.port=0"})，然后配合 JUnit 5 的 @ExtendWith(SpringExtension.class)，并通过 @EmbeddedKafka(topics = {"output-topic"}, partitions = 1) 使用嵌入式 Kafka 集群。

以这个简单的服务为例：它从绑定 process-in-0 接收数据，将其转换为大写，然后通过绑定 process-out-0 发出新数据。

/**
 * Kafka Streams binding contract: one inbound and one outbound {@code KStream}
 * with String keys and String values.
 */
public interface KafkaListenerBinding {
    /** Inbound stream bound under the name {@code process-in-0}. */
    @Input("process-in-0")
    KStream<String, String> inputStream();

    /** Outbound stream bound under the name {@code process-out-0}. */
    @Output("process-out-0")
    KStream<String, String> outStream();
}

/**
 * Stream processor that upper-cases every record value read from {@code process-in-0}
 * and forwards the result to {@code process-out-0}.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; presumably a logger field
 * (or a logging annotation on the class) was omitted — confirm before compiling.
 */
@Service
@EnableBinding(KafkaListenerBinding.class)
public class KafkaListenerService {

    /**
     * Builds the processing topology: log each incoming value, then upper-case it.
     *
     * @param input stream of records arriving on {@code process-in-0}
     * @return stream carrying the same keys with upper-cased values; a {@code null}
     *         value is passed through unchanged instead of throwing
     */
    @StreamListener("process-in-0")
    @SendTo("process-out-0")
    public KStream<String, String> transformToUpperCase(KStream<String, String> input) {
        return input
                // Chain peek instead of discarding its result, so logging is part of the returned pipeline.
                .peek((k, v) -> log.info("Received Input: {}", v))
                // Locale.ROOT keeps the casing independent of the JVM default locale.
                .mapValues(v -> v == null ? null : v.toUpperCase(java.util.Locale.ROOT));
    }
}

使用嵌入式 Kafka 集群对其进行测试。请注意，这样就不需要有真实可用的 Kafka 集群（cluster）。然后您可以在配置中使用属性 brokers: ${spring.embedded.kafka.brokers}。

/**
 * End-to-end test of {@code KafkaListenerService} against an embedded Kafka broker:
 * lower-case values are produced to {@code input-topic} and the upper-cased values are
 * expected on {@code output-topic}.
 */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"server.port=0"})
@ExtendWith(SpringExtension.class)
@EmbeddedKafka(topics = {"output-topic"}, partitions = 1)
@TestPropertySource(properties = {
        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.admin.properties.bootstrap.servers=${spring.embedded.kafka.brokers}"
})
public class KafkaListenerServiceTest {

    /** Number of records the test sends and expects back. */
    private static final int EXPECTED_RECORDS = 2;

    /** Upper bound (60 s, in nanoseconds) on waiting for the transformed records. */
    private static final long RECEIVE_TIMEOUT_NANOS = 60_000_000_000L;

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;
    @SpyBean
    KafkaListenerService kafkaListenerServiceSpy;
    private Consumer<String, String> consumer;

    /** Creates a String/String consumer subscribed to the embedded broker's declared topics. */
    @BeforeEach
    public void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("group1", "true", embeddedKafkaBroker));
        consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
        embeddedKafkaBroker.consumeFromAllEmbeddedTopics(consumer);
    }

    /** Closes the consumer; tolerates a setUp that failed before the consumer was created. */
    @AfterEach
    public void tearDown() {
        if (consumer != null) {
            consumer.close();
        }
    }

    /**
     * Sends {@code hello1} and {@code hello2} and asserts that {@code HELLO1} and
     * {@code HELLO2} arrive on the output topic within the timeout.
     *
     * @throws ExecutionException   if a send fails
     * @throws InterruptedException if interrupted while waiting for a send to complete
     */
    @Test
    public void SimpleProcessorApplicationTest() throws ExecutionException, InterruptedException {
        Set<String> actualResultSet = new HashSet<>();
        Set<String> expectedResultSet = new HashSet<>();
        expectedResultSet.add("HELLO1");
        expectedResultSet.add("HELLO2");

        Map<String, Object> senderProps = producerProps(embeddedKafkaBroker);
        DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        try {
            KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
            template.setDefaultTopic("input-topic");

            template.sendDefault("hello1").get();
            template.sendDefault("hello2").get();

            // NOTE(review): presumably the listener method runs once, when the binder builds
            // the topology, not once per record — hence times(1) after both sends. Confirm.
            verify(kafkaListenerServiceSpy, times(1)).transformToUpperCase(isA(KStream.class));

            // Bounded wait: without a deadline the loop would spin forever if no output arrives.
            long startedAt = System.nanoTime();
            int receivedAll = 0;
            while (receivedAll < EXPECTED_RECORDS
                    && System.nanoTime() - startedAt < RECEIVE_TIMEOUT_NANOS) {
                ConsumerRecords<String, String> cr = getRecords(consumer);
                receivedAll = receivedAll + cr.count();
                cr.iterator().forEachRemaining(r -> {
                    System.out.println("result: " + r.value());
                    actualResultSet.add(r.value());
                });
            }

            // Comparing the sets directly makes a failure report the actual vs. expected contents.
            assertThat(actualResultSet).isEqualTo(expectedResultSet);
        } finally {
            pf.destroy();
        }
    }
}

然后像下面这样配置您的 application.yml 文件；其中通过设置 schema.registry.url: not-used 来确保不启用 Schema Registry（架构注册表）。

# Test configuration for the Kafka Streams example: maps the binding names to topics
# and points the Kafka Streams binder at the embedded broker.
spring:
  kafka:
    consumer:
      group-id: group-01
  cloud:
    stream:
      # Binding name -> Kafka topic.
      bindings:
        process-in-0:
          destination: input-topic
        process-out-0:
          destination: output-topic
        # NOTE(review): not referenced by the snippets shown here — presumably left over
        # from another example; confirm before removing.
        notification-input-channel:
          destination: pos-topic
      kafka:
        streams:
          binder:
            # Resolved from the property that the embedded Kafka broker publishes.
            brokers: ${spring.embedded.kafka.brokers}
            configuration:
              # Placeholder value: no schema registry is used in this test.
              schema.registry.url: not-used
              commit.interval.ms: 100
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          # Per-binding value serdes (String in both directions).
          bindings:
            process-in-0:
              consumer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
            process-out-0:
              producer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
---

于 2021-04-05T12:13:24.783 回答