5

我有一个以潜在高速率产生点的数据源,我想对每个点执行可能很耗时的操作;但我也希望系统在过载时通过丢弃多余的数据点来优雅地降级。

据我所知,使用 gen_event 永远不会跳过事件。从概念上讲,我希望 gen_event 做的是在再次运行处理程序之前删除除最新待处理事件之外的所有事件。

有没有办法用标准 OTP 做到这一点?还是有充分的理由为什么我不应该那样处理事情?

到目前为止,我最好的方法是使用 gen_server 并依靠超时来触发昂贵的事件:

-behaviour(gen_server).
init() -> 
    {ok, Pid} = gen_event:start_link(),
    {ok, {Pid, none}}.

handle_call({add, H, A},_From,{Pid,Data}) ->
    {reply, gen_event:add_handler(Pid,H,A), {Pid,Data}}.

handle_cast(Data,{Pid,_OldData}) -> 
    {noreply, {Pid,Data,0}}.  % set timeout to 0 

handle_info(timeout, {Pid,Data}) ->
    gen_event:sync_notify(Pid,Data),
    {noreply, {Pid,Data}}.

这种方法正确吗?(特别是关于监督?)

4

2 回答 2

1

我无法对监督发表评论,但我会将其作为一个包含过期项目的队列来实现。

我已经实现了一些你可以在下面使用的东西。

我把它做成了 gen_server;当你创建它时,你给它一个旧项目的最大年龄。

它的接口是你可以向它发送待处理的项目,你可以请求尚未出队的项目。它记录了它收到每个项目的时间。每次收到要处理的项目时,它都会检查队列中的所有项目,将那些超过最大年龄的项目出列并丢弃。(如果您希望始终遵守最大年龄,可以在返回排队项目之前过滤队列)

您的数据源会将数据 ( {process_this, Anything}) 投射到工作队列,并且您的(可能很慢的)消费者进程将调用 ( gimme) 来获取数据。

-module(work_queue).
-behavior(gen_server).

-export([init/1, handle_cast/2, handle_call/3]).

init(DiscardAfter) ->
  {ok, {DiscardAfter, queue:new()}}.

handle_cast({process_this, Data}, {DiscardAfter, Queue0}) ->
  Instant = now(),
  Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0),
  Queue2 = queue:in({Instant, Data}, Queue1),
  {noreply, {DiscardAfter, Queue2}}.

handle_call(gimme, From, State = {DiscardAfter, Queue0}) ->
  case queue:is_empty(Queue0) of
    true ->
      {reply, no_data, State};
    false ->
      {{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0),
      {reply, {data, Data}, {DiscardAfter, Queue1}}
  end.

delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) ->
  ((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1.

too_old(Stamp, Instant, DiscardAfter) ->
  delta(Stamp, Instant) > DiscardAfter.

REPL 上的小演示:

c(work_queue).
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).         
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),      
timer:sleep(11 * 1000),                                                
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}),
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.        
于 2011-08-15T16:33:56.200 回答
0

有没有办法用标准 OTP 做到这一点?

不。

我不应该这样处理事情有充分的理由吗?

不,提前超时可以提高整个系统的性能。在这里阅读如何。

这种方法正确吗?(特别是关于监督?)

不知道,你还没有提供监管代码。


作为您第一个问题的一些额外信息:

如果您可以在 OTP 之外使用 3rd 方库,那么有一些可以添加抢占式超时,这就是您所描述的。

有两个我比较熟悉,第一个是dispcount,第二个是chick(我是chick的作者,这里尽量不给项目做广告)。

Dispcount 非常适用于只有有限数量的作业可以同时运行并且不排队的单一资源。你可以在这里阅读它(警告很多非常有趣的信息!)。

Dispcount 对我不起作用,因为我必须生成 4000 多个进程池来处理我的应用程序内不同队列的数量。我写小鸡是因为我需要一种方法来动态增加和减少我的队列长度,以及能够排队请求并拒绝其他请求,而不必产生 4000 多个进程池。

如果我是你,我会先尝试折扣(因为大多数解决方案不需要小鸡),然后如果你需要一些更动态的东西,那么可以响应一定数量的请求的池试试小鸡。

于 2013-10-14T22:11:44.047 回答