CoGroupByKey problem
Data description.
I have two datasets.
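- Records: the first dataset, with records for each (key, day). For testing I use 2-3 keys and 5-10 days of data. My target is 1000+ keys. Each record contains the key, a timestamp (in microseconds), and some other data.
- Configs: the second dataset, rather small. It describes a key over time; for example, you can think of it as a list of tuples: (key, start date, end date, description).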
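For exploration, I encoded the data as files of length-prefixed, binary-encoded Protocol Buffer messages. The files are additionally compressed with gzip. The data is sharded by date. Each file is about 10 MB.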
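To make the file layout concrete, here is a minimal sketch of reading such a file with only the JDK. It assumes the length prefix is a base-128 varint, as produced by protobuf's `writeDelimitedTo`; the post does not say which prefix format is used. The class `DelimitedGzip` and its methods are hypothetical, and the payloads are plain strings standing in for serialized messages.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: frames payloads as <varint length><bytes> inside a gzip stream.
final class DelimitedGzip {
    // Reads a base-128 varint length (assumed prefix format); returns -1 at a clean end of stream.
    static int readVarint(InputStream in) throws IOException {
        int result = 0;
        for (int shift = 0; shift < 32; shift += 7) {
            int b = in.read();
            if (b == -1) {
                if (shift == 0) return -1;
                throw new EOFException("truncated length prefix");
            }
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
        }
        throw new IOException("length prefix too long");
    }

    // Writes a non-negative length as a base-128 varint.
    static void writeVarint(OutputStream out, int value) throws IOException {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Decompresses the stream and returns every framed payload in order.
    static List<byte[]> readAll(InputStream gzipped) throws IOException {
        List<byte[]> messages = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new GZIPInputStream(gzipped))) {
            int length;
            while ((length = readVarint(in)) >= 0) {
                byte[] payload = new byte[length];
                in.readFully(payload); // fails with EOFException on a truncated message
                messages.add(payload);
            }
        }
        return messages;
    }

    // Builds an in-memory gzip "file" so the sketch can be run without real data.
    static byte[] writeAll(List<byte[]> payloads) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            for (byte[] payload : payloads) {
                writeVarint(out, payload.length);
                out.write(payload);
            }
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] file = writeAll(Arrays.asList(
            "record-1".getBytes(StandardCharsets.UTF_8),
            "record-2".getBytes(StandardCharsets.UTF_8)));
        for (byte[] message : readAll(new ByteArrayInputStream(file))) {
            System.out.println(new String(message, StandardCharsets.UTF_8));
        }
    }
}
```

With real protobuf classes, each payload would be handed to the generated parser, or the stream would be read directly with the generated `parseDelimitedFrom(InputStream)` method if the varint assumption holds.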
Pipeline
I use Apache Beam to express the pipeline.
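- First, I add keys to both datasets. For the Records dataset, the key is (key, day rounded timestamp). For Configs, the key is (key, day), where day is each timestamp value between start date and end date (pointing to midnight).
- The datasets are merged using CoGroupByKey.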
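The keying step above can be sketched in plain Java as follows. This is an illustration only: the class `DayKeys` and its method names are hypothetical, midnight is assumed to mean midnight UTC, and the end date is assumed to be inclusive. None of these details are stated in the post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing how both datasets could derive the "day" part of the key.
final class DayKeys {
    static final long MICROS_PER_DAY = 86_400_000_000L;

    // Records: round a microsecond timestamp down to midnight (UTC assumed) of its day.
    static long roundToDay(long timestampUsec) {
        return Math.floorDiv(timestampUsec, MICROS_PER_DAY) * MICROS_PER_DAY;
    }

    // Configs: every midnight timestamp from the start day to the end day (inclusive, an assumption).
    static List<Long> daysBetween(long startUsec, long endUsec) {
        List<Long> days = new ArrayList<>();
        long lastDay = roundToDay(endUsec);
        for (long day = roundToDay(startUsec); day <= lastDay; day += MICROS_PER_DAY) {
            days.add(day);
        }
        return days;
    }

    public static void main(String[] args) {
        long oneHourPastMidnight = 1462665600000000L + 3_600_000_000L;
        System.out.println(roundToDay(oneHourPastMidnight));
        System.out.println(daysBetween(1462665600000000L, 1462924800000000L));
    }
}
```

A record and a config entry only meet in CoGroupByKey if both sides produce exactly the same day value, so both should go through the same rounding function.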
As the key type I use org.apache.flink.api.java.tuple.Tuple2 with a Tuple2Coder from the repo github.com/orian/tuple-coder.
Problem
If the Records dataset is small, like 5 days, everything seems fine (check normal_run.log).
INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
When I run the pipeline for more than 10 days, I get an error stating that some records have no config (wrong_run.log).
INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
Then I added some extra log messages:
(a.java:144) - 68643 items for KeyValue3 on: 1462665600000000
(a.java:140) - no items for KeyValue3 on: 1463184000000000
(a.java:123) - missing for KeyValue3 on: 1462924800000000
(a.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
(a.java:123) - missing for KeyValue3 on: 1462752000000000
(a.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
(a.java:144) - 754578 items for KeyValue3 on: 1462406400000000
(a.java:144) - 751574 items for KeyValue3 on: 1463011200000000
(a.java:123) - missing for KeyValue3 on: 1462665600000000
(a.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
(a.java:123) - missing for KeyValue3 on: 1463184000000000
(a.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
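In the first line you can see that 68643 items were processed for KeyValue3 and time 1462665600000000.
Later, in line 9, the operation appears to process the same key again, but it reports that no config is available for these records.
Line 10 reports that they have been marked as no-loc.
Line 2 says there are no items for KeyValue3 and time 1463184000000000, but in line 11 you can read that the items for this (key, day) pair were processed later and that they lack a config.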
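To spot this pattern without reading the log by eye, a small check like the one below can list every (key, day) pair that was reported as a group more than once. It is only a sketch: the class `DuplicateGroups` is hypothetical, and the regular expression assumes the message wording shown in the log excerpt above ("N items for ..." and "no items for ...").

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical diagnostic: finds (key, day) pairs that show up as more than one group.
final class DuplicateGroups {
    // Matches "<count> items for <key> on: <day>" and "no items for <key> on: <day>".
    // "missing for ..." lines are deliberately ignored; they do not report a group.
    private static final Pattern GROUP_LINE =
        Pattern.compile("(?:\\d+|no) items for (\\S+) on: (\\d+)");

    static List<String> keysGroupedMoreThanOnce(List<String> logLines) {
        Map<String, Integer> groupCounts = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = GROUP_LINE.matcher(line);
            if (m.find()) {
                groupCounts.merge(m.group(1) + "@" + m.group(2), 1, Integer::sum);
            }
        }
        List<String> duplicates = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : groupCounts.entrySet()) {
            if (entry.getValue() > 1) {
                duplicates.add(entry.getKey());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "(a.java:144) - 68643 items for KeyValue3 on: 1462665600000000",
            "(a.java:140) - no items for KeyValue3 on: 1463184000000000",
            "(a.java:123) - missing for KeyValue3 on: 1462665600000000",
            "(a.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc",
            "(a.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc",
            "(a.java:144) - 754578 items for KeyValue3 on: 1462406400000000");
        System.out.println(keysGroupedMoreThanOnce(lines));
    }
}
```

Run against the lines quoted in `main`, this reports KeyValue3 for the days 1462665600000000 and 1463184000000000, the same two pairs described above. A correct grouping should produce each key exactly once, so any output here indicates that one logical key was split into several groups.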
Some clues
During one of the exploration runs, I got an exception (exception_throw.log).
05/26/2016 03:49:49 GroupReduce (GroupReduce at GroupByKey)(1/5) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.sort.LargeRecordHandler.finishWriteAndSortKeys(LargeRecordHandler.java:263)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1409)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IllegalAccessError: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:122)
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:297)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:706)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Workaround (after more testing, it does not work; staying with Tuple2)
I switched from using Tuple2 to a Protocol Buffer message:
message KeyDay {
  optional bytes key = 1;
  optional int64 timestamp_usec = 2;
}
But using Tuple2.of() is simply easier than: KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
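After switching to a key class derived from protobuf.Message, the problem disappeared for 10-15 days of data (the size that failed with Tuple2), but increasing the data size to 20 days showed that it is still there.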