我正在尝试创建一些单元测试来验证我的 Trident 拓扑的某些部分是否在做他们应该做的事情。
我希望能够检索运行拓扑后产生的所有值并将它们放入列表中,以便我可以“看到”它们并检查它们的条件。
FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
// Soo... how do I retrieve the "aggregated_foos" from here?
我正在运行拓扑TrackedTopology
(从另一个 SO 问题中获取代码,感谢@brianghig的提问和@Thomas Kielbus的回复)
这就是我“启动”拓扑以及将样本值输入其中的方式:
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());
feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));
当我这样做时,我可以在日志消息中看到拓扑运行正确,并且值计算正确,但我想将结果“钓鱼”到一个List
(或任何结构,此时)所以我实际上可以Asserts
在我的测试中放一些。
我一直在尝试 [as**ton] 不同的方法,但它们都不起作用。
最新的想法是在聚合之后添加一个螺栓,以便它将我的值“持久化”到一个列表中:
下面你会看到这个类试图遍历由 发出的所有元组,aggregate
并将它们放在我之前初始化的列表中:
class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
final List<AggregatedFoo> results;
public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
this.results = results;
}
@Override
public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
TridentCollector collector) {
for (TridentTuple tuple : tuples) {
results.add((AggregatedFoo) tuple.getValue(0));
}
}
}
所以现在代码看起来像:
// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
.partitionPersist(new FieldFetcherFactory(),
new Fields("aggregated_foos"),
new FieldFetcherStateUpdater(results));
LOGGER.info("Done. Checkpoint results={}", results);
但什么都没有......日志显示Done. Checkpoint results=[]
(空列表)
有没有办法得到它?我想它一定是可行的,但我一直无法找到办法......
任何提示或链接到页面或任何类似的东西都将不胜感激。先感谢您。