0

我使用这种方法编写了这个流应用程序:一个StreamConfigs类:

@Configuration
@EnableKafka
public class StreamConfigs {
  @Value(...)
  private String applicationId;
    
  @Value(...)
  private String bootstrapServer;
    
  @Bean
  public KafkaStreamsConfiguration streamsConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    return new KafkaStreamsConfiguration(props);
  }
    
  @Bean
  public StreamsBuilderFactoryBean streamBuilder() {
    return new StreamsBuilderFactoryBean(streamsConfig());
  }
    
  @Bean
  public StreamsBuilderFactoryBean streamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
  }
}

和这个其他类

@Component
public class MyStream {
  @Value(value = ...)
  private String inputTopicA;
    
  @Value(value = ...)
  private String inputTopicB;
    
  @Value(value = ...)
  private String outputTopic;
    
  public MyStream() {}
    
  public MyStream(String inputTopicA, String inputTopicB, String outputTopic) {
    this.inputTopicA = inputTopicA;
    this.inputTopicB = inputTopicB;
    this.outputTopic = outputTopic;
  }
    
  @Bean
  public KStream<String, String> kStream(StreamsBuilder streamBuilder) {
    KTable<String, String> aKTable = streamBuilder.table(inputTopicA);
    KTable<String, String> bKTable = streamBuilder.table(inputTopicB);    
    KTable<String, String> outputKTable = aKTable
        .join(bKTable, (a, b) -> {...})
        .toStream()
        .groupByKey()
        .reduce((aggregate, current) -> {...});
    
    KStream<String, String> stream = outputKTable.toStream();
    stream.to(outputTopic);
    
    return stream;
  }
}

在类的kStream()方法中,MyStream有我想要测试的应用程序的流逻辑。

所以我写了这个类进行测试,使用嵌入式kafka

@EmbeddedKafka(partitions = 1)
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyStreamApplicationTests {
  @Value(...) private String topicA;
  @Value(...) private String topicB;
  @Value(...) private String outputTopic;

  @Autowired
  private EmbeddedKafkaBroker embeddedKafkaBroker;

  private BlockingQueue<ConsumerRecord<String, String>> aTopicQueue;
  private BlockingQueue<ConsumerRecord<String, String>> bTopicQueue;
  private BlockingQueue<ConsumerRecord<String, String>> outputTopicQueue;

  KafkaMessageListenerContainer<String, String> container;

  @BeforeAll
  void setupKafka() {
    Map<String, Object> consumerConfigs = new HashMap<>(
      KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkaBroker)
    );

    consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    DefaultKafkaConsumerFactory<String, String> consumerFactory =
            new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(), new StringDeserializer());

    ContainerProperties containerProperties = new ContainerProperties(aTopic, bTopic, outputTopic);
    container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

    //init record queues
    aTopiccQueue = new LinkedBlockingQueue<>();
    bTopicQueue = new LinkedBlockingQueue<>();
    outputTopicQueue = new LinkedBlockingQueue<>();

    container.setupMessageListener((MessageListener<String, String>) this::pushRecordIntoQueue);
    container.start();

    ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getTopics().size() * embeddedKafkaBroker.getPartitionsPerTopic());
  }

  @Test
  void testStream() throws Exception {
    //test logic of the stream
  }

}

这是testStream()方法

 @Test
 void testStream() throws Exception {
   Map<String, Object> props = new HashMap<>();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
   props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
   props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
   props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        
   KafkaStreamsConfiguration configuration = new KafkaStreamsConfiguration(props);
   
   //ERROR HERE!!!
   StreamsBuilder builder = new StreamConfigs().streamBuilder(configuration).getObject();

   MyStream myStream = new MyStream(inputTopicA, inputTopicB, outputTopic);
   myStream.kStream(builder);

   //produce a record into inputTopicA
   //produce a record into inputTopicB

   ConsumerRecord<String, String> outputRecord = outputTopicQueue.take();

   assertThat(outputRecord).isNotNull();
}

这不起作用,因为我在这行代码上遇到错误

StreamsBuilder builder = new StreamConfigs().streamBuilder(configuration).getObject();

这是堆栈跟踪

org.springframework.beans.factory.FactoryBeanNotInitializedException: org.springframework.kafka.config.StreamsBuilderFactoryBean does not support circular references

    at org.springframework.beans.factory.config.AbstractFactoryBean.getEarlySingletonInstance(AbstractFactoryBean.java:172)
    at org.springframework.beans.factory.config.AbstractFactoryBean.getObject(AbstractFactoryBean.java:156)
    at test.MyStreamApplicationTests.testStream(MyStreamApplicationTests.java:226)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

如何获取StreamBuilder实例以便重用我kStream()的测试方法和嵌入式代理?如果我尝试使用 来创建StreamBuilder实例new,我的测试会停留在等待我的机器上的活动代理。

你能帮助我吗?

编辑:我认为这解决了问题:我以这种方式编辑类的setupKafka()方法MyStreamApplicationTest

@BeforeAll
void setupKafka() {
  ...
  Properties streamConfig = new Properties();
  streamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
  streamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
  streamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  streamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  streamConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

  StreamsBuilder streamsBuilder = new StreamsBuilder();
  MyStream myStream = new MergeStream(topicA, topicB, outputTopic);
  myStream.process(streamsBuilder);
  Topology topology = streamsBuilder.build();
  new KafkaStreams(topology, streamConfig).start();

  ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getTopics().size() * embeddedKafkaBroker.getPartitionsPerTopic());
}
4

1 回答 1

0

你不能这样做 - Spring 需要管理它。

只需添加bootstrapServersProperty = ...@EmbeddedKafka注释中,然后将bootstrapServer您的字段StreamConfigs设置为嵌入式代理的地址。将其设置为您的@Value财产。

然后您可以简单地@AutowiredStreamBuilder工厂创建的 bean 放入您的测试中。

此外,您只需要其中之一

  @Bean
  public StreamsBuilderFactoryBean streamBuilder() {
    return new StreamsBuilderFactoryBean(streamsConfig());
  }
    
  @Bean
  public StreamsBuilderFactoryBean streamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
  }
于 2021-02-01T16:19:00.553 回答