
I moved my topology from Storm to Flink. The topology has been simplified to KafkaSpout -> Bolt. The bolt just counts packets rather than trying to decode them.

The compiled .jar is submitted to Flink via flink run -c <entry point> <path to .jar> and runs into the following error:

java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:190)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:174)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
        ... 1 more
Caused by: java.io.StreamCorruptedException: unexpected block data
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1365)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:175)
        ... 3 more

My questions:

  1. Did I miss a configuration step for the KafkaSpout? It works when used in vanilla Storm.
  2. Do I need to use a specific version of the Storm libraries? My build includes 0.9.4.
  3. Is there anything else I might have missed?

Should I use the Storm KafkaSpout, or would I be better off writing my own using the Flink KafkaSource?


EDIT:

Here is the relevant code:

Topology:

// ZooKeeper connection used to locate the Kafka brokers
BrokerHosts brokerHosts = new ZkHosts(configuration.getString("kafka/zookeeper"));

// topic name, ZooKeeper root path for consumer offsets, and consumer id
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, configuration.getString("kafka/topic"), "/storm_env_values", "storm_env_DEBUG");
FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("environment", new KafkaSpout(kafkaConfig), 1);
builder.setBolt("decode_bytes", new EnvironmentBolt(), 1).shuffleGrouping("environment");

Init:

FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
cluster.submitTopology("env_topology", conf, buildTopology());

The bolt is based on BaseRichBolt. Its execute() fn just logs the presence of any packet for debugging; there is no other code in it.
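For reference, a minimal sketch of what such a bolt might look like. The class name EnvironmentBolt comes from the topology above; the logger usage and everything else is an illustrative assumption, not the original code:

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class EnvironmentBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(EnvironmentBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // just log that a packet arrived; no decoding
        LOG.debug("got tuple: {}", input);
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams declared; this bolt only logs/counts
    }
}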


1 Answer


I just had a look at this. There is a problem right now, but I got it working locally. You can apply this hotfix to your code and build the compatibility layer yourself.

  1. KafkaSpout registers metrics. However, metrics are currently not supported by the compatibility layer. You need to remove the exception in FlinkTopologyContext.registerMetric(...) and simply return null (a sketch of the patched method is shown right below). (There is already an open PR working on the integration of metrics, thus I did not want to push this hotfix into the master branch.)
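A minimal sketch of that patch, assuming the registerMetric signature inherited from Storm's TopologyContext (backtype.storm.task.TopologyContext in 0.9.4):

// in FlinkTopologyContext of the flink-storm compatibility layer:
// hotfix: make metric registration a silent no-op instead of throwing,
// so that KafkaSpout's metric setup can complete
@Override
public <T extends IMetric> T registerMetric(final String name, final T metric, final int timeBucketSizeInSecs) {
    return null;
}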
  2. Furthermore, you need to add some configuration parameters to your topology configuration by hand:

I just made up some values here:

Config c = new Config();
List<String> zkServers = new ArrayList<String>();
zkServers.add("localhost");
c.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
c.put(Config.STORM_ZOOKEEPER_PORT, 2181);
c.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
c.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
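
(This Config instance is presumably what gets passed as the conf argument to submitTopology(...) in the init code from the question.)
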
  3. You need to add some additional dependencies to your project:

In addition to flink-storm you need:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.4</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
</dependency>
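
For completeness, the flink-storm dependency itself would be declared the same way; the version below is deliberately left as a placeholder, since at this point you need to build the compatibility layer (with the hotfix) yourself:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm</artifactId>
    <version><!-- your locally built version --></version>
</dependency>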

This works for me, using Kafka_2.10-0.8.1.1 and FlinkLocalCluster, executed within Eclipse.

It also works in a local Flink cluster started via bin/start-local-streaming.sh. For this, using the bin/flink run command, you need to use FlinkSubmitter instead of FlinkLocalCluster (a sketch follows after the dependency list). Furthermore, your jar needs the following dependencies (e.g., as <include> entries for the maven-shade or assembly plugin):

<include>org.apache.storm:storm-kafka</include>
<include>org.apache.kafka:kafka_2.10</include>
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-framework</include>
<include>com.google.guava:guava</include>
<include>com.yammer.metrics:metrics-core</include>
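
For the submission itself, a rough sketch of the FlinkSubmitter variant, reusing conf and buildTopology() from the init code in the question. FlinkSubmitter mirrors Storm's StormSubmitter API; treat this as an assumption to verify against your flink-storm version:

// replaces: cluster.submitTopology("env_topology", conf, buildTopology());
FlinkSubmitter.submitTopology("env_topology", conf, buildTopology());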
answered 2015-10-29T15:46:19.973