0

我是并行编程和 ZIO 的新手,我正在尝试通过并行请求从 API 获取数据。

import sttp.client._
import zio.{Task, ZIO}


ZIO.foreach(files) { file =>
    getData(file)
    Task(file.getName)
  }


def getData(file: File) = {

  val data: String = readData(file)
  val request = basicRequest.body(data).post(uri"$url")
      .headers(content -> "text", char -> "utf-8")
      .response(asString)

  implicit val backend: SttpBackend[Identity, Nothing, NothingT] = HttpURLConnectionBackend()
  request.send().body

  resquest.Response match {
    case Success(value) => {
        val src = new PrintWriter(new File(filename))
        src.write(value.toString)
        src.close()
      }
    case Failure(exception) => log error
  }

当我按顺序执行程序时,如果我尝试并行运行,它会按预期工作,方法是更改ZIO.foreach​​为ZIO.foreachPar. 该程序过早终止,我明白了,我在这里缺少一些基本的东西,任何帮助都可以帮助我解决问题。

4

2 回答 2

2

一般来说,我不建议将同步阻塞代码与异步非阻塞代码混合,这是 ZIO 的主要角色。可以这么说,关于如何有效地将 ZIO 与“世界”一起使用,有一些很棒的演讲。

我要提出两个关键点,一是 ZIO 让您可以通过附加分配和完成步骤来有效地管理资源,二是“效果”我们可以说是“实际与世界交互的事物”应该尽可能地包含在最紧凑的范围内* .

所以让我们稍微看一下这个例子,首先,我不建议使用默认Identity的后端后端ZIO,我建议使用AsyncHttpClientZioBackend

import sttp.client._
import zio.{Task, ZIO}
import zio.blocking.effectBlocking
import sttp.client.asynchttpclient.zio.AsyncHttpClientZioBackend

// Extract the common elements of the request
val baseRequest = basicRequest.post(uri"$url")
      .headers(content -> "text", char -> "utf-8")
      .response(asString)

// Produces a writer which is wrapped in a `Managed` allowing it to be properly
// closed after being used
def managedWriter(filename: String): Managed[IOException, PrintWriter] =
  ZManaged.fromAutoCloseable(UIO(new PrintWriter(new File(filename))))


// This returns an effect which produces an `SttpBackend`, thus we flatMap over it
// to extract the backend.
val program = AsyncHttpClientZioBackend().flatMap { implicit backend => 
  ZIO.foreachPar(files) { file =>
    for {
      // Wrap the synchronous reading of data in a `Task`, but which allows runs this effect on a "blocking" threadpool instead of blocking the main one.
      data <- effectBlocking(readData(file))
      // `send` will return a `Task` because it is using the implicit backend in scope
      resp <- baseRequest.body(data).send()
      // Build the managed writer, then "use" it to produce an effect, at the end of `use` it will automatically close the writer.
      _    <- managedWriter("").use(w => Task(w.write(resp.body.toString))) 
    } yield ()
  }
}

此时,您将只program需要使用其中一种unsafe方法运行,或者如果您使用的是zio.App通过main方法。

*并非总是可行或方便,但它很有用,因为它通过将任务返回给运行时进行调度来防止资源占用。

于 2020-07-07T13:05:58.040 回答
0

当你使用像 ZIO 这样的纯函数式 IO 库时,你不能调用任何副作用函数(比如getData),除非调用像Task.effector这样的工厂方法Task.apply

ZIO.foreach(files) { file =>
  Task {
    getData(file)
    file.getName
  }
}
于 2020-07-05T19:49:06.000 回答