从我在这里阅读的内容:https ://hexdocs.pm/gen_stage/ConsumerSupervisor.html
ConsumerSupervisor 的所有实现只为每个工作单元启动一个子项(上面链接中的打印机模块)。如果孩子死了,有没有办法让孩子重新开始?
对我来说,如果孩子没有正常关机,它可以使用名称“ConsumerSupervisor”重新启动孩子。有没有人这样做过?
在我的实现中,我让消费者主管启动一个实际上是 GenServer 的孩子来执行工作,然后关闭它自己。如果它异常崩溃,我希望它重新启动。
想法..我考虑过只实现一个消费者,它调用一个动态监督器然后启动孩子,但这并不能解释反向压力..
这是我实现它的方式,但希望孩子在崩溃时重新启动:
defmodule Client.Strategy.EventPushing.SendingConsumer do
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok)
end
# Callbacks
def init(:ok) do
children = [
worker(Client.DynamicClient, [], restart: :temporary)
]
{:ok, children, strategy: :one_for_one, subscribe_to: [{Client.Strategy.EventPushing.SendingProducerConsumer, max_demand: 10}]}
end
end
旋转起来的孩子看起来像这样:
defmodule Client.DynamicClient do
use GenServer
require Logger
# Public API's
def start_link(event) do
Client.Statix.timing("sending", 1)
GenServer.start_link(
__MODULE__,
event,
name: String.to_atom(event.sequence_number)
)
end
def schedule_work() do
send(self(), :start_work)
end
# Call Backs
def init(event) do
schedule_work()
{:ok, event}
end
def handle_info(
:start_work,
%{
:filter => filter,
:sequence_number => sequence_number,
:message => message,
:ip_address => ip,
:port => port
} = state
) do
opts = [:binary, active: true]
case :gen_tcp.connect(ip, port, opts) do
{:ok, socket} ->
:gen_tcp.send(socket, filter)
:gen_tcp.send(socket, message)
{:error, reason} ->
Logger.error("Error")
Process.exit(self(), :kill)
end
{:noreply,state}
end
def handle_info({:tcp, socket, msg}, state) do
# :inet.setopts(socket, active: :once)
:gen_tcp.close(socket)
Process.exit(self(), :kill)
{:noreply, state}
end
def handle_info({:tcp_closed, _socket,}, state) do
Client.SendingSupervisor.stop_child(self())
end
end
谢谢