3

我试图用 Erlang 为我的 Java 应用程序实现一个服务器。似乎我的服务器正在工作,但仍然充满错误和死点。我需要将 Java 应用程序解析的 JSON 文件接收到地图中,并将其发送回所有客户端,包括上传文件的客户端。同时,我需要跟踪谁发出了请求以及消息的哪一部分被发送,以防出现任何问题,客户端应该从此时开始重新启动,而不是从头开始。除非客户端离开应用程序,否则它应该重新启动。

我的三段代码如下:

app.erl

-module(erlServer_app).

-behaviour(application).

%% Application callbacks
-export([start/2, stop/1]).

%%%===================================================================
%%% Application callbacks
%%%===================================================================

start(_StartType, _StartArgs) ->
    erlServer_sup:start_link().


stop(_State) ->
    ok.

主管.erl:

-module(erlServer_sup).

-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1, start_socket/0, terminate_socket/0, empty_listeners/0]).

-define(SERVER, ?MODULE).

%%--------------------------------------------------------------------
%% @doc
%% Starts the supervisor
%%
%% @end
%%--------------------------------------------------------------------
start_link() ->
  supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================

init([]) -> % restart strategy 'one_for_one': if one goes down only that one is restarted
  io:format("starting...~n"),
  spawn_link(fun() -> empty_listeners() end),
  {ok,
    {{one_for_one, 5, 30}, % The flag - 5 restart within 30 seconds
      [{erlServer_server, {erlServer_server, init, []}, permanent, 1000, worker, [erlServer_server]}]}}.


%%%===================================================================
%%% Internal functions
%%%===================================================================

start_socket() ->
  supervisor:start_child(?MODULE, []).

terminate_socket() ->
  supervisor:delete_child(?MODULE, []).

empty_listeners() ->
  [start_socket() || _ <- lists:seq(1,20)],
  ok.

server.erl:(我有很多调试io:format。)

-module(erlServer_server).

%% API
-export([init/0, start_server/0]).

%% Defining the port used.
-define(PORT, 8080).

%%%===================================================================
%%% API
%%%===================================================================

init() ->
  start_server().

%%%===================================================================
%%% Server callbacks
%%%===================================================================

start_server() ->
  io:format("Server started.~n"),
  Pid = spawn_link(fun() ->
    {ok, ServerSocket} = gen_tcp:listen(?PORT, [binary, {packet, 0}, 
      {reuseaddr, true}, {active, true}]),
    io:format("Baba~p", [ServerSocket]),
    server_loop(ServerSocket) end),
  {ok, Pid}.

server_loop(ServerSocket) ->
  io:format("Oba~p", [ServerSocket]),
  {ok, Socket} = gen_tcp:accept(ServerSocket),
  Pid1 = spawn(fun() -> client() end),
  inet:setopts(Socket, [{packet, 0}, binary, 
    {nodelay, true}, {active, true}]),
  gen_tcp:controlling_process(Socket, Pid1), 
  server_loop(ServerSocket).

%%%===================================================================
%%% Internal functions
%%%===================================================================

client() ->
  io:format("Starting client. Enter \'quit\' to exit.~n"),
  Client = self(),
  {ok, Sock} = gen_tcp:connect("localhost", ?PORT, [{active, false}, {packet, 2}]),
  display_prompt(Client),
  client_loop(Client, Sock).

%%%===================================================================
%%% Client callbacks
%%%===================================================================

send(Sock, Packet) ->
  {ok, Sock, Packet} = gen_tcp:send(Sock, Packet),
  io:format("Sent ~n").

recv(Packet) ->
  {recv, ok, Packet} = gen_tcp:recv(Packet),
  io:format("Received ~n").

display_prompt(Client) ->
  spawn(fun () ->
    Packet = io:get_line("> "),
    Client ! {entered, Packet}
        end),
  Client ! {requested},
  ok.


