0

如果客户端配置创建临时队列接收消息,是否可以配置只允许创建一个队列?如果有缺陷的客户端在服务器上创建了太多队列,对服务器来说会是一个大问题吗?

如何配置以防止客户端浪费资源?

-module(amqp_example).

-include("amqp_client.hrl").

-compile([export_all]).

test() ->
    %% Start a network connection
    {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
    %% Open a channel on the connection
    {ok, Channel} = amqp_connection:open_channel(Connection),

    %% Declare a queue
    #'queue.declare_ok'{queue = Q}
        = amqp_channel:call(Channel, #'queue.declare'{}),
    ok = create_temp_queue(Channel,10),

    %% Publish a message
    Payload = <<"foobar">>,
    Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
    amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),

    %% Get the message back from the queue
    Get = #'basic.get'{queue = Q},
    {#'basic.get_ok'{delivery_tag = Tag}, _Content}
        = amqp_channel:call(Channel, Get),

    %% Do something with the message payload
    %% (some work here)

    %% Ack the message
    amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),

    %% Close the channel
    amqp_channel:close(Channel),
    %% Close the connection
    amqp_connection:close(Connection),

    ok.

create_temp_queue(Channel,Loop)->
    [     %% Declare a queue
          #'queue.declare_ok'{queue = _Q}
          = amqp_channel:call(Channel, #'queue.declare'{})
          ||
        _X <- lists:seq(1,Loop)].


(emacs@yus-iMac.local)58> amqp_example:test().
** exception error: no match of right hand side value 
                    [{'queue.declare_ok',
                         <<"amq.gen-AqAaMLydgMf43y_XoYSdq5">>,0,0},
                     {'queue.declare_ok',
                         <<"amq.gen-A75g--nsvheNbwYMr34M-E">>,0,0},
                     {'queue.declare_ok',
                         <<"amq.gen-wmkOrALHBIj6Ot6ZuZZOQJ">>,0,0},
                     {'queue.declare_ok',
                         <<"amq.gen-wX2NmwMHBeDaKLvoZgJhEh">>,0,0},
                     {'queue.declare_ok',
                         <<"amq.gen-gcvScDp-RFMVwxWpyWjI-9">>,0,0},
                     {'queue.declare_ok',
                         <<"amq.gen-Q4CS7jNu3cde0RNdVdO3PJ">>,0,0},
                     {'queue.declare_ok',
                         <<"amq.gen-QKNrG8IJPVvfAlLukq38x_">>,0,0},
                     {'queue.declare_ok',
                         <<"amq.gen-wqJ2V1HQDaJjOzRDhv8gT4">>,0,0},
                     {'queue.declare_ok',
                         <<"amq.gen-AYeZiuNYsFOUMVw6xKcZh4">>,0,0},
                     {'queue.declare_ok',
                         <<"amq.gen-AJDqT2h2fq9cZOsVbNESi0">>,0,0}]
     in function  amqp_example:test/0 (src/amqp_example.erl, line 16)


yus-iMac:~ yuchen$ sudo rabbitmqctl list_queues
Password:
Listing queues ...
amq.gen-Q4CS7jNu3cde0RNdVdO3PJ  0
amq.gen-QKNrG8IJPVvfAlLukq38x_  0
amq.gen-AqAaMLydgMf43y_XoYSdq5  0
amq.gen-AJDqT2h2fq9cZOsVbNESi0  0
amq.gen-wqJ2V1HQDaJjOzRDhv8gT4  0
amq.gen-AYeZiuNYsFOUMVw6xKcZh4  0
amq.gen-wzvWzxXo2MJVZsyrwfzM8A  0
amq.gen-A75g--nsvheNbwYMr34M-E  0
amq.gen-gcvScDp-RFMVwxWpyWjI-9  0
amq.gen-wX2NmwMHBeDaKLvoZgJhEh  0
 amq.gen-wmkOrALHBIj6Ot6ZuZZOQJ 0
 ...done.
4

2 回答 2

0

好吧。现在让我们假设你有一个正确的设置Erlang Rabbitmq client在此处使用用户指南。

1. 通用 RABBITMQ 连接器

-模块(rabbit_utils)。
-编译(export_all)。
-定义(兔子服务器,“本地主机”)。
-定义(兔子端口,9001)。

-记录(管道,{连接,通道})。

创建管道()->    
    尝试 amqp_connection:start(#amqp_params_network{host = ?RABBIT_SERVER,
                                                   端口 = RABBIT_PORT}) 的
        {好的,连接}->
            试试 amqp_connection:open_channel(Connection) 的
                {好的,频道} ->
                    #管道{
                        连接=连接,
                        频道 = 频道
                     };
                _ -> amqp_connection:关闭(连接),错误
            抓住
                _:_ -> amqp_connection:close(Connection),错误
            结尾;
        _ -> 错误
    抓住
        _:_ -> 错误
    结尾。

