我使用这种方法编写了这个流应用程序:一个StreamConfigs
类:
@Configuration
@EnableKafka
public class StreamConfigs {
@Value(...)
private String applicationId;
@Value(...)
private String bootstrapServer;
@Bean
public KafkaStreamsConfiguration streamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBean streamBuilder() {
return new StreamsBuilderFactoryBean(streamsConfig());
}
@Bean
public StreamsBuilderFactoryBean streamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
}
和这个其他类
@Component
public class MyStream {
@Value(value = ...)
private String inputTopicA;
@Value(value = ...)
private String inputTopicB;
@Value(value = ...)
private String outputTopic;
public MyStream() {}
public MyStream(String inputTopicA, String inputTopicB, String outputTopic) {
this.inputTopicA = inputTopicA;
this.inputTopicB = inputTopicB;
this.outputTopic = outputTopic;
}
@Bean
public KStream<String, String> kStream(StreamsBuilder streamBuilder) {
KTable<String, String> aKTable = streamBuilder.table(inputTopicA);
KTable<String, String> bKTable = streamBuilder.table(inputTopicB);
KTable<String, String> outputKTable = aKTable
.join(bKTable, (a, b) -> {...})
.toStream()
.groupByKey()
.reduce((aggregate, current) -> {...});
KStream<String, String> stream = outputKTable.toStream();
stream.to(outputTopic);
return stream;
}
}
在类的kStream()
方法中,MyStream
有我想要测试的应用程序的流逻辑。
所以我写了这个类进行测试,使用嵌入式kafka
@EmbeddedKafka(partitions = 1)
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyStreamApplicationTests {
@Value(...) private String topicA;
@Value(...) private String topicB;
@Value(...) private String outputTopic;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
private BlockingQueue<ConsumerRecord<String, String>> aTopicQueue;
private BlockingQueue<ConsumerRecord<String, String>> bTopicQueue;
private BlockingQueue<ConsumerRecord<String, String>> outputTopicQueue;
KafkaMessageListenerContainer<String, String> container;
@BeforeAll
void setupKafka() {
Map<String, Object> consumerConfigs = new HashMap<>(
KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkaBroker)
);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(aTopic, bTopic, outputTopic);
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
//init record queues
aTopiccQueue = new LinkedBlockingQueue<>();
bTopicQueue = new LinkedBlockingQueue<>();
outputTopicQueue = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) this::pushRecordIntoQueue);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getTopics().size() * embeddedKafkaBroker.getPartitionsPerTopic());
}
@Test
void testStream() throws Exception {
//test logic of the stream
}
}
这是testStream()
方法
@Test
void testStream() throws Exception {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
KafkaStreamsConfiguration configuration = new KafkaStreamsConfiguration(props);
//ERROR HERE!!!
StreamsBuilder builder = new StreamConfigs().streamBuilder(configuration).getObject();
MyStream myStream = new MyStream(inputTopicA, inputTopicB, outputTopic);
myStream.kStream(builder);
//produce a record into inputTopicA
//produce a record into inputTopicB
ConsumerRecord<String, String> outputRecord = outputTopicQueue.take();
assertThat(outputRecord).isNotNull();
}
这不起作用,因为我在这行代码上遇到错误
StreamsBuilder builder = new StreamConfigs().streamBuilder(configuration).getObject();
这是堆栈跟踪
org.springframework.beans.factory.FactoryBeanNotInitializedException: org.springframework.kafka.config.StreamsBuilderFactoryBean does not support circular references
at org.springframework.beans.factory.config.AbstractFactoryBean.getEarlySingletonInstance(AbstractFactoryBean.java:172)
at org.springframework.beans.factory.config.AbstractFactoryBean.getObject(AbstractFactoryBean.java:156)
at test.MyStreamApplicationTests.testStream(MyStreamApplicationTests.java:226)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
如何获取StreamBuilder
实例以便重用我kStream()
的测试方法和嵌入式代理?如果我尝试使用 来创建StreamBuilder
实例new
,我的测试会停留在等待我的机器上的活动代理。
你能帮助我吗?
编辑:我认为这解决了问题:我以这种方式编辑类的setupKafka()
方法MyStreamApplicationTest
:
@BeforeAll
void setupKafka() {
...
Properties streamConfig = new Properties();
streamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
streamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
streamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
StreamsBuilder streamsBuilder = new StreamsBuilder();
MyStream myStream = new MergeStream(topicA, topicB, outputTopic);
myStream.process(streamsBuilder);
Topology topology = streamsBuilder.build();
new KafkaStreams(topology, streamConfig).start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getTopics().size() * embeddedKafkaBroker.getPartitionsPerTopic());
}