client_loop(Client, Sock) ->
  receive
    {entered, "quit\n"} ->
      gen_tcp:close(Sock);
    {entered, Packet}        ->
      % When a packet is entered we receive it,
      recv(Packet),
      display_prompt(Client),
      client_loop(Client, Sock);
    {requested, Packet}        ->
      % When a packet is requested we send it,
      send(Sock, Packet),
      display_prompt(Client),
      client_loop(Client, Sock);
    {error, timeout} ->
      io:format("Send timeout, closing!~n", []),
      Client ! {self(),{error_sending, timeout}},
      gen_tcp:close(Sock);
    {error, OtherSendError} ->
      io:format("Some other error on socket (~p), closing", [OtherSendError]),
      Client ! {self(),{error_sending, OtherSendError}},
      gen_tcp:close(Sock)
  end.

这是我正在做的第一台服务器,我可能在中间迷路了。当我跑步时,它似乎正在工作,但挂起。有人能帮我吗?我的本地主机永远不会加载它永远加载的任何东西。

我的 java 应用程序如何从同一个端口接收它?

我必须使用 Erlang 并且我必须使用端口连接到 java 应用程序。

谢谢你帮助我!

4

1 回答 1

3

让我们稍微修改一下...

首先:命名

我们在 Erlang 中不使用 camelCase。这是令人困惑的,因为大写的变量名和小写(或单引号)原子意味着不同的东西。此外,模块名称必须与文件名相同,这会导致不区分大小写的文件系统出现问题。

另外,我们真的想要一个比“服务器”更好的名字。在这样的系统中,服务器可能意味着很多东西,虽然整个系统可能是一个用 Erlang 编写的服务,但这并不一定意味着我们可以调用“服务器”内的所有东西而不会变得非常模糊!这很令人困惑。我现在将您的项目命名为“ES”。所以你会有es_app等等es_sup。当我们想要开始定义新模块时,这将派上用场,也许其中一些称为“服务器”,而不必到处写“server_server”。

二:输入数据

一般来说,我们希望将参数传递给函数,而不是将文字(或更糟糕的是,宏重写)隐藏在代码中。如果我们要拥有幻数和常量,让我们尽最大努力将它们放入配置文件中,以便我们可以以编程方式访问它们,或者甚至更好,让我们在初始启动调用中将它们用作从属进程的参数,这样我们只能通过在主应用程序模块中搞乱启动调用函数来修改系统的行为(一旦编写)。

-module(es).
-behaviour(application).

-export([listen/1, ignore/0]).
-export([start/0, start/1]).
-export([start/2, stop/1]).

listen(PortNum) ->
    es_client_man:listen(PortNum).

ignore() ->
    es_client_man:ignore().

start() ->
    ok = application:ensure_started(sasl),
    ok = application:start(?MODULE),
    io:format("Starting es...").


start(Port) ->
    ok = start(),
    ok = es_client_man:listen(Port),
    io:format("Startup complete, listening on ~w~n", [Port]).

start(normal, _Args) ->
    es_sup:start_link().

stop(_State) ->
    ok.

我在上面添加了 start/1 函数以及 start/0、listen/1 和 ignore/0,稍后您将在 es_client_man 中再次看到它们。这些主要是您可以更明确地调用的东西的便利包装,但可能不想一直输入。

这个应用程序模块通过让应用程序主机为我们启动项目(通过调用application:start/1)开始,然后下一行调用 erl_server_server 告诉它开始监听。在早期开发中,我发现这种方法比将自动启动功能埋藏在所有组件中要有用得多,后来它为我们提供了一种非常简单的方法来编写可以打开和关闭各种组件的外部接口。

啊,还有……我们将把它作为一个真正的 Erlang 应用程序启动,所以我们需要一个 app 文件ebin/(或者如果你使用 erlang.mk 或类似的 app.src 文件src/) :

ebin/es.app 看起来像这样:

{application,es,
             [{description,"An Erlang Server example project"},
              {vsn,"0.1.0"},
              {applications,[stdlib,kernel,sasl]},
              {modules,[es,
                        es_sup,
                          es_clients,
                            es_client_sup,
                              es_client,
                            es_client_man]},
              {mod,{es,[]}}]}.

下面的列表modules实际上反映了监督树的布局,如下所示。

上面的 start/2 函数现在断言我们只会以normalmode 启动(这可能合适也可能不合适),并且忽略启动参数(这也可能合适也可能不合适)。

第三:监督

-module(es_sup).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    RestartStrategy = {one_for_one, 1, 60},
    Clients   = {es_clients,
                 {es_clients, start_link, []},
                 permanent,
                 5000,
                 supervisor,
                 [es_clients]},
    Children  = [Clients],
    {ok, {RestartStrategy, Children}}.

