I have a basic Apex application in which a Kafka input operator feeds into a Couchbase set operator. The Kafka side works fine, which I proved by removing the Couchbase operator. I have also tried switching versions of the Malhar library to see whether it was broken in the latest release.
I am also using the DataTorrent distribution of Apex.
When I add the Couchbase operator, I get the following exception:
java.lang.RuntimeException: Error creating local cluster
at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:124)
at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:46)
at com.usaa.data.streams.apex.ApplicationTest.testApplication(ApplicationTest.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.codehaus.jackson.map.DeserializationConfig
Serialization trace:
_deserializationConfig (org.codehaus.jackson.map.ObjectMapper)
mapper (com.datatorrent.contrib.couchbase.CouchBaseJSONSerializer)
serializer (com.datatorrent.contrib.couchbase.CouchbasePOJOSetOperator)
at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:192)
at com.datatorrent.stram.plan.logical.LogicalPlan$OperatorMeta.readObject(LogicalPlan.java:898)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at java.util.ArrayList.readObject(ArrayList.java:791)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at com.datatorrent.stram.plan.logical.LogicalPlan.read(LogicalPlan.java:2326)
at com.datatorrent.stram.StramLocalCluster.cloneLogicalPlan(StramLocalCluster.java:323)
at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:285)
at org.apache.apex.engine.EmbeddedAppLauncherImpl.getController(EmbeddedAppLauncherImpl.java:122)
... 24 more
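For reference, the exception comes from the getController() call in my JUnit test (ApplicationTest.java:30 in the trace). The test is essentially the stock archetype embedded-cluster test; a simplified sketch, where Application stands in for my StreamingApplication implementation:

import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import com.datatorrent.api.LocalMode;

public class ApplicationTest {
  @Test
  public void testApplication() throws Exception {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    lma.prepareDAG(new Application(), conf);       // Application builds the DAG shown below
    LocalMode.Controller lc = lma.getController(); // this is where the exception is thrown
    lc.run(10000);                                 // run the embedded cluster briefly
  }
}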
Here is the relevant application code (the populateDAG body):
KafkaSinglePortInputOperator kafkaInput = dag.addOperator("kafkaInput", KafkaSinglePortInputOperator.class);
kafkaInput.setTopics("testing2");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");
kafkaInput.setConsumerProps(props);
kafkaInput.setClusters("localhost:9092");
CouchbasePOJOSetOperator couchOutput = dag.addOperator("couchOutput", CouchbasePOJOSetOperator.class);
CouchBaseWindowStore store = new CouchBaseWindowStore();
store.setBucket("default");
store.setUriString("localhost:8091,localhost:8091");
store.setUserConfig("");
store.setPasswordConfig("");
store.setPassword("");
couchOutput.setStore(store);
try {
  store.connect();
} catch (Exception e) {
  e.printStackTrace();
}
CouchBaseJSONSerializer serializer = new CouchBaseJSONSerializer();
couchOutput.setSerializer(serializer);
ArrayList<String> expressions = new ArrayList<String>();
expressions.add("getValue()");
couchOutput.setExpressions(expressions);
dag.addStream("kafkaInput", kafkaInput.outputPort, couchOutput.input).setLocality(Locality.CONTAINER_LOCAL);
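The "getValue()" expression assumes the tuples reaching the Couchbase operator expose a getValue() getter; a purely illustrative tuple class (the class and field names are hypothetical) would look like this:

// Hypothetical tuple class, shown only to illustrate what "getValue()" is expected to resolve against.
public class KafkaMessage {
  private String value;

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}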