1

我已经使用 ElixirFlow有一段时间了,最​​近我尝试使用FlowRepo.stream使用并行化我的工作流程:

endless_db_stream = MyRepo.stream(some_query)
MyRepo.transaction(fn ->
  endless_db_stream
  |> Flow.from_enumerable()
  |> Flow.each(&process(&1))
  |> Flow.run
end)

但它只是不起作用。现在我做了一些研究,偶然发现了 Jose Valim的这条评论Repo.stream,他说基本上不兼容,GenStage我相信它也不兼容Flow(因为它建立在 之上GenStage)。

我的问题是,以前有没有人使用 PSQL 作为无界数据源Flow

P/S:在上面的同一个 GitHub 线程中,有一个“hack”使用 aGenStage来包装Repo.streamthen 充当生产者,但我正在寻找一种更简化的方法,因为我打算使用Flow而不是GenStage

4

1 回答 1

2

在我们的项目中有两种实现它的方法。

  1. 更简单的方法:使用资源 ID 作为数据源。

    您可以先列出所有资源 id,然后在流中单独获取每个资源。

  2. 正常方式:用于Stream.resource/3自定义数据源。

    您还可以通过创建流Stream.resource/3,使用分页查询一次获取一些资源。

在此处查看更多详细信息,如何使用 Stream.resource/3 Awesomeness 在 Elixir 中轻松构建 Streams

于 2018-10-17T08:02:26.560 回答