0

从我在这里阅读的内容:https ://hexdocs.pm/gen_stage/ConsumerSupervisor.html

ConsumerSupervisor 的所有实现只为每个工作单元启动一个子项(上面链接中的打印机模块)。如果孩子死了,有没有办法让孩子重新开始?

对我来说,如果孩子没有正常关机,它可以使用名称“ConsumerSupervisor”重新启动孩子。有没有人这样做过?

在我的实现中,我让消费者​​主管启动一个实际上是 GenServer 的孩子来执行工作,然后关闭它自己。如果它异常崩溃,我希望它重新启动。

想法..我考虑过只实现一个消费者,它调用一个动态监督器然后启动孩子,但这并不能解释反向压力..

这是我实现它的方式,但希望孩子在崩溃时重新启动:

 defmodule Client.Strategy.EventPushing.SendingConsumer do
  use ConsumerSupervisor

  def start_link() do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  # Callbacks

  def init(:ok) do
    children = [
      worker(Client.DynamicClient, [], restart: :temporary)
    ]

    {:ok, children, strategy: :one_for_one, subscribe_to: [{Client.Strategy.EventPushing.SendingProducerConsumer, max_demand: 10}]}
  end
end

旋转起来的孩子看起来像这样:

defmodule Client.DynamicClient do
  use GenServer
  require Logger

  # Public API's
  def start_link(event) do
    Client.Statix.timing("sending", 1)
    GenServer.start_link(
      __MODULE__,
      event,
      name: String.to_atom(event.sequence_number)
    )
  end

  def schedule_work() do
    send(self(), :start_work)
  end

  # Call Backs
  def init(event) do
    schedule_work()
    {:ok, event}
  end

  def handle_info(
        :start_work,
        %{
          :filter => filter,
          :sequence_number => sequence_number,
          :message => message,
          :ip_address => ip,
          :port => port
        } = state
      ) do

    opts = [:binary, active: true]

    case :gen_tcp.connect(ip, port, opts) do
      {:ok, socket} ->
        :gen_tcp.send(socket, filter)
        :gen_tcp.send(socket, message)

      {:error, reason} ->
        Logger.error("Error")
        Process.exit(self(), :kill)
    end

    {:noreply,state}
  end

  def handle_info({:tcp, socket, msg}, state) do
    # :inet.setopts(socket, active: :once)

    :gen_tcp.close(socket)
     Process.exit(self(), :kill)
    {:noreply, state}
  end

  def handle_info({:tcp_closed, _socket,}, state) do
    Client.SendingSupervisor.stop_child(self())
  end
end

谢谢

4

1 回答 1

0

问题就在这里:worker(Client.DynamicClient, [], restart: :temporary) 如果您希望在崩溃时重新启动“DynamicClient”,您应该使用restart: :permanent

于 2018-08-04T18:28:04.647 回答