3

我有一个基本的 Apex 流程,其中一个 Kafka 输入运算符将数据输入到 Couchbase set 运算符(CouchbasePOJOSetOperator)中。Kafka 这一侧工作正常,移除 Couchbase 运算符后即可证明这一点。我也曾尝试切换 Malhar 库的版本,看看是否是最新版本中出了问题。

另外,我使用的是 DataTorrent 发行版的 Apex。

当我添加 Couchbase 运算符时,我得到以下异常:

java.lang.RuntimeException: Error creating local cluster

at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:124)
at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:46)
at com.usaa.data.streams.apex.ApplicationTest.testApplication(ApplicationTest.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.codehaus.jackson.map.DeserializationConfig
Serialization trace:
_deserializationConfig (org.codehaus.jackson.map.ObjectMapper)
mapper (com.datatorrent.contrib.couchbase.CouchBaseJSONSerializer)
serializer (com.datatorrent.contrib.couchbase.CouchbasePOJOSetOperator)
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:192)
at com.datatorrent.stram.plan.logical.LogicalPlan$OperatorMeta.readObject(LogicalPlan.java:898)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at java.util.ArrayList.readObject(ArrayList.java:791)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at com.datatorrent.stram.plan.logical.LogicalPlan.read(LogicalPlan.java:2326)
at com.datatorrent.stram.StramLocalCluster.cloneLogicalPlan(StramLocalCluster.java:323)
at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:285)
at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:122)
... 24 more

这是相关的应用程序代码:

// Kafka source: add the input operator to the DAG and configure its topic,
// consumer properties and cluster (all pointing at localhost:9092).
KafkaSinglePortInputOperator kafkaInput = dag.addOperator("kafkaInput", KafkaSinglePortInputOperator.class);
    kafkaInput.setTopics("testing2");
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "true");
    kafkaInput.setConsumerProps(props);
    kafkaInput.setClusters("localhost:9092");


    // Couchbase sink: the operator whose addition triggers the exception.
    CouchbasePOJOSetOperator couchOutput = dag.addOperator("couchOutput", CouchbasePOJOSetOperator.class);

    // Store configuration: bucket "default", empty user/password settings.
    // NOTE(review): the URI string lists localhost:8091 twice - confirm this is intended.
    CouchBaseWindowStore store = new CouchBaseWindowStore();
    store.setBucket("default");
    store.setUriString("localhost:8091,localhost:8091");
    store.setUserConfig("");
    store.setPasswordConfig("");
    store.setPassword("");
    couchOutput.setStore(store);
    // The store is connected here, while the DAG is being built; a failure is
    // only printed and DAG construction continues.
    // NOTE(review): presumably the operator connects its store itself at
    // runtime, which would make this call unnecessary - verify.
    try{
        store.connect();
    }catch (Exception e){
        e.printStackTrace();
    }

    // NOTE(review): the Kryo serialization trace in the reported exception runs
    // through this field (CouchbasePOJOSetOperator.serializer ->
    // CouchBaseJSONSerializer.mapper -> ObjectMapper._deserializationConfig),
    // i.e. the serializer set here is the object that cannot be re-created
    // when the local cluster clones the logical plan.
    CouchBaseJSONSerializer serializer = new CouchBaseJSONSerializer();
    couchOutput.setSerializer(serializer);
    // Single expression "getValue()" handed to the operator.
    ArrayList<String> expressions = new ArrayList<String>();
    expressions.add("getValue()");
    couchOutput.setExpressions(expressions);


    // Connect the Kafka output port to the Couchbase input port, container-local.
    dag.addStream("kafkaInput", kafkaInput.outputPort, couchOutput.input).setLocality(Locality.CONTAINER_LOCAL);
4

0 回答 0