
CoGroupByKey problem

Data description.

I have two datasets.

  • Records - the first one, with entries per (key, day). For testing I use 2-3 keys and 5-10 days of data; my target is 1000+ keys. Each record contains a key, a timestamp in microseconds, and some other data.
  • Configs - the second one, rather small. It describes keys over time; you can think of it as a list of tuples: (key, start date, end date, description).

For exploration I encoded the data into files of length-prefixed, binary-encoded Protocol Buffer messages. The files are additionally gzipped. The data is sharded by date, and each file is roughly 10 MB.
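The framing above can be sketched in plain Java. This is an illustrative reader, not my actual code; it assumes each message is preceded by a base-128 varint length, as produced by protobuf's writeDelimitedTo(), and it returns the raw payload bytes that would normally go to Message.parseFrom():

```java
import java.io.*;
import java.util.*;
import java.util.zip.*;

public class DelimitedReader {
    // Reads one base-128 varint; returns -1 on clean EOF before the first byte.
    static int readVarint(InputStream in) throws IOException {
        int shift = 0, result = 0;
        int b = in.read();
        if (b < 0) return -1;
        while (true) {
            result |= (b & 0x7f) << shift;
            if ((b & 0x80) == 0) return result;
            shift += 7;
            b = in.read();
            if (b < 0) throw new EOFException("truncated varint");
        }
    }

    // Returns the raw payload of every length-prefixed message in a gzipped file.
    static List<byte[]> readAll(InputStream raw) throws IOException {
        InputStream in = new BufferedInputStream(new GZIPInputStream(raw));
        List<byte[]> payloads = new ArrayList<>();
        for (int len = readVarint(in); len >= 0; len = readVarint(in)) {
            byte[] buf = new byte[len];
            new DataInputStream(in).readFully(buf);
            payloads.add(buf);
        }
        return payloads;
    }
}
```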

Pipeline

I use Apache Beam to express the pipeline.

  1. First, I add keys to both datasets. For the Records dataset the key is (key, day-rounded timestamp). For the Configs dataset the key is (key, day), where day is every timestamp value (pointing to midnight) between start date and end date.
  2. Merge the datasets using CoGroupByKey.
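The key-building logic of step 1 can be sketched as follows. The names are illustrative (this is not the author's actual code), and midnight is assumed to be in UTC:

```java
import java.util.*;

public class KeyDays {
    static final long DAY_USEC = 24L * 60 * 60 * 1_000_000;

    // Round a microsecond timestamp down to midnight UTC (Records side).
    static long roundToDay(long tsUsec) {
        return tsUsec - Math.floorMod(tsUsec, DAY_USEC);
    }

    // All midnights covered by [startUsec, endUsec], inclusive (Configs side):
    // a config row is emitted once per day it spans.
    static List<Long> expandDays(long startUsec, long endUsec) {
        List<Long> days = new ArrayList<>();
        for (long d = roundToDay(startUsec); d <= endUsec; d += DAY_USEC) {
            days.add(d);
        }
        return days;
    }
}
```

Pairing each of these day values with the record/config key yields the (key, day) join keys that CoGroupByKey then groups on.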

As the key type I use org.apache.flink.api.java.tuple.Tuple2 together with the Tuple2Coder from the repo github.com/orian/tuple-coder.

Problem

If the Records dataset is small, e.g. 5 days, everything seems fine (check normal_run.log):

INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

When I run the pipeline over 10 days, I hit an error saying that some records have no config (wrong_run.log):

INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

I then added some extra log messages:

(a.java:144) - 68643 items for KeyValue3 on: 1462665600000000
(a.java:140) - no items for KeyValue3 on: 1463184000000000
(a.java:123) - missing for KeyValue3 on: 1462924800000000
(a.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
(a.java:123) - missing for KeyValue3 on: 1462752000000000
(a.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
(a.java:144) - 754578 items for KeyValue3 on: 1462406400000000
(a.java:144) - 751574 items for KeyValue3 on: 1463011200000000
(a.java:123) - missing for KeyValue3 on: 1462665600000000
(a.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
(a.java:123) - missing for KeyValue3 on: 1463184000000000
(a.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc

You can spot in line 1 that 68643 items were processed for KeyValue3 at time 1462665600000000.
Later, in line 9, the operation seems to process the same key again, but it reports that no config is available for those records.
Line 10 notes that they have been marked as no-loc.

Line 2 says there are no items for KeyValue3 at time 1463184000000000, yet in line 11 you can read that the items for this (key, day) pair were processed later, and that they were missing a config.

Some clues

During one of the exploratory runs I hit an exception (exception_throw.log):

05/26/2016 03:49:49 GroupReduce (GroupReduce at GroupByKey)(1/5) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
  at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
  at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
  ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
  at org.apache.flink.runtime.operators.sort.LargeRecordHandler.finishWriteAndSortKeys(LargeRecordHandler.java:263)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1409)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IllegalAccessError: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:122)
  at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:297)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
  at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:706)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
  at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Workaround (after more testing: does not work, back to Tuple2)

I switched from using Tuple2 to a Protocol Buffer message:

message KeyDay {
  optional bytes key = 1;  // ByteString is the Java type; the proto field type is bytes
  optional int64 timestamp_usec = 2;
}

However, Tuple2.of() is much more convenient to use than KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().

After switching to a key class derived from protobuf.Message, the problem disappeared for 10-15 days of data (so it was the data size that broke Tuple2), but increasing the data size to 20 days showed it is still there.
