我有一个 Cassandra 数据库,它分布在多个节点上。当用 Pig 查询它时,由 Pig 创建的 mapreduce 作业在 hadoop 节点上“崩溃”,但有以下异常:
2013-03-18 00:57:19,374 WARN org.apache.hadoop.mapred.Child:运行孩子时出错 java.lang.RuntimeException:org.apache.thrift.TException:超出消息长度:674 在 org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:384) 在 org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:390) 在 org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.computeNext(ColumnFamilyRecordReader.java:313) 在 com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143) 在 com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138) 在 org.apache.cassandra.hadoop.ColumnFamilyRecordReader.getProgress(ColumnFamilyRecordReader.java:103) 在 org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader.getProgress(PigRecordReader.java:169) 在 org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.getProgress(MapTask.java:514) 在 org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:539) 在 org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67) 在 org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143) 在 org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) 在 org.apache.hadoop.mapred.MapTask.run(MapTask.java:370) 在 org.apache.hadoop.mapred.Child$4.run(Child.java:255) 在 java.security.AccessController.doPrivileged(本机方法) 在 javax.security.auth.Subject.doAs(Subject.java:396) 在 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121) 在 org.apache.hadoop.mapred.Child.main(Child.java:249) 引起:org.apache.thrift.TException:超出消息长度:674,readLength:192 在 org.apache.thrift.protocol.TBinaryProtocol.checkReadLength(TBinaryProtocol.java:393) 在 org.apache.thrift.protocol.TBinaryProtocol.readBinary(TBinaryProtocol.java:363) 在 org.apache.cassandra.thrift.Column.read(Column.java:535) 在 org.apache.cassandra.thrift.ColumnOrSuperColumn.read(ColumnOrSuperColumn.java:507) 在 org.apache.cassandra.thrift.KeySlice.read(KeySlice.java:408) 在 org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:12905) 在 org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78) 在 org.apache.cassandra.thrift.Cassandra$Client.recv_get_range_slices(Cassandra.java:734) 在 org.apache.cassandra.thrift.Cassandra$Client.get_range_slices(Cassandra.java:718) 在 org.apache.cassandra.hadoop.ColumnFamilyRecordReader$StaticRowIterator.maybeInit(ColumnFamilyRecordReader.java:346) ... 17 更多
最突出的是org.apache.thrift.TException: Message length exceeded: 674
。每次启动 Pig 查询时,异常中吐出的消息长度都会有所不同。从任务在 hadoop 节点上初始化的那一刻起,它需要不到一秒钟的时间来触发异常。
Cassandra 填充了大约 1GB 的数据。用于导致此异常的 Pig 查询如下:
rows = LOAD 'cassandra://[keyspace here]/[cf here]' USING org.apache.cassandra.hadoop.pig.CassandraStorage() AS([column definitions here]); testvals = foreach 行生成 mycolumn.$1; 名称 = 限制 testvals 57343; 转储名称;
为什么你问 57343 限制?任何低于 57343 的数字都会让 Pig 作业成功完成,任何 >= 57343 的数字都会让它崩溃。Cassandra 附带的 Pig 示例也因相同的异常而退出。此外,在 Cassandra 中使用较小的数据集可以让 Pig 成功完成工作。
我发现了一些类似的错误,Thrift 抱怨消息长度,但通常这是超过 cassandra.yaml 中指定的最大消息长度时。在这种情况下,cassandra.yaml 中的消息长度设置为 64MB 以测试它是否有帮助,但仍然发生相同的异常。此外,异常指出消息的长度太长,即使在异常中声明消息本身在这种情况下只有 674 个字节!
我尝试了什么:
- 增加cassandra.yaml中的
thrift_max_message_length_in_mb
和 值thrift_framed_transport_size_in_mb
- 重建 Thrift jar
- 删除 Cassandra 键空间,然后重新填充它
设置:
- Hadoop 1.0.4
- 卡桑德拉 1.2.2
- 猪 0.11.0
TL;DR
Pig + Cassandra 在更大的数据集上崩溃 ( org.apache.thrift.TException: Message length exceeded: 674
)。较小的数据集或较大数据集的较小子集可以正常工作。
编辑 Cassandra 日志显示没有错误。它按照作业的要求提供切片,并且在 Cassandra 执行此操作时作业终止。