问题标签 [apache-flink]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
654 浏览

java - kafka ->storm -> flink : 意外的块数据

我将拓扑从风暴移动到 flink。拓扑已简化为KafkaSpout->Bolt. 螺栓只是计算数据包而不是尝试解码它们。

编译后的 .jar 提交到 flink viaflink -c <entry point> <path to .jar>并遇到以下错误:

我的问题:

  1. 我是否错过了KafkaSpout的配置步骤?这在香草风暴中使用时有效。
  2. 我需要使用特定版本的风暴库吗?我的构建中包含 0.9.4。
  3. 还有什么我可能错过的吗?

我应该使用storm KafkaSpout还是使用flink KafkaSource编写自己的内容会更好?


编辑:

以下是相关的代码:

拓扑:

在里面:

螺栓基于BaseRichBoltexecute() fn 只记录要调试的任何数据包的存在。里面没有其他代码。

0 投票
0 回答
1160 浏览

apache-kafka - apache flink kafka 流异常 java.lang.ClassNotFoundException: kafka.javaapi.consumer.SimpleConsumer

尝试运行 Apache Flink kafka 流式传输示例并获得异常-

我写了一个非常基本的例子 -

0 投票
1 回答
2459 浏览

java - flink:scala版本冲突?

我试图在 IntelliJ 中从这里编译 kafka 示例。在对依赖关系大惊小怪之后,我遇到了这个我似乎无法解决的问题:

我遇到了一些概念,表明这是 scala 版本的问题。当前图书馆列表:

关于我误入歧途的地方的建议?

0 投票
1 回答
968 浏览

apache-kafka - flink:从kafka获取字节[]数据

我使用flink-1.0-SNAPSHOT来使用来自 kafka 的数据。数据以Snappy 压缩字节 [] 的形式传入,传递给 thrift 供以后使用。

当我使用flink检索数据时,它会以某种方式损坏或处理不当,以至于无法解压缩。代码源自示例,如下所示:

isValidCompressedBuffer每次都返回 false

当通过其他途径使用时,数据被认为是好的。

我错过了什么?


解决方案:

我发布这个是因为我找不到任何使用RawSchema.

0 投票
1 回答
1326 浏览

apache-flink - flink:Flink Shell 抛出 NullPointerException

  1. 我正在使用 Flink Interactive Shell 来执行 WordCount。它适用于 10MB 的文件大小。但是对于 100MB 的文件,shell 会抛出 NullPointerException:

我在 linux 系统(16MB RAM)上工作。那里可能有什么问题?

我的代码(改编自https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/scala_api_quickstart.html):

  1. 我还注意到,flink 只在一个内核上执行程序。在使用 env.getConfig.setParallelism(4) 设置并行度并再次运行程序后,发生了另一个异常:

第1部分:

第2部分:

这是否意味着 taskmanager.numberOfTaskSlots?在我的 flink-conf.yaml 中,此键设置为 4。但是如何在 shell 中设置它?

0 投票
2 回答
1185 浏览

apache-flink - Apache Flink DataStream API 没有 mapPartition 转换

Spark DStream 有mapPartitionAPI,而 Flink DataStreamAPI 没有。有没有人可以帮忙解释一下原因。我想做的是reduceByKey在 Flink 上实现一个类似于 Spark 的 API。

0 投票
1 回答
6650 浏览

java - 如何根据数据将一个数据流输出到不同的输出?

在 Apache Flink 中,我有一个元组流。让我们假设一个非常简单的Tuple1<String>. 元组可以在其值字段中具有任意值(例如“P1”、“P2”等)。可能值的集合是有限的,但我事先不知道完整的集合(因此可能存在“P362”)。我想根据元组内部的值将该元组写入某个输出位置。因此,例如,我希望具有以下文件结构:

  • /output/P1
  • /output/P2

在文档中,我只发现了写入我事先知道的位置(例如stream.writeCsv("/output/somewhere"))的可能性,但没有办法让数据的内容决定数据的实际结束位置。

我在文档中阅读了有关输出拆分的信息,但这似乎没有提供一种将输出重定向到不同目的地的方法,就像我想要的那样(或者我只是不明白这是如何工作的)。

这可以用 Flink API 来完成吗,如果可以,怎么做?如果没有,是否有第三方图书馆可以做到这一点,还是我必须自己构建这样的东西?

0 投票
1 回答
966 浏览

java - Flink Streaming java.lang.Exception:无法加载任务的可调用类

我正在尝试在本地运行基本的 Flink 流式传输作业(在 java 中)。当我使用 eclipse 运行我的应用程序时,它就像一个魅力。但是当我使用 Flink 命令行界面运行时,我得到以下异常

我正在运行 Flink-Kafka 集成示例: data-artisans

0 投票
1 回答
790 浏览

java - Apache Flink fromCollection java.lang.IllegalStateException:未读块数据

我正在使用 Scala 和 Flink 1.0-SNAPSHOT 在 DataSet 上执行 leftOuterJoin,但出现以下异常:

我使用一个简单的 Scala 案例类作为 DataSet 的类型:

我使用以下方法生成案例类实例:

我初始化环境并通过以下方式创建 DataSet[RawValue]:

我怀疑是序列化问题导致了错误,我正在使用 Scala 2.10.5 和 Java 7 系统库来编译项目。我正在使用 Eclipse,该项目是由示例项目生成脚本生成的。

任何有关解决问题的帮助或提示将不胜感激:-) 谢谢,丹尼尔

0 投票
1 回答
608 浏览

perf - Flinks执行过程的PID怎么查?

我想用性能计数器(perf)来衡量 flinks 的性能。我的代码:

我知道jobmanager的PID。我还可以看到在执行期间运行 execute() 命令的线程(链数据源)的 TID。但是对于每次执行,TID 都会发生变化,因此它不会与 TID 一起使用。有没有办法找出运行execute()命令的jobmanagers子进程的PID?rdd 的每个转换(例如 flatMap)是否有不同的子进程?如果是这样,是否有可能找出它们不同的 PID?