我们的 Apache Storm 拓扑使用 KafkaSpout 监听来自 Kafka 的消息,并在进行大量映射/减少/丰富/聚合等操作后,最终将数据插入 Cassandra。还有另一个 kafka 输入,如果拓扑找到响应,我们会在其中接收用户对数据的查询,然后将其发送到第三个 kafka 主题。现在我们想使用 Junit 编写 E2E 测试,其中我们可以直接以编程方式将数据插入拓扑,然后通过插入用户查询消息,我们可以在第三点断言我们的查询收到的响应是正确的。
为了实现这一点,我们考虑启动 EmbeddedKafka 和 CassandraUnit,然后用它们替换实际的 Kafka 和 Cassandra,然后我们可以在这个单一的 Junit 测试的上下文中启动拓扑。
在开始我们的实际测试之前,我们创建拓扑并将其提交到 LocalCluster。它在不同的线程上启动拓扑,然后从 Before 中出来并开始执行我们的测试。在那之前,拓扑还没有准备好,因为它需要一些时间来准备好处理。是否有任何 java API 可以告诉我们拓扑何时准备好进行处理(意味着准备好从 Spout 读取第一条消息)?