1

我正在尝试使用 fs2 流 0.10.0-M9 和 doobie 版本 0.5.0-M9 从 http 调用中获取一系列对象,然后将其插入到 postgres 数据库中,但我在构建此代码时遇到问题,得到以下错误:

错误:(49, 12) 无法证明 Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]。.join(100)

我想要做的是在对 Web 服务的调用返回后同时运行插入语句。这是代码:

fetchProducts(date).map{items  =>
        items.map( i =>
          Stream.eval(upsertProductIO(i).transact(xa))
        )
      }
      .join(100)
      .run
      .unsafeRunSync()

//rest call
def fetchProducts(changeDate: String): Stream[IO, Seq[Product]] = {
//rest call here
}

//DAO code
def upsertProductIO(p: Product): ConnectionIO[Int] = {
  upsertProduct(p).run
}
4

1 回答 1

2

问题是你有一个Seq[Stream[IO, Seq[Product]],你想要的,而是一个Stream[IO, Seq[Product]],你可以用一个Foldable类型类实例来做:

import cats.implicits._
import scala.concurrent.ExecutionContext.Implicits.global

fetchProducts(date).map { items =>
   items.map(i => Stream.eval(upsertProductIO(i).transact(xa)))
}.map(seqOfStream.toList.sequence)
 .join(100)
 .run
 .unsafeRunSync()
于 2018-01-04T16:26:04.437 回答