6

我已经按照https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为 kafka 流应用程序编写了一个测试类 ,其代码是

import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class KafkaStreamsConfigTest {

private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;

private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();

private String key="test";
private Object value = "some value";
private Object expected_value = "real value";

String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";

String applicationId = "my-app";
String test_dummy = "dummy:1234";

@Before
public void setup() {
    Topology topology = new Topology();

    topology.addSource(kafkaEventSourceTopic, kafkaEventSourceTopic);

    topology.addProcessor(ProcessRouter.class.getSimpleName(), ProcessRouter::new, kafkaEventSourceTopic);

    topology.addProcessor(WorkforceVisit.class.getSimpleName(), WorkforceVisit::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DefaultProcessor.class.getSimpleName(), DefaultProcessor::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(CacheWorkforceShift.class.getSimpleName(), CacheWorkforceShift::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DigitalcareShiftassisstantTracking.class.getSimpleName(), DigitalcareShiftassisstantTracking::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(WorkforceLocationUpdate.class.getSimpleName(), WorkforceLocationUpdate::new
            , ProcessRouter.class.getSimpleName());

    topology.addSink(kafkaEventSinkTopic, kafkaEventSinkTopic
            , WorkforceVisit.class.getSimpleName(), DefaultProcessor.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    topology.addSink(kafkaCacheSinkTopic, kafkaCacheSinkTopic
            , WorkforceVisit.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);       
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, test_dummy);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, EventSerde.class.getName());

    testDriver = new TopologyTestDriver(topology, properties);

    //setup test topics
    inputTopic = testDriver.createInputTopic(kafkaEventSourceTopic, stringSerde.serializer(), eventSerde.serializer());
    outputTopic = testDriver.createOutputTopic(kafkaEventSinkTopic, stringSerde.deserializer(), eventSerde.deserializer());

}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void outputEqualsTrue()
{
    inputTopic.pipeInput(key, value);
    Object b =  outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(b,expected_value);

}

我使用EventSerde类来序列化和反序列化值。

当我运行此代码时,它会java.util.NoSuchElementException: Uninitialized topic: processed_events通过以下堆栈跟踪给出错误:

java.util.NoSuchElementException: Uninitialized topic: processed_events

at org.apache.kafka.streams.TopologyTestDriver.readRecord(TopologyTestDriver.java:715)
at org.apache.kafka.streams.TestOutputTopic.readRecord(TestOutputTopic.java:100)
at org.apache.kafka.streams.TestOutputTopic.readValue(TestOutputTopic.java:80)
at com.uhx.platform.eventprocessor.config.KafkaStreamsConfigTest.outputEqualsTrue(KafkaStreamsConfigTest.java:111)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

如您所见,我已经初始化了输入和输出主题。我还调试了代码,当我从输出主题读取值时发生错误

outputTopic.readValue();

我不明白我还应该做什么来初始化 outputTopic。谁能帮我解决这个问题?

我正在使用 apache kafka-streams-test-utils 2.4.0 和 kafka-streams 2.4.0

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>2.4.0</version>
        <scope>test</scope>
    </dependency>
4

1 回答 1

8

为避免/克服此异常,您需要在尝试读取之前检查输出主题是否为空。

@Test
public void outputEqualsTrue()
{
    inputTopic.pipeInput(key, value);
    assert(outputTopic.isEmpty(), false);
    Object b = outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(b,expected_value);
}
于 2020-02-05T17:46:00.217 回答