我有一个这样的测试场景:
- 将一些记录插入数据库
- 等待这些记录被 Kafka 处理
- 验证这些记录是否存在于 Kafka 主题中
第 2 步有时需要 5 秒。有时需要 2 分钟。
第一步返回一些我想在第 2 步(Kafka 主题)中查找的 ID。我想要实现的是检查这些 ID(从第一步开始)是否在给定的超时(例如 20 秒)内在 Kafka 中处理并继续执行其他步骤,例如:
从 Kafka 获取所有消息并检查这些 ID 是否在那里,如果没有,那么再过一个例如 20 秒。获取这些消息并检查 ID 是否在新获取的消息中。如果找到所有 ID,则返回这些消息的列表(在达到超时之前)或在达到超时之后 - 返回已获取消息的列表。
希望我解释得很好,因为它可能有点棘手,我想知道是否有可能完成它。
到目前为止,我只知道这样的事情。
def fetch_all_kafka_records_from_topic(self, items_to_verify: List, timeout: int = 30) -> List:
counter = 0
result = []
while counter <= timeout:
res = self._get_all_kafka_KM_values() # at the very bottom it's super(AvroConsumer,self).poll(timeout) from confluent_kafka
result.extend(res)
if all(item in res for item in items_to_verify):
break
time.sleep(1)
counter += 1
return result
有什么更好的方法吗?也许有线程,装饰器?我只是不知道如何检查列表中是否存在某些内容-如果是,则返回某些内容,如果不存在-再试一次,依此类推……直到达到超时。