2

我有一些需要测试的异步(ZIO)代码。如果我使用Thread.sleep()它创建一个测试部分工作正常,我总是得到响应:

for {
 saved <- database.save(smth)
 result <- eventually {
   Thread.sleep(20000)
   database.search(...) 
 }
} yield result

但是,如果我使用timeoutinterval从那时起使用相同的逻辑,eventually它就永远无法正常工作(我有超时):

for {
   saved <- database.save(smth)
   result <- eventually(timeout(Span(20, Seconds)), interval(Span(20, Seconds))) {
     database.search(...) 
   }
} yield result

我不明白为什么timeoutinterval工作方式不同Thread.sleep。它应该做完全相同的事情。有人可以向我解释一下并告诉我应该如何将这段代码更改为不需要使用Thread.sleep()吗?

4

2 回答 2

3

假设database.search(...)返回ZIO[]对象。

eventually{database.search(...)}很可能在第一次尝试后立即成功。

它成功创建了一个查询数据库的任务。然后在没有任何重试逻辑的情况下查询数据库。

关于如何使其工作:

val search: ZIO[Any, Throwable, String] = ???
val retried: ZIO[Any with Clock, Throwable, Option[String]] = search.retry(Schedule.spaced(Duration.fromMillis(1000))).timeout(Duration.fromMillis(20000))

像这样的东西应该工作。但我相信存在更优雅的解决方案。

于 2020-11-25T13:17:05.333 回答
2

@simpadjo 的另一个答案非常简洁地解决了“什么”问题。我将添加一些额外的上下文来说明您为什么会看到这种行为。

for {
  saved <- database.save(smth)
  result <- eventually {
    Thread.sleep(20000)
    database.search(...) 
  }
} yield result

这里混合了三种不同的技术,这引起了一些混乱。

首先是ZIO,它是一个异步编程库,它使用自己的自定义运行时和执行模型来执行任务。第二个eventually来自ScalaTest,通过有效地轮询值的状态来检查异步计算很有用。第三,有Thread.sleep一个 Java api 可以暂停当前线程并阻止任务继续进行,直到计时器到期。

eventually使用一个简单的重试机制,根据您使用的是普通值还是Future来自 scala 标准库的值而有所不同。基本上它运行块中的代码,如果它抛出然后它休眠当前线程,然后根据一些间隔配置重试它,最终超时。值得注意的是,在这种情况下,行为是完全同步的,这意味着只要 中的值{}不引发异常,它就不会继续重试。

Thread.sleep是一个重量级的操作,在这种情况下,它有效地阻止了正在传递的函数eventually进行 20 秒。这意味着在database.search调用时操作可能已经完成。

第二个变体不同,它eventually立即执行块中的代码,如果它抛出异常,那么它将根据您提供的间隔/超时逻辑再​​次尝试。在这种情况下,保存可能尚未完成(如果最终一致,则可能尚未传播)。因为您正在返回一个ZIO设计为抛出并且eventually不理解ZIO它只会返回search没有重试逻辑的尝试。

接受的答案:

val retried: ZIO[Any with Clock, Throwable, Option[String]] = search.retry(Schedule.spaced(Duration.fromMillis(1000))).timeout(Duration.fromMillis(20000))

之所以有效,是因为retryandtimeout正在使用内置ZIO运算符,这些运算符确实了解如何实际retrytimeouta ZIO. 这意味着如果搜索失败,retry将处理它直到它成功。

于 2020-12-01T03:13:37.940 回答