close_pipe(通道,连接)->
    尝试 amqp_channel:close(Channel) of _ -> ok catch _:_ -> ok end,
    尝试 amqp_connection:close(Connection) of _ -> ok catch _:_ -> ok end,
    好的。

随着我们继续,您会注意到,我一直在创建管道和关闭管道。这是因为,在最新版本的 中erlang rabbitmq client,连接和通道变量是 Erlang 进程。每当这个库中的某些事情没有按计划进行时,这些进程就会在运行时终止。出于这个原因,我将大多数代码困在try ... catch ... end代码块中。如果您正在使用gen_servers,则制作它们trap_exit = true,然后将它们链接到 RabbitMQ 客户端进程(连接和通道)。因此,每当连接中断时,Connection进程就会终止,将其捕获在您的 gen_server 中,然后尝试另一个连接,如果失败,也许您有很多RABBITMQ服务器可以fail over连接。概念在这里很深,但让我们继续。


2.交易所经理
这可确保交换存在于RABBITMQ服务器上。交换必须是binary. 我假设我们需要持久的交换和队列。持久交换和队列的一个优点是您只需创建一次,即使 RABBITMQ 服务重新启动,它也会使用这些交换和队列启动。

确保交换存在(交换)->
    案例 create_pipe() 的
        错误->错误;
        #pipe{connection = Connection,channel = Channel} ->             
            RandomQueue = list_to_binary(guid()),            
            #'queue.declare_ok'{} = amqp_channel:call(Channel,#'queue.declare'{queue = RandomQueue}),
            绑定 = #'queue.bind'{queue = RandomQueue,exchange = Exchange,
                                    routing_key = <<"测试">>},
            试试 amqp_channel:call(Channel, Binding) 的
                #'queue.bind_ok'{} ->                         
                        删除 = #'queue.delete'{queue = RandomQueue},
                        #'queue.delete_ok'{} = amqp_channel:call(Channel, Delete),
                        close_pipe(通道,连接),
                        好的;
                _ ->                     
                    close_pipe(通道,连接),
                    create_exchange(交换,服务器)
            抓住
                _R:_T ->                     
                    close_pipe(通道,连接),
                    create_exchange(交换,服务器)
            结尾;
        _ -> 错误
    结尾。

create_exchange(交换)->
    案例 create_pipe() 的
        错误->错误;
        #pipe{connection = Connection,channel = Channel} ->
            Exc = #'exchange.declare'{exchange = Exchange,durable = true},
            尝试 amqp_channel:call(Channel,Exc) 的
                #'exchange.declare_ok'{} -> 好的;
                _ -> close_pipe(通道,连接),错误
            抓住
                _:_ -> close_pipe(通道,连接),错误
            结尾
    结尾。

引导()->
    随机:种子(现在()),
    MD5 = erlang:md5(term_to_binary([random:uniform(7677771995517),{self(),time(),node(), now(), make_ref()}])),
    MD5List = 列表:nthtail(2, binary_to_list(MD5)),
    F = fun(N) -> f("~2.16.0B", [N]) 结束,
    L = [F(N) || N <- MD5List],
    列表:展平(L)。

