0

我正在尝试创建一些测试来验证通过 Apache Storm 拓扑的数据(使用 Trident API)

我创建了这个简单的过滤器来访问回调:

public class CallbackFilter extends BaseFilter {

    private final TupleCallback callback;

    public CallbackFilter(TupleCallback callback) {
        this.callback = callback;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        if (callback != null) {
            callback.callback(tuple);
        }
        return true;
    }

    public interface TupleCallback extends Serializable{
        void callback(TridentTuple tuple);
    }
}

如果我尝试这个,我会得到一个运行时异常,说 CountdownLatch 不可序列化:

    @Test
    public void testState() throws Exception {
        CountDownLatch latch = new CountDownLatch(4);
        TridentTopology tridentTopology = new TridentTopology();
        FeederBatchSpout spout = ...
        TridentState state = ...
        // problematic code:
        CallbackFilter.TupleCallback callback = (CallbackFilter.TupleCallback & Serializable) tuple -> {
            System.out.println("tuple = " + tuple);
            latch.countDown();  //latch is not serializable - exception!
        };
        CallbackFilter latchFilter = new CallbackFilter(callback);
        tridentTopology.stuff()
          .each(new Fields("foo", "bar"), latchFilter);
        ...

因此,Storm 似乎正在序列化拓扑的所有组件,然后以序列化的形式提交它们,可能是为了集群或诸如此类。

有什么方法可以从 Storm 回调到调用测试?也许某种不序列化拓扑的测试模式?从测试的角度来看,很难看到拓扑内部发生了什么,尤其是在拓扑的每个阶段。

更新:

即使做这样的事情也行不通!

    List<TridentTuple> tupleList = new ArrayList<>();
    CallbackFilter.TupleCallback callback = (CallbackFilter.TupleCallback & Serializable) tuple -> {
        tupleList.add(tuple);
    };

我看到在调试器中添加了 tupleList,但在测试空间中,列表保持为零。就像拓扑在自己的 JVM 中运行一样。

4

0 回答 0