我创建了一个基于 gen_server 的行为(behaviour),并将其命名为 gen_select。使用它时,您可以编写一个带有模块属性 -behaviour(gen_select). 的回调模块。
在 init/1 回调中,您打开一个 ets 表或 dets 文件,并定义匹配规范(match specification)和限制(limit)。随后,该进程会分块遍历整张表,对每条记录调用您的 handle_record/2 回调,直到表的末尾。
对于我一直在做的一些“大数据”工作,我发现这是一个很方便的范式。如果合适的话,您也可以把它用在 mnesia 表底层的 ets 表上,或者修改它以使用 mnesia:select/4。
%%% gen_select.erl
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% @doc This module implements a behaviour pattern where a potentially
%%% large number of records are read from an {@link //stdlib/ets. ets}
%%% or {@link //stdlib/dets. dets} table. This is used in an application
%%% to have supervised workers mapping over all the records in a table.
%%% The user will call `gen_select:start_link/3', from a supervisor, to
%%% create a process which will iterate over the selected records of a
%%% table. The `init/1' callback should open the table to
%%% be read and construct a match specification to be used to select
%%% records from the table. It should return a tuple of the form:
%%% ```
%%% {ok, TableType, Table, MatchSpec, Limit, State} | {stop, Reason} | ignore
%%% TableType :: ets | dets
%%% Table :: ets:tid() | atom() % when TableType=ets
%%% Table :: dets:tab_name() % when TableType=dets
%%% MatchSpec :: match_spec() % see ets:select/2
%%% Limit :: pos_integer() % see ets:select/3
%%% State :: term()
%%% Reason :: term()
%%% '''
%%% After initialization {@link //stdlib/ets:select/3. ets:select/3}
%%% or {@link //stdlib/dets:select/3. dets:select/3} will be called
%%% using the `match_spec()' and `Limit' returned by `init/1'. The
%%% callback function `handle_record/2' will then be called for each
%%% record returned, after which `select/1' will be called to get more
%%% records. This is repeated until the end of the table is reached, at
%%% which point the callback `terminate/2' is called with `Reason=eof'.
%%%
-module(gen_select).
-author('vance@wavenet.lk').
%% export the gen_select API
-export([start_link/3]).
%% export the callbacks needed for a system process
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([format_status/2]).
%% exports used internally
-export([init_it/6]).

%% Callback module state; never inspected by this module.
-type state() :: term().

%% Callbacks of a module implementing the gen_select behaviour.

%% init/1 sets up the scan. Limit is the chunk size given to select/3,
%% which requires a positive integer (see ets:select/3, dets:select/3),
%% so zero is not a valid value.
-callback init(Args :: term()) ->
    {ok, TableType :: ets | dets, Table :: ets:tid() | atom() | dets:tab_name(),
        MatchSpec :: ets:match_spec(), Limit :: pos_integer(), State :: state()}
    | {stop, Reason :: term()} | ignore.
%% handle_record/2 is called once for each selected record.
-callback handle_record(Record :: tuple(), State :: state()) ->
    {next_record, NewState :: state()}
    | {stop, Reason :: term(), NewState :: state()}.
%% terminate/2 is called when the scan stops; Reason is eof when the
%% end of the table was reached.
-callback terminate(Reason :: eof | term(), State :: state()) ->
    any().
%% format_status/2 is optional: it is only called when the callback
%% module exports it.
-callback format_status(Opt :: normal | terminate, StatusData :: [term()]) ->
    term().
-optional_callbacks([format_status/2]).

-import(error_logger, [format/2]).
%%----------------------------------------------------------------------
%% gen_select API
%%----------------------------------------------------------------------
%% @doc Start a {@module} process linked to the caller.
%%
%% Meant to be called from a supervisor. `Mod' is the callback module
%% whose `init/1' receives `Args'; `Options' are generic start options
%% (the `debug' entry is read when the process initializes).
-spec start_link(Mod :: module(), Args :: term(),
                 Options :: gen:options()) -> gen:start_ret().
start_link(Mod, Args, Options) ->
    Linkage = link,
    gen:start(?MODULE, Linkage, Mod, Args, Options).
%%----------------------------------------------------------------------
%% internal exports
%%----------------------------------------------------------------------
-spec init_it(Starter :: pid(), Parent :: pid(), Pid :: pid(),
        CallBackMod :: atom(), Args :: term(), Options :: gen:options()) ->
    no_return().
%% @doc Called by {@link //stdlib/gen:start/5. gen:start/5} to initialize
%% the process.
%%
%% Runs `CallBackMod:init/1' and acknowledges the starter exactly once:
%% `{ok, self()}' on success, `{error, _}' or `ignore' otherwise. The
%% first `TableMod:select/3' is done after the acknowledgement, and its
%% outcome is handled like later `select/1' calls in the main loop:
%% records go to `handle_record/2', while end of table (`eof') or a select
%% error goes to `terminate/2'.
%% Based on //stdlib/gen_server:init_it/6.
%% @hidden
init_it(Starter, Parent, Pid, CallBackMod, Args, Options) ->
    Debug = debug_options(Pid, Options),
    case catch CallBackMod:init(Args) of
        {ok, TableMod, Table, MatchSpec, Limit, State} ->
            %% The starter has been answered from here on; later failures
            %% must go through terminate/2, never through a second ack.
            proc_lib:init_ack(Starter, {ok, self()}),
            first_select(Parent, CallBackMod, Debug, State,
                         TableMod, Table, MatchSpec, Limit);
        {stop, Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        ignore ->
            proc_lib:init_ack(Starter, ignore),
            exit(normal);
        {'EXIT', Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        Else ->
            Error = {bad_return_value, Else},
            proc_lib:init_ack(Starter, {error, Error}),
            exit(Error)
    end.

-spec first_select(Parent :: pid(), CallBackMod :: atom(),
        Debug :: [gen:dbg_opt()], State :: state(), TableMod :: atom(),
        Table :: term(), MatchSpec :: ets:match_spec(),
        Limit :: pos_integer()) -> no_return().
%% @doc Read the first chunk with `TableMod:select/3' (ets or dets, as
%% chosen by the callback) and enter the main loop.
%% @hidden
first_select(Parent, CallBackMod, Debug, State, TableMod, Table, MatchSpec, Limit) ->
    case catch TableMod:select(Table, MatchSpec, Limit) of
        {Matches, Cont} when is_list(Matches) ->
            NewDebug = sys:handle_debug(Debug, fun print_event/3, self(),
                                        {read, Matches}),
            loop1(Parent, CallBackMod, NewDebug, State, TableMod, Cont, Matches);
        '$end_of_table' ->
            terminate(eof, CallBackMod, Debug, State);
        {error, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end.
%%----------------------------------------------------------------------
%% system process callbacks
%%----------------------------------------------------------------------
%% Misc term threaded through sys:handle_system_msg/6. At runtime it is
%% always the five-element list
%%     [CallBackMod, State, TableMod, Cont, Matches]
%% i.e. the callback module, its state, the table module (ets or dets),
%% the select continuation and the records not yet handed to
%% handle_record/2. The type language cannot express a fixed-length
%% heterogeneous list, so it is typed as a non-empty list of terms.
-type misc() :: [term(), ...].
-spec system_continue(Parent :: pid(), Debug :: [gen:dbg_opt()],
        Misc :: misc()) -> no_return().
%% @doc Resume the scan once {@link //sys:handle_system_msg/6} has
%% handled a system message; `Misc' is unpacked back into the main loop
%% arguments.
%% @private
system_continue(Parent, Dbg, [Mod, ModState, Tab, Continuation, Pending]) ->
    loop1(Parent, Mod, Dbg, ModState, Tab, Continuation, Pending).
-spec system_terminate(Reason :: term(), Parent :: pid(),
        Debug :: [gen:dbg_opt()], Misc :: misc()) -> no_return().
%% @doc Stop the process on request of {@link //sys:handle_system_msg/6};
%% only the callback module and its state are needed to terminate.
%% @private
system_terminate(Reason, _Parent, Dbg, [Mod, ModState, _Tab, _Continuation, _Pending]) ->
    terminate(Reason, Mod, Dbg, ModState).
-spec system_code_change(Misc :: misc(), Module :: atom(),
        OldVsn :: undefined | term(), Extra :: term()) ->
    {ok, NewMisc :: misc()} | Other :: term().
%% @doc Called by {@link //sys:handle_system_msg/6} to update `Misc'.
%%
%% When the callback module exports `code_change/3', it is called with
%% the callback state. An `{ok, NewState}' result replaces the state. Any
%% other result, including a crash caught as `{'EXIT', _}', is returned
%% unchanged to the caller. When `code_change/3' is not exported, the
%% state is kept as-is, so the callback is effectively optional.
%% @private
system_code_change([CallBackMod, State, TableMod, Cont, Matches] = Misc,
        _Module, OldVsn, Extra) ->
    case erlang:function_exported(CallBackMod, code_change, 3) of
        true ->
            case catch CallBackMod:code_change(OldVsn, State, Extra) of
                {ok, NewState} ->
                    {ok, [CallBackMod, NewState, TableMod, Cont, Matches]};
                Other ->
                    Other
            end;
        false ->
            {ok, Misc}
    end.
%% Status data passed in by sys:get_status/1,2. At runtime it is the
%% five-element list [PDict, SysState, Parent, Debug, Misc], where Misc
%% is [CallBackMod, State, TableMod, Cont, Matches]. The type language
%% cannot express a fixed-length heterogeneous list, so it is typed as a
%% non-empty list of terms.
-type status_data() :: [term(), ...].
-spec format_status(Opt :: normal | terminate, StatusData :: status_data()) ->
    [term()].
%% @doc Called by {@link //sys:get_status/1} to print state.
%%
%% Returns a header and the generic process data (system state, parent
%% and logged debug events), followed by the callback-specific section.
%% That section is the result of `CallBackMod:format_status/2' when the
%% function is exported and does not crash; a non-list result is wrapped
%% in a list. Otherwise it is `[{data, [{"State", State}]}]'.
%% @private
format_status(Opt, [PDict, SysState, Parent, Debug,
        [CallBackMod, State, _TableMod, _Cont, _Matches]]) ->
    Header = gen:format_status_header("Status for table reader", self()),
    Log = sys:get_debug(log, Debug, []),
    DefaultStatus = [{data, [{"State", State}]}],
    Specific = case erlang:function_exported(CallBackMod, format_status, 2) of
        true ->
            case catch CallBackMod:format_status(Opt, [PDict, State]) of
                {'EXIT', _} ->
                    DefaultStatus;
                StatusList when is_list(StatusList) ->
                    StatusList;
                Else ->
                    [Else]
            end;
        _ ->
            DefaultStatus
    end,
    [{header, Header},
     {data, [{"Status", SysState},
             {"Parent", Parent},
             {"Logged events", Log}]}
     | Specific].
%%----------------------------------------------------------------------
%% internal functions
%%----------------------------------------------------------------------
-spec loop1(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(),
        Cont :: term(), Matches :: [tuple()]) -> no_return().
%% @doc Main loop: service pending messages, then process records.
%%
%% Every message already in the mailbox is handled before the next
%% record. System messages go to {@link //sys:handle_system_msg/6}, and
%% an `EXIT' from the parent (seen only when trapping exits) terminates
%% the process. Anything else is passed to the sys debug handler and
%% dropped. Once the mailbox is empty (`after 0'), control passes to
%% `loop2/7'.
%% Based on //stdlib/gen_server:loop1/6.
%% @hidden
loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches) ->
    receive
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                                  [CallBackMod, State, TableMod, Cont, Matches]);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        Msg ->
            %% Must keep looping with the updated debug structure:
            %% returning here would end the process without calling
            %% terminate/2.
            NewDebug = sys:handle_debug(Debug, fun print_event/3, self(), {in, Msg}),
            loop1(Parent, CallBackMod, NewDebug, State, TableMod, Cont, Matches)
    after 0 ->
        loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches)
    end.
-spec loop2(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(), Cont :: term(),
        Matches :: [tuple()]) -> no_return().
%% @doc Process the next record, or fetch the next chunk with `select/1'.
%%
%% With records pending, the first one is passed to
%% `CallBackMod:handle_record/2'. A crash or an unexpected return value
%% terminates the process through `terminate/2' (the latter as
%% `{bad_return_value, Value}'). With no records pending,
%% `TableMod:select/1' is called on the continuation; `'$end_of_table''
%% terminates with reason `eof'.
%% @hidden
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, [H | T]) ->
    case catch CallBackMod:handle_record(H, State) of
        {next_record, NewState} ->
            loop1(Parent, CallBackMod, Debug, NewState, TableMod, Cont, T);
        {stop, Reason, NewState} ->
            terminate(Reason, CallBackMod, Debug, NewState);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        Else ->
            terminate({bad_return_value, Else}, CallBackMod, Debug, State)
    end;
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, []) ->
    case catch TableMod:select(Cont) of
        {Matches, NewCont} when is_list(Matches) ->
            %% Keep the updated debug structure so logs and statistics
            %% accumulate across chunks.
            NewDebug = sys:handle_debug(Debug, fun print_event/3, self(),
                                        {read, Matches}),
            loop1(Parent, CallBackMod, NewDebug, State, TableMod, NewCont, Matches);
        '$end_of_table' ->
            terminate(eof, CallBackMod, Debug, State);
        {error, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end.
-spec terminate(Reason :: term(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state()) -> no_return().
%% @doc Terminate the {@module} process.
%%
%% Calls `CallBackMod:terminate/2', then exits with `Reason'. If the
%% callback itself crashes, that crash is logged with the raw state and
%% becomes the exit reason instead. `normal', `shutdown' and
%% `{shutdown, _}' exit without logging. Any other reason is logged
%% through `error_info/3', with the state optionally formatted by the
%% callback's `format_status/2'.
%% NOTE(review): `eof', the reason used when the end of the table is
%% reached, is not in the quiet set. A completed scan is therefore logged
%% as an error and exits abnormally -- confirm this is intended.
%% Based on //stdlib/gen_server:terminate/6.
%% @hidden
terminate(Reason, CallBackMod, Debug, State) ->
    case catch CallBackMod:terminate(Reason, State) of
        {'EXIT', CrashReason} ->
            error_info(CrashReason, State, Debug),
            exit(CrashReason);
        _ ->
            exit_after_terminate(Reason, CallBackMod, Debug, State)
    end.

-spec exit_after_terminate(Reason :: term(), CallBackMod :: atom(),
        Debug :: [gen:dbg_opt()], State :: state()) -> no_return().
%% @doc Exit with `Reason', logging it first unless it is a normal or
%% shutdown reason.
%% @hidden
exit_after_terminate(normal, _CallBackMod, _Debug, _State) ->
    exit(normal);
exit_after_terminate(shutdown, _CallBackMod, _Debug, _State) ->
    exit(shutdown);
exit_after_terminate({shutdown, _} = Shutdown, _CallBackMod, _Debug, _State) ->
    exit(Shutdown);
exit_after_terminate(Reason, CallBackMod, Debug, State) ->
    error_info(Reason, terminate_state(CallBackMod, State), Debug),
    exit(Reason).

-spec terminate_state(CallBackMod :: atom(), State :: state()) -> term().
%% @doc State to log on abnormal termination. This is the result of
%% `CallBackMod:format_status(terminate, [PDict, State])' when the
%% function is exported and does not crash; otherwise it is the raw state.
%% @hidden
terminate_state(CallBackMod, State) ->
    case erlang:function_exported(CallBackMod, format_status, 2) of
        false ->
            State;
        true ->
            case catch CallBackMod:format_status(terminate, [get(), State]) of
                {'EXIT', _} -> State;
                Formatted -> Formatted
            end
    end.
-spec error_info(Reason :: term(), State :: state(),
        Debug :: [gen:dbg_opt()]) -> ok.
%% @doc Print error log message.
%%
%% Logs the termination `Reason' together with `State', then prints any
%% debug log held in `Debug'. An `undef' reason is refined to say whether
%% the module could not be loaded or the function is not exported.
%% Based on //stdlib/gen_server:error_info/5.
%% @hidden
error_info(Reason, State, Debug) ->
    error_logger:format("** Table reader ~p terminating \n"
                        "** When Server state == ~p~n"
                        "** Reason for termination == ~n** ~p~n",
                        [self(), State, explain_undef(Reason)]),
    sys:print_log(Debug),
    ok.

-spec explain_undef(Reason :: term()) -> term().
%% @doc Make an `undef' termination reason more specific; any other
%% reason is returned unchanged.
%% @hidden
explain_undef({undef, [{M, F, A, L} | MFAs]} = Reason) ->
    case code:is_loaded(M) of
        false ->
            {'module could not be loaded', [{M, F, A, L} | MFAs]};
        _ ->
            case erlang:function_exported(M, F, frame_arity(A)) of
                true -> Reason;
                false -> {'function not exported', [{M, F, A, L} | MFAs]}
            end
    end;
explain_undef(Reason) ->
    Reason.

-spec frame_arity(ArgsOrArity :: [term()] | arity()) -> arity().
%% @doc Arity of a stack frame. The third element may be either the
%% argument list or the arity integer. Both forms are accepted so that
%% logging never crashes on `length/1' and hides the real reason.
%% @hidden
frame_arity(Args) when is_list(Args) -> length(Args);
frame_arity(Arity) when is_integer(Arity) -> Arity.
%% Look up option `Op' in the list `Options'. Returns `{ok, Value}' for
%% the first two-element tuple `{Op, Value}', or `false' if none exists.
%% Based on //stdlib/gen_server:opt/2.
opt(Op, Options) ->
    NotOp = fun({Key, _Value}) when Key =:= Op -> false;
               (_Other) -> true
            end,
    case lists:dropwhile(NotOp, Options) of
        [{_, Value} | _] -> {ok, Value};
        [] -> false
    end.
%% Build the sys debug structure for process `Name' from the `debug'
%% entry of `Opts'. When that entry is absent, an empty request is used.
%% Based on //stdlib/gen_server:debug_options/2.
debug_options(Name, Opts) ->
    Requested = case opt(debug, Opts) of
                    {ok, DebugOpts} -> DebugOpts;
                    _ -> []
                end,
    dbg_options(Name, Requested).
%% Convert requested debug options for process `Name'. An empty request
%% falls back to [log, statistics] when the node was started with
%% -generic_debug, and to no options otherwise.
%% Based on //stdlib/gen_server:dbg_options/2.
dbg_options(Name, Opts) when Opts =/= [] ->
    dbg_opts(Name, Opts);
dbg_options(Name, []) ->
    dbg_opts(Name, default_dbg_opts(init:get_argument(generic_debug))).

%% Default debug options, given the result of
%% init:get_argument(generic_debug).
default_dbg_opts(error) -> [];
default_dbg_opts(_Found) -> [log, statistics].
%% Turn `Opts' into a sys debug structure with sys:debug_options/1. If
%% that raises an error or exit, a message naming `Name' is logged and
%% no debug options are used. A thrown value is returned as the result.
%% Based on //stdlib/gen_server:dbg_opts/2.
dbg_opts(Name, Opts) ->
    try
        sys:debug_options(Opts)
    catch
        throw:Thrown ->
            Thrown;
        _Class:_Why ->
            error_logger:format("~p: ignoring erroneous debug options - ~p~n",
                                [Name, Opts]),
            []
    end.
-spec print_event(IoDevice :: io:device(), Event :: term(), Pid :: pid()) -> ok.
%% @doc Format one debug event for {@link //sys:handle_debug/4}.
%%
%% `{in, Msg}' is an unexpected message seen by the main loop.
%% `{read, Matches}' is a chunk of records returned by a select call and
%% is reported as a record count only.
print_event(Dev, {in, Msg}, Pid) ->
    io:format(Dev, "*DBG* ~p got ~p~n", [Pid, Msg]);
print_event(Dev, {read, Records}, Pid) ->
    NumRecords = length(Records),
    io:format(Dev, "*DBG* ~p read ~b records~n", [Pid, NumRecords]).