我们的方法是使用可序列化工厂的构造函数注入到 spout/bolt 中。然后喷口/螺栓在其打开/准备方法中咨询工厂。工厂的唯一职责是以可序列化的方式封装获取 spout/bolt 的依赖项。这种设计允许我们的单元测试注入假/测试/模拟工厂,当被咨询时,它们会返回模拟服务。通过这种方式,我们可以使用模拟(例如 Mockito)对 spout/bolts 进行狭隘的单元测试。
下面是一个螺栓的通用示例及其测试。我省略了工厂的实现,UserNotificationFactory
因为它取决于您的应用程序。您可以使用服务定位器来获取服务、序列化配置、HDFS 可访问的配置,或者实际上任何方式来获取正确的服务,只要工厂可以在一个 serde 循环之后执行此操作。您应该涵盖该类的序列化。
螺栓
public class NotifyUserBolt extends BaseBasicBolt {
public static final String NAME = "NotifyUser";
private static final String USER_ID_FIELD_NAME = "userId";
private final UserNotifierFactory factory;
transient private UserNotifier notifier;
public NotifyUserBolt(UserNotifierFactory factory) {
checkNotNull(factory);
this.factory = factory;
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
notifier = factory.createUserNotifier();
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// This check ensures that the time-dependency imposed by Storm has been observed
checkState(notifier != null, "Unable to execute because user notifier is unavailable. Was this bolt successfully prepared?");
long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);
notifier.notifyUser(userId);
collector.emit(new Values(userId));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(USER_ID_FIELD_NAME));
}
}
测试
public class NotifyUserBoltTest {
private NotifyUserBolt bolt;
@Mock
private TopologyContext topologyContext;
@Mock
private UserNotifier notifier;
// This test implementation allows us to get the mock to the unit-under-test.
private class TestFactory implements UserNotifierFactory {
private final UserNotifier notifier;
private TestFactory(UserNotifier notifier) {
this.notifier = notifier;
}
@Override
public UserNotifier createUserNotifier() {
return notifier;
}
}
@Before
public void before() {
MockitoAnnotations.initMocks(this);
// The factory will return our mock `notifier`
bolt = new NotifyUserBolt(new TestFactory(notifier));
// Now the bolt is holding on to our mock and is under our control!
bolt.prepare(new Config(), topologyContext);
}
@Test
public void testExecute() {
long userId = 24;
Tuple tuple = mock(Tuple.class);
when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
BasicOutputCollector collector = mock(BasicOutputCollector.class);
bolt.execute(tuple, collector);
// Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
// the call to execute, too.
verify(notifier).notifyUser(userId);
verify(collector).emit(new Values(userId));
}
}