0

假设我有字段列表,即 {field1,field2,field3,field4} 我对 field2 执行了一些操作,说我想将每个元组值增加某个值,比如 5,

performed this operation in a function which gave me modified field with "M_field2" as out field name now i want to write complete tuple in a file but in place of field2 i want "M_field2". How i will achieve this.
4

2 回答 2

1

从它说的三叉戟API页面

一个函数接受一组输入字段并发出零个或多个元组作为输出。输出元组的字段附加到流中的原始输入元组。如果一个函数没有发出元组,则过滤掉原始输入元组。否则,输入元组对每个输出元组重复

现在从trident 教程页面中挖掘更多内容,发现这个
使用分组流,输出将包含分组字段,然后是聚合器发出的字段。例如:

    stream.groupBy(new Fields("val1"))
     .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

在此示例中,输出将包含字段"val1" and "sum"

我不确定,但我能想到的最接近的是做类似的事情

    stream.groupBy(new Fields("field1","field3","field4"))
     .aggregate(new Fields("field2"), new Sum(), new Fields("M_field2"))

可能会实现您正在寻找的东西。如果我错了,请纠正我。

于 2013-11-14T14:22:08.313 回答
1

我解决了这个问题.. 使用三叉戟只是你必须在输入字段列表中使用修改后的字段名称。例如 :-

topology.newStream("dummySpout",new DummySpout()).stateQuery(tridentState, new QueryFunctionClass(), new Fields("outLpi","outFileId"))
.each(new Fields("outLpi"),new DBReaderFunction((ArrayList<String>)conf.get("listOfFields")), new Fields((ArrayList<String>)conf.get("listOfFields")))
.each(new Fields((ArrayList<String>)conf.get("listOfFields")), new LoggerFilter())
.aggregate(new Fields("SAL"), new ApplyAggregator(),new Fields("sum"))
.each(new Fields("sum","SAL"),new LoggerFilter());

最后一行中的“sum”是修改后的字段,SAL 是原始字段。

于 2013-11-29T11:21:00.423 回答