我无法对监督发表评论,但我会将其作为一个包含过期项目的队列来实现。
我已经实现了一些你可以在下面使用的东西。
我把它做成了 gen_server;当你创建它时,你给它一个旧项目的最大年龄。
它的接口是你可以向它发送待处理的项目,你可以请求尚未出队的项目。它记录了它收到每个项目的时间。每次收到要处理的项目时,它都会检查队列中的所有项目,将那些超过最大年龄的项目出列并丢弃。(如果您希望始终遵守最大年龄,可以在返回排队项目之前过滤队列)
您的数据源会将数据 ( {process_this, Anything}
) 投射到工作队列,并且您的(可能很慢的)消费者进程将调用 ( gimme
) 来获取数据。
-module(work_queue).
-behavior(gen_server).
-export([init/1, handle_cast/2, handle_call/3]).
init(DiscardAfter) ->
{ok, {DiscardAfter, queue:new()}}.
handle_cast({process_this, Data}, {DiscardAfter, Queue0}) ->
Instant = now(),
Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0),
Queue2 = queue:in({Instant, Data}, Queue1),
{noreply, {DiscardAfter, Queue2}}.
handle_call(gimme, From, State = {DiscardAfter, Queue0}) ->
case queue:is_empty(Queue0) of
true ->
{reply, no_data, State};
false ->
{{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0),
{reply, {data, Data}, {DiscardAfter, Queue1}}
end.
delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) ->
((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1.
too_old(Stamp, Instant, DiscardAfter) ->
delta(Stamp, Instant) > DiscardAfter.
REPL 上的小演示:
c(work_queue).
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),
timer:sleep(11 * 1000),
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}),
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.