0
| WARN  | Transport Connection to: tcp://ip:port failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long | org.apache.activemq.broker.TransportConnection.Transport | AmqpInactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@7e641927[State = -1, empty queue]

我正在尝试从 java[publisher] 向活动 mq 发送消息,该消息正在活动 mq 中排队。订阅者代码是用 python[使用这个库 python-qpid-proton 0.31.0] 编写的。当没有请求发送到活动 mq 时,订阅者保持活动状态。但是,当请求发送到活动 mq 时,“活动持久主题订阅者”中的客户端在处理请求的一段时间后进入“离线持久主题订阅者”。当在 pycharm 中运行相同的代码时可以正常工作,但在 exe 上可以看到这个问题 python 接收器代码如下所示:-

P1 = Receiver(url, subscriptionname)
        from proton.reactor import Container
        Container(P1).run()


class Receiver(MessagingHandler):
super(Receiver, self).__init__()
    def __init__(self, url, subscriptionname):
        self.url = Url(url)
        self.stopping = False
        self.messages_actually_received = 0
        self.subscriptionName = subscriptionname

    def on_start(self, event):
        durable = DurableSubscription()
        event.container.container_id = "client"
        connection = event.container.connect(self.url)
        event.container.create_receiver(connection , self.url.path, name=self.subscriptionName, options=durable)
        

     def on_message(self, event):
        if self.stopping:
            return
        
        self.messages_actually_received += 1
        if event.message.body == 'message':
               pass
4

1 回答 1

0

在上述情况和您刚刚在 ActiveMQ 邮件列表上发布的消息之间,您说您的“巨大请求处理”实际上需要 >30 秒,这听起来很像您正在利用客户端容器线程在 on_message 中完成 > 30 秒的工作,你真的不应该这样做。它是一个单线程客户端,因此这样做意味着在您为其他事情独占线程期间,它根本无法处理连接。这意味着如果需要满足代理(默认 30 秒)不活动超时,它不能发送心跳帧,同时没有其他消息传递流量被视为活动。由于客户端在 30 秒内未向代理发送任何数据,因此连接将被视为已死并被终止,因为日志消息清楚地显示发生了。

您通常应该避免长时间阻塞容器线程,例如禁用接收器上的自动接受,并在需要时将长时间运行的消息处理卸载到工作线程,然后再传递回容器线程进行接受。或者,您将需要为代理配置更高的连接超时,这样它就不会杀死您明显的死连接。

请注意,如果您卸载消息处理,将任务传递回容器线程以例如接受/其他消息需要安全完成,因为连接再次是单线程的,并且只能直接从容器线程使用,直接从另一个使用它,而容器线程也可能在使用它是不安全的。

我相信 EventInjector 是这样的事情的预期路线,激发了与容器线程之外的连接工作:http: //qpid.apache.org/releases/qpid-proton-0.36.0/proton/python/docs/ proton.reactor.html#proton.reactor.EventInjector

它在这些示例中使用:http: //qpid.apache.org/releases/qpid-proton-0.36.0/proton/python/examples/db_recv.py.html http://qpid.apache.org/releases/qpid -proton-0.36.0/proton/python/examples/db_send.py.html http://qpid.apache.org/releases/qpid-proton-0.36.0/proton/python/examples/tx_recv_interactive.py.html

您还可以在容器上触发您自己的计划回调,因此更简单的替代方案可能只是计划(同时仍然使用容器线程,而不是您自己的)您自己的任务以定期运行,例如获取已完成的“已处理”消息从线程安全的工作队列中接受/其他。定时器示例:http: //qpid.apache.org/releases/qpid-proton-0.36.0/proton/python/examples/recurring_timer.py.html

于 2022-02-14T10:05:06.980 回答