我已经使用 ElixirFlow
有一段时间了,最近我尝试使用Flow
和Repo.stream
使用并行化我的工作流程:
endless_db_stream = MyRepo.stream(some_query)
MyRepo.transaction(fn ->
endless_db_stream
|> Flow.from_enumerable()
|> Flow.each(&process(&1))
|> Flow.run
end)
但它只是不起作用。现在我做了一些研究,偶然发现了 Jose Valim的这条评论Repo.stream
,他说基本上不兼容,GenStage
我相信它也不兼容Flow
(因为它建立在 之上GenStage
)。
我的问题是,以前有没有人使用 PSQL 作为无界数据源Flow
?
P/S:在上面的同一个 GitHub 线程中,有一个“hack”使用 aGenStage
来包装Repo.stream
then 充当生产者,但我正在寻找一种更简化的方法,因为我打算使用Flow
而不是GenStage