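I'm moving a topology from Storm to Flink. The topology has been reduced to KafkaSpout -> Bolt, and the bolt just counts packets instead of trying to decode them.
I submit the compiled .jar to Flink via flink -c <entry point> <path to .jar>
and get the following error: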
java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:190)
at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:174)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
... 1 more
Caused by: java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1365)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:175)
... 3 more
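My questions:
- Did I miss a configuration step for the KafkaSpout? The same setup works in vanilla Storm.
- Do I need a specific version of the Storm libraries? My build includes 0.9.4.
- Is there anything else I might have missed?
Should I keep using the Storm KafkaSpout, or would I be better off writing my own using the Flink KafkaSource?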
Edit:
Here is the relevant code:
Topology:
BrokerHosts brokerHosts = new ZkHosts(configuration.getString("kafka/zookeeper"));
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, configuration.getString("kafka/topic"), "/storm_env_values", "storm_env_DEBUG");
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("environment", new KafkaSpout(kafkaConfig), 1);
builder.setBolt("decode_bytes", new EnvironmentBolt(), 1).shuffleGrouping("environment");
In main:
FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
cluster.submitTopology("env_topology", conf, buildTopology());
The bolt extends BaseRichBolt. Its execute() method only logs that a packet arrived, for debugging; there is no other code in it.
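The stack trace shows Flink restoring the operator with plain Java serialization (InstantiationUtil.deserializeObject calling ObjectInputStream.readObject). One quick sanity check is therefore to round-trip the KafkaSpout and EnvironmentBolt instances through ObjectOutputStream/ObjectInputStream locally before submitting. The sketch below is a hypothetical helper (SerializationCheck and roundTrip are illustrative names, not part of Storm or Flink), and a String stands in for the spout so it runs without the Storm jars. Because it runs in a single JVM with one classpath, it can expose non-serializable fields, but it will not reproduce a library-version or classpath mismatch between the submitted .jar and the Flink cluster, which could also lead to "unexpected block data".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical diagnostic helper (not a Storm or Flink API): serializes an object with
// plain Java serialization and reads it back in the same JVM.
public final class SerializationCheck {

    private SerializationCheck() {}

    // Writes obj to an in-memory byte array, then deserializes a copy from those bytes.
    // Throws NotSerializableException if any reachable field cannot be serialized.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj); // closing the stream flushes it into 'bytes'
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder value: in the real topology, pass new KafkaSpout(kafkaConfig)
        // and new EnvironmentBolt() instead (assumes the Storm jars are on the classpath).
        String copy = roundTrip("storm_env_DEBUG");
        System.out.println("round trip ok: " + copy);
    }
}
```

Storm spouts and bolts are Serializable through IComponent, so calls such as SerializationCheck.roundTrip(new KafkaSpout(kafkaConfig)) type-check. If the round trip succeeds locally but the job still fails on the cluster, that points more toward a dependency or version mismatch than toward a field that cannot be serialized.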