2

我尝试联合一些 flink 数据集。它们包含在 Seq. 以下是产生问题的代码

case class clickZap ( date: LocalDateTime, stbId:String, channelId :Int , nozap:Boolean)
val afterLastz: DataSet[clickZap]= ... 

val ma_range: IndexedSeq[DataSet[(Int, Option[(java.time.LocalDateTime, String, Int, Boolean)])]]  = for (i  <- Range (0,min_n))
      yield afterLastz.reduceGroup(it =>(i, maxBeforezTCZ(it,at plusMinutes(i))))
//val ma_all =  ma_range.slice(1, min_n).foldLeft(ma_range.head)(_ union _)
val ma_all = ma_range.head union(ma_range.tail.head)

我得到的是一个

线程“主”org.apache.flink.api.common.InvalidProgramException 中的异常:无法联合不同类型的输入。Input1=scala.Tuple2(_1: Integer, _2: Option[scala.Tuple4(_1: GenericType [java.time.LocalDateTime], _2: String, _3: Integer, _4: Boolean)]), input2=scala.Tuple2( _1:整数,_2:选项[scala.Tuple4(_1:GenericType[java.time.LocalDateTime],_2:字符串,_3:整数,_4:布尔值)])

我错过了什么?类型没有区别,是吗?工会运营商应该是便宜的,所以绕过这个问题似乎没有吸引力。我提供了前两行代码作为 DataSet 中数据类型相同的参数。我使用了 flink 版本 0.9.0 和 0.9.1

4

1 回答 1

2

问题是 Flink 自己的打字系统中的一个错误。OptionTypeInfo代表 Scala的Option没有定义正确的equals方法。因此,两者OptionTypeInfos没有被检测到相等。

我创建了一个JIRA 问题并打开了一个拉取请求来解决这个问题。拉取请求应在两天内合并。如果您然后使用最新0.10-SNAPSHOT版本,那么您的问题应该得到解决。

于 2015-09-07T23:17:04.107 回答