1

我的 Genserver 消费者看起来像这样:

defmodule Consumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:consumer, :ok, subscribe_to: [Broadcaster]}
  end

  def handle_events(documents, _from, state) do
    for document <- documents do
      Task.async(fn -> Processor.process_document(document) end)
    end
    {:noreply, [], state}
  end

  def handle_info({_reference, {_int_state, _msg}}, state) do
    {:noreply, [], state}
  end

  def handle_info({:DOWN, _reference, :process, pid, int_state}, state) do
    {:stop, int_state, state}
  end
end

如果前一台机器很忙,例如等待任务完成,我想在多台机器上“拆分”Processor任务(即 中的任务)。handle_events/3我该怎么做呢?

我已经阅读了关于分布式任务的指南,我只是不确定如何设计它,以便 1. 它在机器之间拆分,以及 2. 它知道何时拆分,即,当它很忙时:

task = Task.Supervisor.async {Something, :"server@other-machine"}, fn ->
  {:ok, node()}
end

我知道这涉及到一些这样的安排,但我不知道从哪里开始。有人对这种事情有任何经验吗?

更新

我需要这个的原因Processor.process_document(document)是大约需要 30 秒才能完成,并且一次只能运行一个。添加能够承担一半工作量的第二个节点基本上可以将处理时间减半。

4

0 回答 0