我的 Genserver 消费者看起来像这样:
defmodule Consumer do
use GenStage
def start_link() do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:consumer, :ok, subscribe_to: [Broadcaster]}
end
def handle_events(documents, _from, state) do
for document <- documents do
Task.async(fn -> Processor.process_document(document) end)
end
{:noreply, [], state}
end
def handle_info({_reference, {_int_state, _msg}}, state) do
{:noreply, [], state}
end
def handle_info({:DOWN, _reference, :process, pid, int_state}, state) do
{:stop, int_state, state}
end
end
如果前一台机器很忙,例如等待任务完成,我想在多台机器上“拆分”Processor
任务(即 中的任务)。handle_events/3
我该怎么做呢?
我已经阅读了关于分布式任务的指南,我只是不确定如何设计它,以便 1. 它在机器之间拆分,以及 2. 它知道何时拆分,即,当它很忙时:
task = Task.Supervisor.async {Something, :"server@other-machine"}, fn ->
{:ok, node()}
end
我知道这涉及到一些这样的安排,但我不知道从哪里开始。有人对这种事情有任何经验吗?
更新
我需要这个的原因Processor.process_document(document)
是大约需要 30 秒才能完成,并且一次只能运行一个。添加能够承担一半工作量的第二个节点基本上可以将处理时间减半。