3

我有 2 个期货(对 db 表执行 2 个操作),我希望在保存修改之前检查两个期货是否已成功完成。

现在,我在第一个未来(作为依赖项)中开始第二个未来,但我知道这不是最好的选择。我知道我可以使用for-comprehension 并行执行两个期货,但即使一个失败,另一个也会被执行(尚未测试)

firstFuture.dropColumn(tableName) match {
  case Success(_) => secondFuture.deleteEntity(entity)
  case Failure(e) => throw new Exception(e.getMessage)
}

// the  first future alters a table, drops a column
// the second future deletes a row from another table

在这种情况下,如果第一个未来成功执行,第二个可能会失败。我想恢复第一个未来的更新。我听说过 SQL 事务,似乎是这样的,但是如何呢?

val futuresResult = for {
  first <- firstFuture.dropColumn(tableName)
  second <- secondFuture.deleteEntity(entity)
} yield (first, second)

在我的情况下, A -for理解要好得多,因为我在这两个期货之间没有依赖关系并且可以并行执行,但这并不能解决我的问题,结果可以是 (success, success) 或 (failed, success)例子。

4

1 回答 1

9

关于Future顺序运行与并行运行:

这有点棘手,因为 ScalaFuture被设计为渴望。各种 Scala 库中还有一些其他构造可以处理同步和异步效果,例如cat 、IOMonix等,它们是以惰性方式设计的,它们没有这种行为。TaskZIO

急切的事情Future是它会尽快开始计算。这里的“开始”意味着将它安排在一个ExecutionContext明确选择或隐含存在的地方。虽然从技术上讲,如果调度程序决定这样做,执行可能会暂停一点,但它很可能几乎是立即开始的。

所以如果你有一个 type 的值Future,它就会开始运行。如果你有一个惰性类型的值Future,或者一个返回类型值的函数/方法Future,那么它不是。

但是,即使您拥有的只是简单的值(没有惰性 val 或 defs),如果Future定义是在 for-comprehension 中完成的,那么这意味着它是 monadic flatMap 链的一部分(如果您不理解这一点,请忽略它)现在),它将按顺序运行,而不是并行运行。为什么?这不是特定于Futures 的;每个 for-comprehension 都具有作为顺序链的语义,您可以在其中将上一步的结果传递给下一步。因此,如果它依赖于步骤 n 中的某些内容,那么您不能在步骤 n + 1 中运行某些内容是合乎逻辑

这里有一些代码来证明这一点。

val program = for {
  _ <- Future { Thread.sleep(5000); println("f1") }
  _ <- Future { Thread.sleep(5000); println("f2") }
} yield ()

Await.result(program, Duration.Inf)

该程序将等待五秒钟,然后打印“f1”,然后再等待五秒钟,然后打印“f2”。

现在让我们来看看这个:

val f1 = Future { Thread.sleep(5000); println("f1") }
val f2 = Future { Thread.sleep(5000); println("f2") }

val program = for {
  _ <- f1
  _ <- f2
} yield ()

Await.result(program, Duration.Inf)

但是,该程序将在五秒钟后同时打印“f1”和“f2”。

请注意,在第二种情况下并没有真正违反序列语义。f2仍然有机会使用 的结果f1。但是f2没有使用f1;的结果 它是一个可以立即计算的独立值(用 a 定义val)。因此,如果我们更改val f2为一个函数,例如def f2(number: Int),那么执行会更改:

val f1 = Future { Thread.sleep(5000); println("f1"); 42 }
def f2(number: Int) = Future { Thread.sleep(5000); println(number) }

val program = for {
  number <- f1
  _ <- f2(number)
} yield ()

如您所料,这将在五秒后打印“f1”,然后另一个才会Future启动,因此它将在再过五秒后打印“42”。

关于交易:

正如评论中提到的@cbley,这听起来像是您想要数据库事务。例如,在 SQL 数据库中,这具有非常特定的含义,它确保了ACID 属性

如果这是你需要的,你需要在数据库层解决它。Future太笼统了;它只是一种模拟同步和异步计算的效果类型。当您看到一个Future值时,仅通过查看类型,您无法判断它是数据库调用的结果,还是某个 HTTP 调用的结果。

例如,doobie将每个数据库查询描述为一种ConnectionIO类型。您可以将多个查询排成一列以供理解,就像您使用以下内容一样Future

val program = for {
  a <- database.getA()
  _ <- database.write("foo")
  b <- database.getB()
} yield {
  // use a and b
}

但与我们之前的示例不同,这里getA()andgetB()不返回 type 的值Future[A],而是ConnectionIO[A]。很酷的是,doobie 完全处理了您可能希望这些查询在单个事务中运行的事实,因此如果getB()失败,“foo”将不会提交到数据库。

因此,在这种情况下,您要做的是获取查询集的完整描述,将其包装program为 type的单个值ConnectionIO,一旦您想要实际运行事务,您将执行类似的操作program.transact(myTransactor),其中myTransactor的实例在哪里Transactor,一个知道如何连接到您的物理数据库的doobie 构造。

一旦你进行交易,你ConnectionIO[A]的就变成了Future[A]. 如果事务失败,您将有一个 failed Future,并且不会真正向您的数据库提交任何内容。

如果您的数据库操作彼此独立并且可以并行运行,doobie 也将允许您这样做。通过 doobie 按顺序和并行提交事务在 docs 中有很好的解释。

于 2020-06-17T08:40:46.987 回答