5

抱歉这个问题太笼统了,但是有人有一些关于如何使用嵌入的 kafka 执行生产者和消费者测试的教程或指南。我已经尝试了几个,但是有几个版本的依赖项并且没有一个真正有效=/

我正在使用 Spring Cloud Stream 卡夫卡。

4

1 回答 1

12

我们通常建议在测试中使用Test Binder,但如果您想使用嵌入式 kafka 服务器,可以这样做......

将此添加到您的 POM...

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

测试应用...

@SpringBootApplication
@EnableBinding(Processor.class)
public class So43330544Application {

    public static void main(String[] args) {
        SpringApplication.run(So43330544Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public byte[] handle(byte[] in){
        return new String(in).toUpperCase().getBytes();
    }

}

应用程序.属性...

spring.cloud.stream.bindings.output.destination=so0544out
spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=so0544

测试用例...

@RunWith(SpringRunner.class)
@SpringBootTest
public class So43330544ApplicationTests {

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

    @Autowired
    private KafkaTemplate<byte[], byte[]> template;

    @Autowired
    private KafkaProperties properties;

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void testSendReceive() {
        template.send("so0544in", "foo".getBytes());
        Map<String, Object> configs = properties.buildConsumerProperties();
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
        Consumer<byte[], byte[]> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("so0544out"));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
        consumer.commitSync();
        assertThat(records.count()).isEqualTo(1);
        assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
    }

}
于 2017-04-11T14:45:20.110 回答