f(S)-> f(S,[])。
f(S,Args) -> 列表:flatten(io_lib:format(S,Args))。


3. 队列管理器
对于大型系统,我们发现使用一个交换器(或几个交换器)更容易,但系统需要交互的队列数量也多。想象一下,在一个集群中,每个系统都有一个队列,当其他系统需要向该系统询问某些内容时,它们会在其中发送消息。此外,当一个系统收到另一个系统的请求时,它会通过将答案发送到该系统的队列来回复该请求。使用良好的消息格式协议,它可以实现系统识别、区分请求和回复、时间戳、系统 ping、远程过程调用等,您最终会在集群中拥有一个消息路由器。在最近的一个项目中,我们提出了一个远程过程协议,使用JSON作为消息格式。系统可以通过 AMQP 接口调用方法并将参数传递给远程系统。我不能在这里详细介绍,但只知道,这是Genuis我们现在依赖它的原因100%

无论如何,我们首先需要确保队列存在,如果存在,则检查它是否绑定到我们的交换器。我们走吧 !!

ensure_queue(Queue,Exchange)-> 
#pipe{connection = Connection,channel = Channel} = create_pipe(), Get = #'basic.get'{queue = Queue}, 当然 = 尝试 amqp_channel:call(Channel, Get) of #'basic.get_empty'{} -> queue_exists;
{#'basic.get_ok'{}, _Content} -> queue_exists; _ -> queue_missing 抓住 _EE:_EE2 -> queue_missing 结尾, close_pipe(通道,连接), 肯定的 queue_exists -> ensure_bound(Queue,Exchange); queue_missing -> create_queue_and_ensure_bound(队列,交换) 结尾。
ensure_bound(Queue,Exchange)->
#pipe{connection = Connection,channel = Channel} = create_pipe(), B1 = #'queue.bind'{queue = Queue,exchange = Exchange,routing_key = Queue}, 尝试 amqp_channel:call(Channel, B1) of #'queue.bind_ok'{} -> close_pipe(通道,连接), 好的; _OtherErr -> 错误 抓住 _ET:_EY -> 错误 结尾。

create_queue_and_ensure_bound(队列,交换)-> #pipe{connection = Connection,channel = Channel} = create_pipe(), TheQ = #'queue.declare'{queue = Queue,durable = true}, 尝试 amqp_channel:call(Channel,TheQ) 的 #'queue.declare_ok'{} -> 绑定 = #'queue.bind'{queue = Queue,exchange = Exchange,routing_key = Queue}, 试试 amqp_channel:call(Channel, Binding) 的 #'queue.bind_ok'{} -> close_pipe(通道,连接), 错误; _其他2 -> close_pipe(通道,连接), 错误 抓住 _ER11:_ER22 -> close_pipe(通道,连接), 错误 结尾; _其他-> close_pipe(通道,连接), 错误 抓住 _ER1:_ER2 -> close_pipe(Channel2,Connection2), 错误 结尾。


将容错添加到具体的 erlang 应用程序中,利用上面的一些代码,您将拥有一个非常紧凑的集群。RABBITMQ 的可用性非常高,特别是如果您在了解自己在做什么的同时使用它。大多数问题都是由网络引起的。如果您的应用程序失去与 Rabbitmq 服务器的连接,您的应用程序会做什么?冗余在这里出现。您可以拥有RABBITMQ所有集群节点都知道的各种服务器,并且您在所有 RABBITMQ 服务器上安装了所有相同的交换和队列,这样如果一台服务器丢失,其他服务器可以用于继续服务。

于 2013-05-25T15:12:00.077 回答
0

客户端应创建 1 个具有特定名称的队列,并确保将其设置为自动删除。这样,客户端将不会重新创建预先存在的队列。每个客户端都会创建自己的特定队列,因为每个客户端都会相应地命名其队列。

于 2013-05-25T07:54:00.820 回答