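I'm trying to write tests that verify the data flowing through an Apache Storm topology (built with the Trident API).
I created this simple filter to get access to a callback: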
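// Storm 1.x+ package names (older releases used storm.trident.*)
import java.io.Serializable;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

public class CallbackFilter extends BaseFilter {
    private final TupleCallback callback;

    public CallbackFilter(TupleCallback callback) {
        this.callback = callback;
    }

    // Invokes the callback for every tuple and keeps all tuples.
    @Override
    public boolean isKeep(TridentTuple tuple) {
        if (callback != null) {
            callback.callback(tuple);
        }
        return true;
    }

    // Serializable because the filter (and everything it references) gets serialized.
    public interface TupleCallback extends Serializable {
        void callback(TridentTuple tuple);
    }
}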
If I try this, I get a runtime exception saying that CountDownLatch is not serializable:
@Test
public void testState() throws Exception {
    CountDownLatch latch = new CountDownLatch(4);
    TridentTopology tridentTopology = new TridentTopology();
    FeederBatchSpout spout = ...
    TridentState state = ...
    // problematic code:
    CallbackFilter.TupleCallback callback = (CallbackFilter.TupleCallback & Serializable) tuple -> {
        System.out.println("tuple = " + tuple);
        latch.countDown(); //latch is not serializable - exception!
    };
    CallbackFilter latchFilter = new CallbackFilter(callback);
    tridentTopology.stuff()
        .each(new Fields("foo", "bar"), latchFilter);
    ...
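The failure can be reproduced without Storm at all. This sketch assumes Storm applies standard Java serialization to the filter (the exception message suggests it does) and swaps `TridentTuple` for `Object` so it compiles on a plain JDK; `CapturedLatchDemo`, `serialize` and `captureFails` are hypothetical names. A serializable lambda is written out together with the variables it captures, so capturing `latch` pulls a `CountDownLatch` into the object graph, and `CountDownLatch` does not implement `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

public class CapturedLatchDemo {

    // Stand-in for CallbackFilter.TupleCallback; takes Object instead of
    // TridentTuple so the sketch compiles without Storm on the classpath.
    interface TupleCallback extends Serializable {
        void callback(Object tuple);
    }

    // Plain Java serialization, the mechanism we assume Storm uses on submit.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Returns true if serializing a lambda that captures a latch fails.
    static boolean captureFails() {
        CountDownLatch latch = new CountDownLatch(4);
        // The lambda itself is serializable, but its captured latch is not.
        TupleCallback callback = tuple -> latch.countDown();
        try {
            serialize(callback);
            return false;
        } catch (NotSerializableException e) {
            return true;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("serialization failed: " + captureFails());
    }
}
```

Running `main` prints `serialization failed: true`; the `& Serializable` cast in the test does not help, because it is the captured argument rather than the lambda that cannot be serialized.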
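So it seems that Storm serializes every component of the topology and submits them in serialized form, presumably so they can be shipped to a cluster or something similar.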
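Even when serialization succeeds, what actually runs is a copy. A minimal sketch, assuming the components are serialized and then deserialized before execution as guessed above; `RecordingComponent` and `roundTrip` are hypothetical names, not Storm APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class RoundTripCopyDemo {

    // Hypothetical stand-in for a topology component holding a list owned by the test.
    static class RecordingComponent implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> seen;

        RecordingComponent(List<String> seen) {
            this.seen = seen;
        }

        void process(String tuple) {
            seen.add(tuple);
        }
    }

    // Serialize and immediately deserialize, producing an independent deep copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Returns {size of the test's list, size of the copy's list} after one tuple.
    static int[] run() {
        List<String> testList = new ArrayList<>();
        RecordingComponent original = new RecordingComponent(testList);
        try {
            RecordingComponent copy = roundTrip(original);
            copy.process("tuple-1");
            return new int[] { testList.size(), copy.seen.size() };
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("test list: " + sizes[0] + ", component copy: " + sizes[1]);
    }
}
```

`main` prints `test list: 0, component copy: 1`: the deserialized component owns its own `ArrayList`, which would match the empty list described in the update.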
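Is there any way to call back from Storm into the calling test? Perhaps some kind of test mode that doesn't serialize the topology? From the test's point of view it is hard to see what is happening inside the topology, especially at each of its stages.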
Update:
Even something like this doesn't work!
List<TridentTuple> tupleList = new ArrayList<>();
CallbackFilter.TupleCallback callback = (CallbackFilter.TupleCallback & Serializable) tuple -> {
    tupleList.add(tuple);
};
In the debugger I can see tuples being added to tupleList, but back in the test the list stays empty. It's as if the topology were running in its own JVM.
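Assuming the topology runs in local mode (e.g. `LocalCluster`) inside the test's own JVM, as the debugger observation suggests, one common workaround is to keep the latch in static state and let the serialized callback carry only a lookup key. The sketch below simulates the serialize/deserialize step with plain Java serialization and uses `Object` in place of `TridentTuple`; `LATCHES`, `LatchCallback` and `roundTrip` are hypothetical names. It would not work if the topology ran in separate worker JVMs on a real cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StaticLatchRegistryDemo {

    // Hypothetical JVM-wide registry. Static fields are not serialized, so a
    // deserialized callback in the same JVM reaches the same latches as the test.
    static final Map<String, CountDownLatch> LATCHES = new ConcurrentHashMap<>();

    // Stand-in for CallbackFilter.TupleCallback (Object instead of TridentTuple).
    interface TupleCallback extends Serializable {
        void callback(Object tuple);
    }

    // Serializable callback that carries only a String key, never the latch itself.
    static final class LatchCallback implements TupleCallback {
        private static final long serialVersionUID = 1L;
        private final String key;

        LatchCallback(String key) {
            this.key = key;
        }

        @Override
        public void callback(Object tuple) {
            LATCHES.get(key).countDown();
        }
    }

    // Serialize and deserialize, simulating what we assume happens on submit.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Returns true if four calls on the deserialized copy release the test's latch.
    static boolean run() {
        String key = UUID.randomUUID().toString();
        CountDownLatch latch = new CountDownLatch(4);
        LATCHES.put(key, latch);
        try {
            TupleCallback copy = roundTrip(new LatchCallback(key));
            for (int i = 0; i < 4; i++) {
                copy.callback("tuple-" + i);
            }
            return latch.await(1, TimeUnit.SECONDS);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            LATCHES.remove(key);
        }
    }

    public static void main(String[] args) {
        System.out.println("latch released: " + run());
    }
}
```

In the real test the same idea would mean passing a `LatchCallback`-style object (adapted to take a `TridentTuple`) to `new CallbackFilter(...)` and awaiting the registered latch from the test thread; the same static-registry trick applies to a result list such as `tupleList`, provided it is a thread-safe collection.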