2

我试图在 Flink 中执行的部分代码:

val pages = env.readCsvFile[(Long)]("/home/ppi.csv",
   fieldDelimiter = "\t", includedFields = Array(1))

我想pages用于其他目的,但是当我编译时,Flink 会向我抛出一条错误消息

线程“主”java.lang.ClassCastException 中的异常:
org.apache.flink.api.common.typeinfo.IntegerTypeInfo 无法转换为
org.apache.flink.api.java.typeutils.PojoTypeInfo

顺便说一句,我使用的是 Flink 的 0.9 快照版本。任何朝着正确方向的帮助都将受到高度赞赏。

4

1 回答 1

3

如果您从 CSV 文件中读取,则返回类型将是包含所有读取字段的 Scala 元组。在您的示例中,您只读取一个将给出 Tuple1 的字段。这是您尝试用“Long”周围的括号指定的内容:

readCsvFile[(Long)]

在 Scala 中,您只能使用括号指定具有两个或多个字段的元组。所以你需要改写

readCsvFile[Tuple1[Long]]

抛出异常是因为,Flink 的 CSVInputFormat 试图将所有非 Tuple 类型解释为 Pojo 类型。

于 2015-07-08T12:12:59.917 回答