进而...

-module(es_clients).
-behavior(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, none).

init(none) ->
    RestartStrategy = {rest_for_one, 1, 60},
    ClientSup = {es_client_sup,
                 {es_client_sup, start_link, []},
                 permanent,
                 5000,
                 supervisor,
                 [es_client_sup]},
    ClientMan = {es_client_man,
                 {es_client_man, start_link, []},
                 permanent,
                 5000,
                 worker,
                 [es_client_man]},
    Children  = [ClientSup, ClientMan],
    {ok, {RestartStrategy, Children}}.

哇!这里发生了什么?!?好吧, es_sup 是一个supervisor,而不是一个一次性的东西,它只会产生其他一次性的东西。(误解主管是你核心问题的一部分。)

主管很无聊。主管应该很无聊。作为代码阅读者,他们真正做的就是监督树的结构在里面。就 OTP 结构而言,它们为我们所做的非常重要,但它们不需要我们编写任何程序代码,只需声明它应该具有哪些子级。我们在这里实现的称为服务-> 工作人员结构。因此,我们为您的整个应用程序设置了顶级主管,称为es_sup. 在此之下,我们(目前)有一个名为es_clients.

es_clients 进程也是一个主管。这样做的原因是为客户端连接部分定义一个明确的方法,以不影响以后可能存在于系统其余部分中的任何正在进行的状态。仅仅接受来自客户端的连接是没有用的——肯定有一些状态在其他地方很重要,比如与某个 Java 节点的长期连接或其他什么。那将是一个单独的服务组件,可能会被调用es_java_nodes,并且程序的这一部分将从它自己的、单独的主管开始。这就是为什么它被称为“监督树”而不是“监督列表”的原因。

所以回到客户...我们将有客户连接。这就是为什么我们称它们为“客户端”,因为从这个 Erlang 系统的角度来看,连接的东西是客户端,接受这些连接的进程抽象了客户端,因此我们可以将每个连接处理程序视为客户端本身——因为那是正是它所代表的。如果我们稍后连接到上游服务,我们会想调用它们抽象的任何东西,以便我们在系统中的语义是理智的。

然后,您可以考虑“es_client 向 es_java_node 发送消息以查询 [thingy]”,而不是试图保持直截了当,例如“server_server 要求 java_server_client 到 server_server service_server”(这实际上是多么愚蠢的事情,如果从内部系统的角度来看,您没有保持命名原则直截了当)。

等等等等等等...

所以,这里是 es_client_sup:

-module(es_client_sup).
-behaviour(supervisor).

-export([start_acceptor/1]).
-export([start_link/0]).
-export([init/1]).

start_acceptor(ListenSocket) ->
    supervisor:start_child(?MODULE, [ListenSocket]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, none).

init(none) ->
    RestartStrategy = {simple_one_for_one, 1, 60},
    Client    = {es_client,
                 {es_client, start_link, []},
                 temporary,
                 brutal_kill,
                 worker,
                 [es_client]},
    {ok, {RestartStrategy, [Client]}}.

你在挑选模式吗?当我说“主管应该很无聊......”时,我不是在开玩笑 :-) 请注意,这里我们实际上是在传递一个参数,并且我们已经定义了一个接口函数。所以如果我们需要一个套接字接受器来启动,我们就有一个合乎逻辑的地方可以调用。

第四:客户服务本身

我们来看看客户经理:

-module(es_client_man).
-behavior(gen_server).

-export([listen/1, ignore/0]).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         code_change/3, terminate/2]).

-record(s, {port_num = none :: none | inet:port_number(),
            listener = none :: none | gen_tcp:socket()}).

listen(PortNum) ->
    gen_server:call(?MODULE, {listen, PortNum}).

ignore() ->
    gen_server:cast(?MODULE, ignore).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).

init(none) ->
    ok = io:format("Starting.~n"),
    State = #s{},
    {ok, State}.

handle_call({listen, PortNum}, _, State) ->
    {Response, NewState} = do_listen(PortNum, State),
    {reply, Response, NewState};
handle_call(Unexpected, From, State) ->
    ok = io:format("~p Unexpected call from ~tp: ~tp~n", [self(), From, Unexpected]),
    {noreply, State}.

