6

我们使用 Kafka Streams 已经有一段时间了,但从未编写过覆盖拓扑的测试。我们决定尝试一下,使用 Streams 库提供的 TopologyTestDriver。不幸的是,我们遇到了一个无法解决的问题。下面是我们生产代码的简化版本,语义与生产代码相同。

它对 2 个主题做连接(join),这 2 个主题分别包含 2 种类型的文档。我们的目标是把文档汇总到每个人的“文件夹”(DocumentFolder)中,文件夹会用到来自不同文档的信息。运行测试时,我们遇到了一个异常,原因是 PersonKey 被错误地强制转换为 DocumentA。下面给出了测试代码、各数据结构的 Avro 模式以及异常的堆栈跟踪。

package com.zenjob.me.indexer.application.domain;

import com.demo.DocumentA;
import com.demo.DocumentB;
import com.demo.DocumentFolder;
import com.demo.PersonKey;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import lombok.extern.log4j.Log4j2;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Assert;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Topology test for the "document folder" join: two KTables keyed by {@link PersonKey}
 * (one of {@link DocumentA}, one of {@link DocumentB}) are left-joined into a
 * {@link DocumentFolder} per person and written to an output topic.
 *
 * <p>All serdes share a single {@link MockSchemaRegistryClient}, so no real schema registry
 * is contacted.
 *
 * <p>NOTE(review): this setup has been reported to fail with
 * {@code ClassCastException: PersonKey cannot be cast to DocumentA} when backed by the mock
 * registry, and to pass once the schemas for the state-store / changelog subjects are
 * registered manually. The root cause is not visible from this class -- confirm before
 * relying on the mock client here.
 */
@Log4j2
class DemoTest {

    // One mock registry instance shared by every serde so they all resolve the same schema ids.
    private final SchemaRegistryClient schemaRegistryClient     = new MockSchemaRegistryClient();
    private final String               documentATopicName       = "documentATopicName";
    private final String               documentBTopicName       = "documentBTopicName";
    private final String               documentFoldersTopicName = "documentFoldersTopicName";

    /**
     * Creates a {@link SpecificAvroSerde} backed by the shared mock schema registry.
     *
     * @param isForKey {@code true} to configure the serde for record keys, {@code false} for values
     * @param <T>      the Avro specific record type handled by the serde
     * @return a configured serde
     */
    private <T extends SpecificRecord> SpecificAvroSerde<T> getSerde(boolean isForKey) {
        Map<String, Object> serdeConfig = new HashMap<>();
        // The config key is set to a placeholder value; the registry client itself is the mock passed below.
        serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wat-ever-url-anyway-it-is-mocked");

        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(schemaRegistryClient);
        serde.configure(serdeConfig, isForKey);
        return serde;
    }

