4

我想要做的是让一个 gen_server 进程接受一个新客户端并立即产生一个新的孩子来处理下一个。我看到的问题是,当套接字完成并随之终止时,它也会关闭侦听套接字,我不知道为什么,即使它不再引用它。

知道我做错了什么吗?

gen_server:

-module(simple_tcp).
-behaviour(gen_server).

%% API
-export([start_link/1, stop/0, start/0, start/1]).

%% gen-server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
-define(DEFAULT_PORT, 1055).

-record(state, {port, lsock}).

start_link({port, Port}) ->
    gen_server:start_link(?MODULE, [{port, Port}], []);

start_link({socket, Socket}) ->
    gen_server:start_link(?MODULE, [{socket, Socket}], []).

start({port, Port}) ->
    simple_tcp_sup:start_child({port, Port});

start({socket, Socket}) ->
    simple_tcp_sup:start_child({socket, Socket}).

start() ->
    start({port, ?DEFAULT_PORT}).

stop() ->
    gen_server:cast(?SERVER, stop).

% Callback functions
init([{port, Port}]) ->
    {ok, LSock} = gen_tcp:listen(Port, [{active, true},{reuseaddr, true}]),
    init([{socket, LSock}]);

init([{socket, Socket}]) ->
    io:fwrite("Starting server with socket: ~p~n", [self()]),
    {ok, Port} = inet:port(Socket),
    {ok, #state{port=Port, lsock=Socket}, 0}. 

handle_call(_Msg, _From, State) ->
    {noreply, State}.

handle_cast(stop, State) ->
    {stop, ok, State}.

handle_info({tcp, Socket, RawData}, State) ->
    gen_tcp:send(Socket, io_lib:fwrite("Received raw data: ~p~n", [RawData])),
    {noreply, State};

handle_info({tcp_error, _Socket, Reason}, State) ->
    io:fwrite("Error: ~p~n", [Reason]),
    {stop, normal, State};

handle_info(timeout, #state{lsock = LSock} = State) ->
    case gen_tcp:accept(LSock) of
        {ok, Sock} ->
            io:fwrite("Accepting connection...~p~n", [self()]),
            start({socket, LSock}),
            {noreply, #state{lsock=Sock}};

        {error, Reason} ->
            io:fwrite("Error: ~p, ~p~n", [Reason, self()]),
            {stop, normal, State}
    end;

handle_info({tcp_closed, _Port}, State) ->
    io:fwrite("Socket closed: ~p~n", [self()]),
    simple_tcp_sup:kill_child(self()),
    {stop, normal, State}.

terminate(_Reason, _State) ->
    io:fwrite("Shutting down server: ~p~n", [self()]),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

导师:

-module(simple_tcp_sup).

-behaviour(supervisor).

-export([start_link/0,
         start_child/1
        ]). 

-export([init/1]).

-define(SERVER, ?MODULE).

start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

start_child({socket, Socket}) ->
    io:fwrite("Spawning child with socket...~n"),
    supervisor:start_child(?SERVER, [{socket, Socket}]);

start_child({port, Port}) ->
    io:fwrite("Spawning child with port...~n"),
    supervisor:start_child(?SERVER, [{port, Port}]).

init([]) ->
    Element = {simple_tcp, {simple_tcp, start_link, []},
               temporary, brutal_kill, worker, [simple_tcp]},
    Children = [Element],
    RestartStrategy = {simple_one_for_one, 0, 1}, 
    {ok, {RestartStrategy, Children}}.
4

2 回答 2

6

您的第三个handle_info颠倒了Sock和的角色LSock。它应该传递Sock给子进程并保持其自身状态不变。

State顺便说一句:从头开始重建( #state{lsock=Sock})是不好的业力,您应该始终State从当前StateState#state{lsock=Sock})派生新的,以防万一您以后添加更多状态变量。实际上,这现在有一个错误(尽管是良性的),因为您正在丢弃端口号。

于 2011-09-03T05:42:45.320 回答
2

好吧,我建议你让 Socket 的东西由与 gen_server 异步通信的单独进程处理,并linked与它一起使用。我有一个示例代码片段,它将向您展示如何做到这一点。gen_server 启动并生成一个 TCP 侦听器,在成功获得侦听套接字后通知我们的 gen_server 更改其内部状态。我已经从上到下安排了代码。已显示所有相关功能。关注套接字处理过程以及它们如何与 gen_server 交互

-定义(PEER_CLIENT_TIMEOUT,计时器:秒(20))。
-定义(PORT_RANGE,{10245,10265})。
-define(调试(X,Y),error_logger:info_msg(X,Y))。
-定义(错误(L),错误记录器:错误报告(L))。
-define(SOCKET_OPTS(IP),[inet,binary,{backlog,100},{packet,0},
                            {reuseaddr,true},{active,true},
                            {ip,IP}])。

%%------------------------------------------------ ----
%% gen_server 从这里开始......

开始(对等名称)->
    gen_server:start_link({local,?MODULE},?MODULE,PeerName,[])。

%%%-------------------------------------------
%% Gen_server 初始化/1 函数

初始化(对等名称)->
    process_flag(trap_exit,true),
    %% 开始下面的整个 Socket 链..
    start_link_listener(),
    %% Socket 东西开始了,gen_server 现在可以等待异步了
    %% 消息
    {好的,[]}。

%%% ---- 套接字处理函数 ---------

%% 功能:start_link_listener/0
%% 目的:启动整个聆听链
%% 并等待连接。执行
%% 直接由 gen_server 进程,但是
%% 产生一个单独的进程来完成其余的工作

start_link_listener()->
    ip_address = get_myaddr(),  
    spawn_link(fun() -> listener(?SOCKET_OPTS(Ip_address)) end)。

%%%------------------------------------------   
%% 函数:get_myaddr/0
%% 目的:选择我机器上的活动 IP 地址
%% 听

get_myaddr()->
    ?DEBUG("Server> 试图提取我的本地 IP 地址....",[]),
    {ok,Name} = inet:gethostname(),
    {ok,IP} = inet:getaddr(Name,inet),
    ?DEBUG("Server> Found Alive Local IP address: ~p.....~n",[IP]),
    知识产权。

%%%---------------------------------------------------------- ---
%% 功能:listener/1,在单独的进程中执行
%% 目的:使用给定的 Socket 选项尝试给定的 ?PORT_RANGE
%% 一旦它获得了一个 ListenSocket,它就会强制转换 gen_server!

监听器(SocketOpts)->
    process_flag(trap_exit,true),
    端口 = 列表:seq(element(1,?PORT_RANGE),element(2,?PORT_RANGE)),
    案例 try_listening(SocketOpts,Ports) 的
        {ok,端口,LSocket}->              
                PP = proplists:get_value(ip,SocketOpts),
                ?MODULE:started_listener(端口,PP,LSocket),              
                接受连接(LSocket);
        {错误,失败} - > {错误,失败,SocketOpts}
    结尾。

try_listening(_Opts,[])-> {error,failed};
try_listening(Opts,[Port|Rest])->
    case gen_tcp:listen(Port,Opts) of
        {ok,Listen_Socket} -> {ok,Port,Listen_Socket};
        {error,_} -> try_listening(Opts,Rest)
    结尾。
%%%---------------------------------------------------------- ----------
用于从元组转换 IP 地址的 %% 辅助函数
%% 到字符串,反之亦然

str(X) 当 is_integer(X)-> integer_to_list(X) 时。

formise_ipaddress({A,B,C,D})->
    str(A) ++ "." ++ str(B) ++ "." ++ str(C) ++ "." ++ 字符串(D)。

unformalise_address(String)->
    [A,B,C,D] = string:tokens(String,"."),
    {list_to_integer(A),list_to_integer(B),list_to_integer(C),list_to_integer(D)}。

%%%---------------------------------------------------------- ---
%% 函数:get_source_connection/1
%% 目的:检索对方的IP和端口
%% 连接结束

get_source_connection(Socket)->
    尝试 inet:peername(Socket) 的
        {ok,{IP_Address, 端口}} ->
            [{ipAddress,formalise_ipaddress(IP_Address)},{port,Port}];
        _ -> failed_to_retrieve_address
    抓住
        _:_ -> failed_to_retrieve_address
    结尾。

%%%---------------------------------------------------------- ------
%% 功能:accept_connection/1
%% 目的:等待连接并重新使用
%% ListenSocket 通过产生另一个线程
%% 拿去听。它投射 gen_server
%% 在每个连接上,并提供有关它的详细信息。

接受连接(ListenSocket)->    
    case gen_tcp:accept(ListenSocket,infinity) 的
        {好的,套接字}->
            %% 重用下面的 ListenSocket .....
            spawn_link(fun() -> accept_connection(ListenSocket) end),            
            OtherEnd = get_source_connection(Socket),
            ?MODULE:accepted_connection(OtherEnd),          
            循环(套接字,另一端);
        {error,_} = 原因 ->
            ?ERROR(["监听器接受连接失败",
                    {listener,self()},{原因,原因}])
    结尾。

%%%---------------------------------------------------------- --------------------------
%% 功能:循环/2
%% 目的:TCP 接收循环,它强制转换 gen_server
%% 一旦收到东西。gen_server
%% 负责生成响应
%% OtherEnd ::= [{ipAddress,StringIPAddress},{Port,Port}] 或 'failed_to_retrieve_address'

循环(套接字,OtherEnd)->
    收到
        {tcp、套接字、数据}->
            ?DEBUG("Acceptor: ~p has received a binary message from: ~p~n",[self(),OtherEnd]),
            回复 = ?MODULE:incoming_binary_message(Data,OtherEnd),
            gen_tcp:send(Socket,Reply),         
            gen_tcp:关闭(套接字),
            退出(正常);
        {tcp_closed, 套接字} ->
            ?DEBUG("Acceptor: ~p. Socket 被另一端关闭: ~p~n",[self(),OtherEnd]),
            ?MODULE:socket_closed(OtherEnd),
            退出(正常);
        Any -> ?DEBUG("Acceptor: ~p has received a message: ~p~n",[self(),Any])
    结尾。

%%%------------------------------------------
%% Gen_server 异步 API

接受的连接(failed_to_retrieve_address)-> 好的;
接受连接([{ipAddress,StringIPAddress},{Port,Port}])->     
    gen_server:cast(?MODULE,{connected,StringIPAddress,Port})。

socket_closed(failed_to_retrieve_address)-> 好的;
socket_closed([{ipAddress,StringIPAddress},{Port,Port}])->
    gen_server:cast(?MODULE,{socket_closed,StringIPAddress,Port})。

incoming_binary_message(Data,_OtherEnd)-> %% 期待二进制回复
    case analyse_protocol(Data) of
        错误 -> term_to_binary("违反协议!");
        Val -> gen_server:call(?MODULE,{request,Val},infinity)
    结尾。

%%% -------- 处理强制转换 ------------- -----------------

handle_cast({listener_starts,_Port,_MyTupleIP,_LSocket} = 对象,状态)->
    NewState = do_something_with_the_listen_report(对象),
    {noreply,新州};
handle_cast({connected,_StringIPAddress,_Port} = 对象,状态)->
    NewState = do_something_with_the_connection_report(对象),
    {noreply,新州};
handle_cast({socket_closed,_StringIPAddress,_Port} = 对象,状态)->
    NewState = do_something_with_the_closed_connection_report(对象),
    {noreply,新州};
handle_cast(任何,状态)->
    ?DEBUG("Server> 我收到了一些未知信息:~p~n",[Any]),
    {noreply,州}。


%%%% ---------- 处理调用 --------------
handle_call({request,Val},_,State)->
    {NewState,Reply} = req(Val,State),
    {回复,回复,新州};
handle_call(_,_,State)-> {reply,[],State}。

req(Val,State)->
    %% 修改 gen_server 状态和
    %% 构建回复
    {NewState,Reply} = modify_state_and_get_reply(State,Val),
    {新州,回复}。

%%-------------------- 终止/2 --------

终止(_Reason,_State)-> 好的。  

%%----------------- code_change/3 ------------------

code_change(_,State,_)-> {ok,State}。

使用 gen_server 的异步功能,我们可以处理来自不同链接进程的套接字详细信息。然后,这些进程将通过cast并且不阻止 gen_server 的并发性质与 gen_server 通信。

于 2011-09-03T06:17:32.080 回答