0

我有一个这样的测试场景:

  1. 将一些记录插入数据库
  2. 等待这些记录被 Kafka 处理
  3. 验证这些记录是否存在于 Kafka 主题中

第 2 步有时需要 5 秒。有时需要 2 分钟。
第一步返回一些我想在第 2 步(Kafka 主题)中查找的 ID。我想要实现的是检查这些 ID(从第一步开始)是否在给定的超时(例如 20 秒)内在 Kafka 中处理并继续执行其他步骤,例如:
从 Kafka 获取所有消息并检查这些 ID 是否在那里,如果没有,那么再过一个例如 20 秒。获取这些消息并检查 ID 是否在新获取的消息中。如果找到所有 ID,则返回这些消息的列表(在达到超时之前)或在达到超时之后 - 返回已获取消息的列表。

希望我解释得很好,因为它可能有点棘手,我想知道是否有可能完成它。

到目前为止,我只知道这样的事情。

    def fetch_all_kafka_records_from_topic(self, items_to_verify: List, timeout: int = 30) -> List:
        counter = 0
        result = []
        while counter <= timeout:
            res = self._get_all_kafka_KM_values() # at the very bottom  it's super(AvroConsumer,self).poll(timeout) from confluent_kafka
            result.extend(res)
            if all(item in res for item in items_to_verify):
                break
            time.sleep(1)
            counter += 1
        return result

有什么更好的方法吗?也许有线程,装饰器?我只是不知道如何检查列表中是否存在某些内容-如果是,则返回某些内容,如果不存在-再试一次,依此类推……直到达到超时。

4

0 回答 0