    /**
     * Pipes one {@link DocumentA} and then one {@link DocumentB} for the same person and expects
     * two folder records: the first with only {@code propertyA} set, the second with both
     * properties set.
     */
    @Test
    void test() {

        StreamsBuilder builder = new StreamsBuilder();

        SpecificAvroSerde<PersonKey> keySerde = this.getSerde(true);
        SpecificAvroSerde<DocumentA> documentASerde = this.getSerde(false);
        SpecificAvroSerde<DocumentB> documentBSerde = this.getSerde(false);
        SpecificAvroSerde<DocumentFolder> documentFolderSerde = this.getSerde(false);

        KTable<PersonKey, DocumentA> docATable = builder.table(documentATopicName, Consumed.with(keySerde, documentASerde), Materialized.with(keySerde, documentASerde));
        KTable<PersonKey, DocumentB> docBTable = builder.table(documentBTopicName, Consumed.with(keySerde, documentBSerde), Materialized.with(keySerde, documentBSerde));

        docATable
                // Seed the folder from document A ...
                .mapValues(documentA ->
                                DocumentFolder.newBuilder()
                                        .setPropertyA(documentA.getPropertyA())
                                        .build(),
                        Materialized.with(keySerde, documentFolderSerde))
                // ... then enrich it with document B when one exists for the same key.
                .leftJoin(docBTable,
                        (folder, documentB) -> {
                            if (documentB == null) {
                                // Left join: no document B yet, emit the folder unchanged.
                                return folder;
                            }
                            return DocumentFolder.newBuilder(folder)
                                    .setPropertyB(documentB.getPropertyB())
                                    .build();
                        },
                        Materialized.with(keySerde, documentFolderSerde)
                )
                .toStream()
                .to(documentFoldersTopicName, Produced.with(keySerde, documentFolderSerde));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        ConsumerRecordFactory<PersonKey, DocumentA> documentAConsumerRecordFactory = new ConsumerRecordFactory<>(documentATopicName, keySerde.serializer(), documentASerde.serializer());
        ConsumerRecordFactory<PersonKey, DocumentB> documentBConsumerRecordFactory = new ConsumerRecordFactory<>(documentBTopicName, keySerde.serializer(), documentBSerde.serializer());

        // try-with-resources: the driver must be closed even when piping or an assertion fails.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {

            // When
            String personId = "person-id";
            PersonKey key = PersonKey.newBuilder().setPropertyA(personId).build();
            DocumentA documentA = DocumentA.newBuilder().setPropertyA("docA-propA").build();
            DocumentB documentB = DocumentB.newBuilder().setPropertyB("docB-propB").build();

            driver.pipeInput(documentAConsumerRecordFactory.create(key, documentA));
            driver.pipeInput(documentBConsumerRecordFactory.create(key, documentB));

            // Then
            ProducerRecord<PersonKey, DocumentFolder> output1 = driver.readOutput(documentFoldersTopicName, keySerde.deserializer(), documentFolderSerde.deserializer());
            ProducerRecord<PersonKey, DocumentFolder> output2 = driver.readOutput(documentFoldersTopicName, keySerde.deserializer(), documentFolderSerde.deserializer());

            log.info(output1);
            log.info(output2);

            // Fail with a readable message instead of a NullPointerException when a record is missing.
            Assert.assertNotNull("expected a folder record after piping document A", output1);
            Assert.assertNotNull("expected a folder record after piping document B", output2);

            Assert.assertEquals(documentA.getPropertyA(), output1.value().getPropertyA());
            Assert.assertNull(output1.value().getPropertyB());

            Assert.assertEquals(documentA.getPropertyA(), output2.value().getPropertyA());
            Assert.assertEquals(documentB.getPropertyB(), output2.value().getPropertyB());
        }
    }
}

DocumentA

{
  "type" : "record",
  "name" : "DocumentA",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    }
  ]
}

DocumentB

{
  "type" : "record",
  "name" : "DocumentB",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyB",
      "type" : "string"
    }
  ]
}

DocumentFolder

{
  "type" : "record",
  "name" : "DocumentFolder",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    },
    {
      "name" : "propertyB",
      "type" : [
        "null",
        "string"
      ],
      "default" : null
    }
  ]
}

PersonKey

{
  "type" : "record",
  "name" : "PersonKey",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    }
  ]
}

异常

task [0_0] Failed to flush state store documentATopicName-STATE-STORE-0000000000
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store documentATopicName-STATE-STORE-0000000000
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
    at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:421)
    at com.zenjob.me.indexer.application.domain.DemoTest.test(DemoTest.java:97)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:92)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$100(JUnitPlatformTestClassProcessor.java:77)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:73)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:131)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: com.demo.PersonKey cannot be cast to com.demo.DocumentA
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues.computeValue(KTableMapValues.java:78)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues.access$400(KTableMapValues.java:27)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:117)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:97)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:131)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:237)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
    ... 68 more

Kafka v2.3.0, Avro v1.9.1, KafkaAvroSerde v5.2.1

更新

我尝试使用 Processor API 重写拓扑,但没有成功。之后我改用真实的 Schema Registry,测试通过了,所以问题看起来出在 MockSchemaRegistryClient 上。找到原因后我会再发布一次更新。

更新 2

我设法让它在模拟的 Schema Registry 下正常工作了,但必须手动注册所有模式,包括状态存储以及内部状态存储变更日志(changelog)主题所用的模式。

4

0 回答 0