我们的方法是使用可序列化工厂的构造函数注入到 spout/bolt 中。然后喷口/螺栓在其打开/准备方法中咨询工厂。工厂的唯一职责是以可序列化的方式封装获取 spout/bolt 的依赖项。这种设计允许我们的单元测试注入假/测试/模拟工厂,当被咨询时,它们会返回模拟服务。通过这种方式,我们可以使用模拟(例如 Mockito)对 spout/bolts 进行狭隘的单元测试。  
下面是一个螺栓的通用示例及其测试。我省略了工厂的实现,UserNotificationFactory因为它取决于您的应用程序。您可以使用服务定位器来获取服务、序列化配置、HDFS 可访问的配置,或者实际上任何方式来获取正确的服务,只要工厂可以在一个 serde 循环之后执行此操作。您应该涵盖该类的序列化。
螺栓
public class NotifyUserBolt extends BaseBasicBolt {
  public static final String NAME = "NotifyUser";
  private static final String USER_ID_FIELD_NAME = "userId";
  private final UserNotifierFactory factory;
  transient private UserNotifier notifier;
  public NotifyUserBolt(UserNotifierFactory factory) {
    checkNotNull(factory);
    this.factory = factory;
  }
  @Override
  public void prepare(Map stormConf, TopologyContext context) {
    notifier = factory.createUserNotifier();
  }
  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    // This check ensures that the time-dependency imposed by Storm has been observed
    checkState(notifier != null, "Unable to execute because user notifier is unavailable.  Was this bolt successfully prepared?");
    long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);
    notifier.notifyUser(userId);
    collector.emit(new Values(userId));
  }
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(USER_ID_FIELD_NAME));
  }
}
测试
public class NotifyUserBoltTest {
  private NotifyUserBolt bolt;
  @Mock
  private TopologyContext topologyContext;
  @Mock
  private UserNotifier notifier;
  // This test implementation allows us to get the mock to the unit-under-test.
  private class TestFactory implements UserNotifierFactory {
    private final UserNotifier notifier;
    private TestFactory(UserNotifier notifier) {
      this.notifier = notifier;
    }
    @Override
    public UserNotifier createUserNotifier() {
      return notifier;
    }
  }
  @Before
  public void before() {
    MockitoAnnotations.initMocks(this);
    // The factory will return our mock `notifier`
    bolt = new NotifyUserBolt(new TestFactory(notifier));
    // Now the bolt is holding on to our mock and is under our control!
    bolt.prepare(new Config(), topologyContext);
  }
  @Test
  public void testExecute() {
    long userId = 24;
    Tuple tuple = mock(Tuple.class);
    when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
    BasicOutputCollector collector = mock(BasicOutputCollector.class);
    bolt.execute(tuple, collector);
    // Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
    //  the call to execute, too.
    verify(notifier).notifyUser(userId);
    verify(collector).emit(new Values(userId));
  }
}