-1

我们的 Apache Storm 拓扑使用 KafkaSpout 监听来自 Kafka 的消息,并在进行大量映射/减少/丰富/聚合等操作后,最终将数据插入 Cassandra。还有另一个 kafka 输入,如果拓扑找到响应,我们会在其中接收用户对数据的查询,然后将其发送到第三个 kafka 主题。现在我们想使用 Junit 编写 E2E 测试,其中我们可以直接以编程方式将数据插入拓扑,然后通过插入用户查询消息,我们可以在第三点断言我们的查询收到的响应是正确的。

为了实现这一点,我们考虑启动 EmbeddedKafka 和 CassandraUnit,然后用它们替换实际的 Kafka 和 Cassandra,然后我们可以在这个单一的 Junit 测试的上下文中启动拓扑。

在开始我们的实际测试之前,我们创建拓扑并将其提交到 LocalCluster。它在不同的线程上启动拓扑,然后从 Before 中出来并开始执行我们的测试。在那之前,拓扑还没有准备好,因为它需要一些时间来准备好处理。是否有任何 java API 可以告诉我们拓扑何时准备好进行处理(意味着准备好从 Spout 读取第一条消息)?

4

1 回答 1

2

这取决于您说“准备处理”时的意思。

如果您为 LocalCluster 启用时间模拟,则可以使用Time.advanceClusterTime以逐步推进时间。如果在提交拓扑后调用此方法,它只会在集群大部分空闲时返回。参见例如https://github.com/apache/storm/blob/8f49e06998abb4dfc50f51d78b6784ebd04844fb/storm-core/test/jvm/org/apache/storm/integration/TopologyIntegrationTest.java#L233

如果您愿意用存根(例如 FixedTupleSpout)替换您的 spout,您可以使用Testing.completeTopology等到拓扑完成处理您设置存根发出的所有元组。

等待拓扑处理一些元组的另一种方法是将一些消息放入 Kafka,启动拓扑,然后让测试线程轮询 Cassandra 以查看您期望的消息是否已通过。这样,您可以在测试线程中设置超时,如果在几秒内不满足条件,则测试失败。您可以为此https://github.com/awaitility/awaitility使用像 Awaitility 这样的实用程序,或者只编写自己的轮询逻辑。

如果您通过“准备处理”表示其他意思,请更详细地描述您的意思。

于 2019-07-24T14:01:17.210 回答