1

我在 Mnesia 有一张大桌子,由于各种原因(这里不重要,假设我正在远程执行选择,结果必须使用一些 3rd 方库通过网络发送)我无法一次选择所有行. 我已经将选择拆分为一次只检索特定数量的列。

例如,这是一个仅检索特定列的选择示例:

mnesia:dirty_select([table,[{{table,'$1','_','$3','$4','_','_','_'},[],['$$']}]]).

我使用不同的列集运行该选择两次,然后合并结果。但现在事实证明,其中一列也太大而无法在一次选择中检索。所以我想将一个大选择分成两个选择,每个选择只检索该列中的一半行。有没有一种简单的方法可以检索每隔一行?像只选择奇数行,然后只选择偶数行?或者也许是一种检索前半部分和后半部分行的方法?

我尝试从其中一列中选择所有行,然后将其用作检索特定行的索引。这可行,但需要相当长的时间来构建选择然后执行它。

编辑:对不起,我没有足够强调选择正在远程执行的事实。我知道迭代记录或直接访问文件,但这里的挑战是必须使用相对少量的命令检索这些记录,并且这些命令必须能够远程执行。

例如:

  1. 仅选择第一列(简单的单个mnesia:dirty_select命令)。
  2. 一旦检索到结果(通过网络),将其分成两组并用作键来构造选择以获取特定记录(每个选择将包含一长串要检索的键,但这很好,因为它们是简单的 Erlang 术语,可以通过网络发送)
  3. 使用在 2 中创建的这两组键分两步检索所有行。

这可行,但并不容易,也不是最优的,因为它双向发送了相当多的数据。可能没有简单的解决方案,除非选择的构造考虑到每列和行中包含的特定数据(例如,匹配第一列中名称以字母“A”到“M”开头的所有行)。我只是不确定使用标准 Mnesia 命令有什么可能,并希望有一个更简单的解决方案。

4

2 回答 2

0

我创建了一个基于我命名为 gen_select 的 gen_server 的行为。使用它,您可以使用 module 属性编写回调模块-behaviour(gen_select)。在您的 init/1 回调中,您打开一个 ets 或 dets 文件并定义匹配规范和限制。该过程将通过表handle_record/2为每个记录调用您的回调进行分块,直到文件结束。对于我一直在做的一些“大数据”工作,我发现这是一个方便的范例。如果合适的话,你可以在你的 mnesia 表的底层 ets 表上使用它,或者修改它以使用 mnesia:select/4。

