0

我正在尝试创建一些单元测试来验证我的 Trident 拓扑的某些部分是否在做他们应该做的事情。

我希望能够检索运行拓扑后产生的所有值并将它们放入列表中,以便我可以“看到”它们并检查它们的条件。

   FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
   TridentTopology topology = new TridentTopology();
   topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    // Soo... how do I retrieve the "aggregated_foos" from here?

我正在运行拓扑TrackedTopology(从另一个 SO 问题中获取代码,感谢@brianghig的提问和@Thomas Kielbus的回复)

这就是我“启动”拓扑以及将样本值输入其中的方式:

TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());

feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));

当我这样做时,我可以在日志消息中看到拓扑运行正确,并且值计算正确,但我想将结果“钓鱼”到一个List(或任何结构,此时)所以我实际上可以Asserts在我的测试中放一些。

我一直在尝试 [as**ton] 不同的方法,但它们都不起作用。

最新的想法是在聚合之后添加一个螺栓,以便它将我的值“持久化”到一个列表中:

下面你会看到这个类试图遍历由 发出的所有元组,aggregate并将它们放在我之前初始化的列表中:

class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> {
    final List<AggregatedFoo> results;

    public FieldFetcherStateUpdater(List<AggregatedFoo> results) {
        this.results = results;
    }

    @Override
    public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        for (TridentTuple tuple : tuples) {
            results.add((AggregatedFoo) tuple.getValue(0));
        }
    }
}

所以现在代码看起来像:

// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    .partitionPersist(new FieldFetcherFactory(),
                        new Fields("aggregated_foos"),
                        new FieldFetcherStateUpdater(results));

     LOGGER.info("Done. Checkpoint results={}", results);

但什么都没有......日志显示Done. Checkpoint results=[](空列表)

有没有办法得到它?我想它一定是可行的,但我一直无法找到办法......

任何提示或链接到页面或任何类似的东西都将不胜感激。先感谢您。

4

1 回答 1

0

您需要使用静态成员变量 result。如果您有多个并行任务正在运行(即parallelism_hint > 1),您还需要synchronizeresult.

在您的情况下,result将为空,因为 Storm 在内部创建了一个新的螺栓实例(包括一个新的实例ArrayList)。使用静态变量可确保您访问正确的对象(因为在所有螺栓实例中只有一个)。

于 2016-01-20T13:20:34.423 回答