handle_cast(ignore, State) ->
    NewState = do_ignore(State),
    {noreply, NewState};
handle_cast(Unexpected, State) ->
    ok = io:format("~p Unexpected cast: ~tp~n", [self(), Unexpected]),
    {noreply, State}.

handle_info(Unexpected, State) ->
    ok = io:format("~p Unexpected info: ~tp~n", [self(), Unexpected]),
    {noreply, State}.

code_change(_, State, _) ->
    {ok, State}.

terminate(_, _) ->
    ok.

do_listen(PortNum, State = #s{port_num = none}) ->
    SocketOptions =
        [{active,    once},
         {mode,      binary},
         {keepalive, true},
         {reuseaddr, true}],
    {ok, Listener} = gen_tcp:listen(PortNum, SocketOptions),
    {ok, _} = es_client:start(Listener),
    {ok, State#s{port_num = PortNum, listener = Listener}};
do_listen(_, State = #s{port_num = PortNum}) ->
    ok = io:format("~p Already listening on ~p~n", [self(), PortNum]),
    {{error, {listening, PortNum}}, State}.

do_ignore(State = #s{listener = none}) ->
    State;
do_ignore(State = #s{listener = Listener}) ->
    ok = gen_tcp:close(Listener),
    State#s{listener = none}.

嗯,这是怎么回事?这里的基本思想是,我们有一个服务监督者来管理整个客户端概念(es_clients,如上所述),然后我们有 simple_one_for_one 来处理刚刚发生的任何客户端(es_client_sup),这里我们有子系统的管理接口。这个管理器所做的只是跟踪我们正在侦听的端口,并拥有我们正在侦听的套接字(如果此时有一个打开)。请注意,这可以很容易地重写以允许同时侦听任意数量的端口,或跟踪所有活动的客户端,或其他任何东西。您可能想做的事情确实没有限制。

那么我们如何启动可以接受连接的客户端呢?通过告诉他们生成并监听我们作为参数传入的监听套接字。再去看看es_client_sup上面。我们传入一个空列表作为它的第一个参数。当我们调用它的 start_link 函数时会发生什么,我们作为列表传入的任何其他内容都将被添加到整个参数列表中。在这种情况下,我们将传入监听套接字,因此它将以参数 开始[ListenSocket]

每当客户端侦听器接受连接时,其下一步将是生成其继任者,并将原始ListenSocket参数交给它。啊,生命的奇迹。

-module(es_client).

-export([start/1]).
-export([start_link/1, init/2]).
-export([system_continue/3, system_terminate/4,
         system_get_state/1, system_replace_state/2]).

-record(s, {socket = none :: none | gen_tcp:socket()}).

start(ListenSocket) ->
    es_client_sup:start_acceptor(ListenSocket).

start_link(ListenSocket) ->
    proc_lib:start_link(?MODULE, init, [self(), ListenSocket]).

init(Parent, ListenSocket) ->
    ok = io:format("~p Listening.~n", [self()]),
    Debug = sys:debug_options([]),
    ok = proc_lib:init_ack(Parent, {ok, self()}),
    listen(Parent, Debug, ListenSocket).

listen(Parent, Debug, ListenSocket) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, Socket} ->
            {ok, _} = start(ListenSocket),
            {ok, Peer} = inet:peername(Socket),
            ok = io:format("~p Connection accepted from: ~p~n", [self(), Peer]),
            State = #s{socket = Socket},
            loop(Parent, Debug, State);
        {error, closed} ->
            ok = io:format("~p Retiring: Listen socket closed.~n", [self()]),
            exit(normal)
     end.

loop(Parent, Debug, State = #s{socket = Socket}) ->
    ok = inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, <<"bye\r\n">>} ->
            ok = io:format("~p Client saying goodbye. Bye!~n", [self()]),
            ok = gen_tcp:send(Socket, "Bye!\r\n"),
            ok = gen_tcp:shutdown(Socket, read_write),
            exit(normal);
        {tcp, Socket, Message} ->
            ok = io:format("~p received: ~tp~n", [self(), Message]),
            ok = gen_tcp:send(Socket, ["You sent: ", Message]),
            loop(Parent, Debug, State);
        {tcp_closed, Socket} ->
            ok = io:format("~p Socket closed, retiring.~n", [self()]),
            exit(normal);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, Debug, State);
        Unexpected ->
            ok = io:format("~p Unexpected message: ~tp", [self(), Unexpected]),
            loop(Parent, Debug, State)
    end.

system_continue(Parent, Debug, State) ->
    loop(Parent, Debug, State).

system_terminate(Reason, _Parent, _Debug, _State) ->
    exit(Reason).

system_get_state(Misc) -> {ok, Misc}.

system_replace_state(StateFun, Misc) ->
    {ok, StateFun(Misc), Misc}.

请注意,上面我们编写了一个纯 Erlang 进程,它以 gen_server 的方式与 OTP 集成,但有一个更直接的循环,只处理套接字。这意味着我们没有 gen_server 调用/转换机制(可能需要自己实现,但通常仅异步是套接字处理的更好方法)。这是通过 proc_lib 模块启动的,该模块专门设计用于引导任意类型的 OTP 兼容进程。

如果您要使用主管,那么您真的很想一路走好并正确使用 OTP

所以我们现在上面有一个非常基本的 Telnet echo 服务。与其在服务器模块中编写一个神奇的客户端进程来将你的大脑打结(Erlangers 不喜欢他们的大脑打结),你可以启动它,让它监听某个端口,然后自己 telnet 到它并查看结果。

我添加了一些脚本来自动启动东西,但基本上取决于codemake模块。您的项目布局如下

es/
   Emakefile
   ebin/es.app
   src/*.erl

Emakefile 的内容会让我们更轻松。在这种情况下,它只是一行:

enter code here{"src/*", [debug_info, {i, "include/"}, {outdir, "ebin/"}]}。

在主es/目录中,如果我们输入一个 erl shell,我们现在可以...

1> code:add_patha("ebin").
true
2> make:all().
up_to_date
3> es:start().

你会看到一堆 SASL 开始报告在屏幕上滚动。

从那里开始es:listen(5555)

4> es:listen(5555).
<0.94.0> Listening.
ok

凉爽的!所以看起来事情正在发挥作用。让我们尝试 telnet 到自己:

ceverett@changa:~/vcs/es$ telnet localhost 5555
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello es thingy
You sent: Hello es thingy
Yay! It works!
You sent: Yay! It works!
bye
Bye!
Connection closed by foreign host.

另一边是什么样子的?

<0.96.0> Listening.
<0.94.0> Connection accepted from: {{127,0,0,1},60775}
<0.94.0> received: <<"Hello es thingy\r\n">>
<0.94.0> received: <<"Yay! It works!\r\n">>
<0.94.0> Client saying goodbye. Bye!

啊,这里我们看到了“聆听”。来自第一个侦听器的下一个侦听器的消息。<0.96.0><0.94.0>

并发连接怎么样?

<0.97.0> Listening.
<0.96.0> Connection accepted from: {{127,0,0,1},60779}
<0.98.0> Listening.
<0.97.0> Connection accepted from: {{127,0,0,1},60780}
<0.97.0> received: <<"Screen 1\r\n">>
<0.96.0> received: <<"Screen 2\r\n">>
<0.97.0> received: <<"I wonder if I could make screen 1 talk to screen 2...\r\n">>
<0.96.0> received: <<"Time to go\r\n">>
<0.96.0> Client saying goodbye. Bye!
<0.97.0> Client saying goodbye. Bye!

哦,好样的。并发服务器!

从这里您可以使用工具并进行此基本结构更改以执行您可能想象的几乎任何事情。

请注意,此代码缺少很多内容。我已经去掉了edoc符号和typespecs(由 Dialyzer 使用这是一个大型项目中非常重要的工具)——这对于生产系统来说是一件坏事。

有关一个小到足以让您绕开的生产风格项目的示例(只有 3 个模块 + 完整文档),请参阅zuuid。它是专门为用作代码示例而编写的,尽管它恰好是一个功能齐全的 UUID 生成器。

原谅这个答案对您(更短)的问题的巨大影响。这不时出现,我想写一个完整的网络套接字服务示例,我可以在未来将人们推荐给它,当我读到你的问题时,碰巧得到了这样做的渴望。:-) 希望 SO 纳粹分子能够原谅这种严重的违规行为。

于 2017-10-26T02:44:28.840 回答