%%% gen_select.erl
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% @doc This module implements a behaviour pattern where a potentially
%%%     large number of records are read from an {@link //stdlib/ets. ets}
%%%     or {@link //stdlib/dets. dets} table.  This is used in an application
%%%     to have supervised workers mapping over all the records in a table.
%%%     The user will call `gen_select:start_link/3', from a supervisor, to
%%%     create a process which will iterate over the selected records of a
%%%     table.  The `init/1' callback should open the table to
%%%     be read and construct a match specification to be used to select 
%%%     records from the table.  It should return a tuple of the form:
%%%     ```
%%%     {ok, TableType, Table, MatchSpec, Limit, State} | {stop, Reason} | ignore
%%%         TableType :: ets | dets
%%%         Table :: ets:tid() | atom()  % when Type=ets
%%%         Table :: dets:tab_name()     % when Type=dets
%%%         MatchSpec :: match_spec()    % see ets:select/2
%%%         Limit :: integer()           % see ets:select/3
%%%         State :: term()
%%%         Reason :: term()
%%%     '''
%%% After initialization {@link //stdlib/ets:select/3. ets:select/3}
%%% or {@link //stdlib/dets:select/3. dets:select/3} will be called
%%% using the `match_spec()' and `Limit' returned by `init/`'.  The
%%% callback function `handle_record/2' will then be called for each
%%% record returned then `select/1' will be called to get more records.
%%% This is repeated until the end of the table is reached when the
%%% callback `terminate/2' is called with `Reason=eof'..
%%% 
-module(gen_select).
-author('vance@wavenet.lk').

%% export the gen_select API
-export([start_link/3]).

%% export the callbacks needed for a system process
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([format_status/2]).

%% exports used internally
-export([init_it/6]).

%% define the callback exports of a module behaving as gen_select
-type state() :: term().
-callback init(Args :: term()) ->
    {ok, TableType :: ets | dets, Table :: ets:tid() | atom() | dets:tab_name(),
        MatchSpec :: ets:match_spec(), Limit :: non_neg_integer(), State :: state()}
        | {stop, Reason :: term()} | ignore.
-callback handle_record(Record :: tuple(), State :: state()) ->
    {next_record, NewState :: state()}
        | {stop, Reason :: term(), NewState :: state()}.
-callback terminate(Reason :: eof | term(), State :: state()) ->
    any().

-import(error_logger, [format/2]).

%%----------------------------------------------------------------------
%%  gen_select API
%%----------------------------------------------------------------------

-spec start_link(Mod :: atom(), Args :: term(),
        Options :: gen:options()) -> gen:start_ret().
%% @doc Creates a {@module} process as part of a supervision tree.
%% 
start_link(Mod, Args, Options) ->
    gen:start(?MODULE, link, Mod, Args, Options).

%%----------------------------------------------------------------------
%%  internal exports
%%----------------------------------------------------------------------

-spec init_it(Starter :: pid(), LinkP :: gen:linkage(), Pid :: pid(),
        CallBackMod :: atom(), Args :: term(), Options :: gen:options()) ->
    no_return().
%% @doc Called by {@link //stdlib/gen:start/5. gen:start/5} to initialize
%%  the process.
%%  Copied from //stdlib/gen_server:init_it/6.
%% @hidden
init_it(Starter, Parent, Pid, CallBackMod, Args, Options) ->
    Debug = debug_options(Pid, Options),
    case catch CallBackMod:init(Args) of
        {ok, TableMod, Table, MatchSpec, Limit, State} ->
            proc_lib:init_ack(Starter, {ok, self()}),
            case catch ets:select(Table, MatchSpec, Limit) of
                {Matches, Cont} when is_list(Matches) ->
                    loop1(Parent, CallBackMod, Debug, State,
                            TableMod, Cont, Matches);
                '$end_of_table' ->
                    proc_lib:init_ack(Starter, {error, eof}),
                    exit(eof);
                {error, Reason} ->
                    proc_lib:init_ack(Starter, {error, Reason}),
                    exit(Reason);
                {'EXIT', Reason} ->
                    proc_lib:init_ack(Starter, {error, Reason}),
                    exit(Reason)
            end;
        {stop, Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        ignore ->
            proc_lib:init_ack(Starter, ignore),
            exit(normal);
        {'EXIT', Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        Else ->
            Error = {bad_return_value, Else},
            proc_lib:init_ack(Starter, {error, Error}),
            exit(Error)
    end.

%%----------------------------------------------------------------------
%%  system process callbacks
%%----------------------------------------------------------------------

-type misc() :: [CallBackMod :: atom() | [State :: state()
        | [TableMod :: atom() | [Cont :: term()
        | [Matches :: [tuple()] | []]]]]].

-spec system_continue(Parent :: pid(), Debug :: [gen:dbg_opt()],
        Misc :: misc()) -> no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to continue.
%% @private
system_continue(Parent, Debug, [CallBackMod, State,
        TableMod, Cont, Matches]) ->
    loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches).

-spec system_terminate(Reason :: term(), Parent :: pid(),
        Debug :: [gen:dbg_opt()], Misc :: misc()) -> no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to terminate.
%% @private
system_terminate(Reason, _Parent, Debug, [CallBackMod, State,
        _TableMod, _Cont, _Matches]) ->
    terminate(Reason, CallBackMod, Debug, State).

-spec system_code_change(Misc :: misc(), Module :: atom(),
        OldVsn :: undefined | term(), Extra :: term()) ->
    {ok, NewMisc :: misc()}.
%% @doc Called by {@link //sys:handle_system_msg/6} to update `Misc'.
%% @private
system_code_change([CallBackMod, State, TableMod, Cont, Matches],
        _Module, OldVsn, Extra) ->
    case catch CallBackMod:code_change(OldVsn, State, Extra) of
        {ok, NewState} ->
            {ok, [CallBackMod, NewState, TableMod, Cont, Matches]};
        Other ->
            Other
    end.

-type pdict() :: [{Key :: term(), Value :: term()}].
-type status_data() :: [PDict :: pdict() | [SysState :: term()
        | [Parent :: pid() | [Debug :: [gen:dbg_opt()] | [Misc :: misc() | []]]]]].
-spec format_status(Opt :: normal | terminate, StatusData :: status_data()) ->
    [tuple()].
%% @doc Called by {@link //sys:get_status/1} to print state.
%% @private
format_status(Opt, [PDict, SysState, Parent, Debug, 
        [CallBackMod, State, _TableMod, _Cont, _Matches]]) ->
    Header = gen:format_status_header("Status for table reader", self()),
    Log = sys:get_debug(log, Debug, []),
    DefaultStatus = [{data, [{"State", State}]}],
    Specfic = case erlang:function_exported(CallBackMod, format_status, 2) of
        true ->
            case catch CallBackMod:format_status(Opt, [PDict, State]) of
                {'EXIT', _} ->
                    DefaultStatus;
                StatusList when is_list(StatusList) ->
                    StatusList;
                Else ->
                    [Else]
            end;
        _ ->
            DefaultStatus
    end,
    [{header, Header},
            {data, [{"Status", SysState},
                    {"Parent", Parent},
                    {"Logged events", Log}]}
            | Specfic].

%%----------------------------------------------------------------------
%%  internal functions
%%----------------------------------------------------------------------

-spec loop1(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(),
        Cont :: term(), Matches :: [tuple()]) -> no_return().
%% @doc Main loop.
%%  Copied from //stdlib/gen_server:loop1/6.
%% @hidden
loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches) ->
    receive
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                    [CallBackMod, State, TableMod, Cont, Matches]);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        Msg ->
            sys:handle_debug(Debug, fun print_event/3, self(), {in, Msg})
    after 0 ->
        loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches)
    end.

-spec loop2(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(), Cont :: term(),
        Matches :: [tuple()]) -> no_return().
%% @doc Run the `select/1' function.
%% @hidden
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, [H | T]) ->
    case catch CallBackMod:handle_record(H, State) of
        {next_record, NewState} ->
            loop1(Parent, CallBackMod, Debug, NewState, TableMod, Cont, T);
        {stop, Reason, NewState} ->
            terminate(Reason, CallBackMod, Debug, NewState);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end;
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, []) ->
    case catch TableMod:select(Cont) of
        {Matches, NewCont} when is_list(Matches) ->
            sys:handle_debug(Debug, fun print_event/3, self(), {read, Matches}),
            loop1(Parent, CallBackMod, Debug, State, TableMod, NewCont, Matches);
        '$end_of_table' ->
            terminate(eof, CallBackMod, Debug, State);
        {error, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end.

-spec terminate(Reason :: term(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state()) -> no_return().
%% @doc Terminate the {@module} process.
%%  Copied from //stdlib/gen_server:terminate/6.
%% @hidden
terminate(Reason, CallBackMod, Debug, State) ->
    case catch CallBackMod:terminate(Reason, State) of
        {'EXIT', R} ->
            error_info(R, State, Debug),
            exit(R);
        _ ->
            case Reason of
                normal ->
                    exit(normal);
                shutdown ->
                    exit(shutdown);
                {shutdown, _} = Shutdown ->
                    exit(Shutdown);
                _ ->
                    FmtState = case erlang:function_exported(CallBackMod,
                            format_status, 2) of
                        true ->
                            case catch CallBackMod:format_status(terminate,
                                    [get(), State]) of
                                {'EXIT', _} ->
                                    State;
                                Else ->
                                    Else
                            end;
                        _ ->
                            State
                    end,
                    error_info(Reason, FmtState, Debug),
                    exit(Reason)
            end
    end.

-spec error_info(Reason :: term(), State :: state(),
        Debug :: [gen:dbg_opt()]) -> ok.
%% @doc Print error log message.
%%  Copied from //stdlib/gen_server:error_info/5.
%% @hidden
error_info(Reason, State, Debug) ->
    Reason1 = case Reason of
        {undef, [{M, F, A, L} | MFAs]} ->
            case code:is_loaded(M) of
                false ->
                    {'module could not be loaded', [{M, F, A, L} | MFAs]};
                _ ->
                    case erlang:function_exported(M, F, length(A)) of
                        true ->
                            Reason;
                        false ->
                            {'function not exported', [{M, F, A, L} | MFAs]}
                    end
            end;
        _ ->
            Reason
   end,
    format("** Table reader ~p terminating \n"
            "** When Server state == ~p~n"
            "** Reason for termination == ~n** ~p~n",
            [self(), State, Reason1]),
    sys:print_log(Debug),
    ok.

%% Copied from //stdlib/gen_server:opt/2
opt(Op, [{Op, Value} | _]) ->
    {ok, Value};
opt(Op, [_ | Options]) ->
    opt(Op, Options);
opt(_, []) ->
    false.

%% Copied from //stdlib/gen_server:debug_options/2
debug_options(Name, Opts) ->
    case opt(debug, Opts) of
        {ok, Options} ->
            dbg_options(Name, Options);
        _ ->
            dbg_options(Name, [])
    end.

%% Copied from //stdlib/gen_server:dbg_options/2
dbg_options(Name, []) ->
    Opts = case init:get_argument(generic_debug) of
        error ->
            [];
        _ ->
            [log, statistics]
    end,
    dbg_opts(Name, Opts);
dbg_options(Name, Opts) ->
    dbg_opts(Name, Opts).

%% Copied from //stdlib/gen_server:dbg_opts/2
dbg_opts(Name, Opts) ->
    case catch sys:debug_options(Opts) of
        {'EXIT',_} ->
            format("~p: ignoring erroneous debug options - ~p~n",
                    [Name, Opts]),
            [];
        Dbg ->
            Dbg
    end.

-spec print_event(IoDevice :: io:device(), Event :: term(), Pid :: pid()) -> ok.
%% @doc Called by {@link //sys:handle_debug/4} to print trace events.
print_event(Dev, {in, Msg}, Pid) ->
    io:format(Dev, "*DBG* ~p got ~p~n", [Pid, Msg]);
print_event(Dev, {read, Matches}, Pid) ->
    io:format(Dev, "*DBG* ~p read ~b records~n", [Pid, length(Matches)]).
于 2013-05-13T11:01:16.030 回答
0

如果你在你的表上放了一个自动递增整数的键,那么你可以用 QLC 粗略地做到这一点。

Evens = qlc:q([Rec || Rec <- Table, (Rec#table.int_key rem 2) =:= 0]).
Odds  = qlc:q([Rec || Rec <- Table, (Rec#table.int_key rem 2) =:= 1]).

如果您没有或想要这样的密钥,另一种方法是使用qlc:fold/3您的表来过滤掉每一秒的记录。如果出于内存原因需要,Mnesia 应该使用临时文件来迭代数据。

CollectOddsFun = fun(Rec, {N, List}) ->
                     NewList = case N rem 2 of
                       0 -> [Rec|List];
                       1 -> List
                     end,
                     {N+1, NewList}
                  end,

qlc:fold(CollectOddsFun, {0, []}, Table).
于 2013-05-12T08:08:55.980 回答