在“大数据”Lambda 架构书之后,我有一个输入目录,里面充满了类型化的 Thift 数据对象,其中包含一个 DataPailStructure 定义的 pail.meta 文件
我拍摄了这些数据的快照:
Pail snapshotPail = newDataPail.snapshot(PailFactory.snapshot);
传入的文件和元数据文件是重复的,pail.meta文件也有
structure: DataPailStructure
现在我想切碎这些数据,将其分成垂直分区。与本书一样,我创建了两个 PailTap 对象,一个用于 Snapshot 和 SplitDataStructure,一个用于新的Shredded文件夹。
PailTap source = dataTap(PailFactory.snapshot);
PailTap sink = splitDataTap(PailFactory.shredded);
/Shredded文件夹有一个 pail.meta 文件,其中包含structure: SplitDataPailStructure
按照说明,我执行 JCascalog 查询以强制减速器:
Api.execute(sink, new Subquery(data).predicate(reduced, empty, data));
现在,在本地模式下,这工作正常。在 /Shredded 下创建了一个“临时”子文件夹,它以预期的“1/1”结构垂直分区。在本地模式下,然后将其移至 /Shredded 文件夹,我可以毫无问题地合并并合并到 master。
但是在 Hadoop 内部运行,此时它失败了,并出现错误:
cascading.tuple.TupleException: unable to sink into output identifier: /tmp/swa/shredded
...
Caused by: java.lang.IllegalArgumentException: 1/1/part-000000 is not valid with the pail structure {structure=com.hibu.pail.SplitDataPailStructure, args={}, format=SequenceFile} --> [1, _temporary, attempt_1393854491571_12900_r_000000_1, 1, 1] at com.backtype.hadoop.pail.Pail.checkValidStructure(Pail.java:563)
不用说,如果我将 Shredded Sink 结构类型更改为 DataPailStructure,那么它可以正常工作,但这是一个相当没有意义的操作,因为一切都在 Incoming 文件夹中。现在没关系,因为我只使用一种数据类型,但这很快就会改变,我需要那个分区。
有任何想法吗?我最初不想在这里发布我所有的源代码,但我几乎可以肯定错过了一些东西。