
I am trying to build a prototype device that can receive a command from Hono and reply to it.

I have installed Hono 1.10.0 and run the following Python code:

import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
from hono import tenantId, deviceId, devicePassword, device_uri, biz_app_uri


correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'

print("Business application subscribing for the command reply--------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, "consumer@HONO", "verysecret"))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
# Give it some time to link
time.sleep(5)


print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', f'{deviceId}@{tenantId}', devicePassword))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
# Give it some time to link
time.sleep(2)


print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], "consumer@HONO", "verysecret", address=f'command/{tenantId}')).run()
time.sleep(2)


print("Device sending a command response-----------------------------------------------------------------")
resp = Message(
    address=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    properties={
        'status': 200,
        'device_id': deviceId,
        'tenant_id': tenantId
    },
    subject="call",
    body="Hello Alice!"
)
Container(AmqpSender(device_uri, [resp], f'{deviceId}@{tenantId}', devicePassword)).run()
time.sleep(2)


print("Device stops listening for commands---------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
print("Business application stops listening for command responses----------------------------------------")
cr_container.stop()
cr_thread.join(timeout=5)
print("everything stopped")

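I wrote this with the help of "Difficulty in sending AMQP 1.0 message" and based on my understanding of these two pages: https://www.eclipse.org/hono/docs/api/command-and-control/ and https://www.eclipse.org/hono/docs/user-guide/amqp-adapter/#sending-a-response-to-a-command

So far I don't seem to be far off, because the device receives the command, and sending the messages does not report any errors either. However, nothing arrives on the receiving side. To clarify, the AmqpReceiver implementation works in a scenario where I listen for telemetry data. So if the implementation should be the same (apart from a different address), that should not be the problem.

I strongly suspect that I am doing something wrong with the address/reply_to in the messages, but I cannot confirm it because the logs in the Hono pods do not tell me anything :(

BR, 阿明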

======UPDATE================================

The code I am currently running is as follows:

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import AtLeastOnce


class Amqp(MessagingHandler):
    def __init__(self, server, address, user, password, options=None):
        super(Amqp, self).__init__()
        self.server = server
        self.address = address
        self.user = user
        self.password = password
        self.options = options
        self.connection = None

    def create_connection(self, event):
        self.connection = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=self.user,
            password=self.password
        )
        print("Connection established")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")

    def on_link_opened(self, event):
        if event.link.is_sender:
            print("Opened sender link")
        if event.link.is_receiver:
            print("Opened receiver link for source address '{0}'".format(event.receiver.source.address))


class AmqpReceiver(Amqp):
    def __init__(self, server, address, user, password, options=None):
        super(AmqpReceiver, self).__init__(server, address, user, password, options)
        self.server = server
        self.user = user
        self.password = password

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_receiver(context=self.connection, source=self.address, options=self.options)
        print("Receiver created")

    def on_message(self, event):
        print(f'Receiver [{self.address}] got message:')
        print(f'  {event.message.reply_to}')
        print(f'  {event.message.correlation_id}')
        print(f'  {event.message.properties}')
        print(f'  {event.message.subject}')
        print(f'  {event.message.body}')
        # Just for test purposes: the device immediately sends the reply if a reply_to is given
        if event.message.reply_to is not None:
            reply_to = event.message.reply_to.split('/')
            tenant_id = reply_to[1]
            device_id = reply_to[2]
            resp = Message(
                address=event.message.reply_to,
                correlation_id=event.message.correlation_id,
                content_type="text/plain",
                properties={
                    'status': 200,
                    'tenant_id': tenant_id,
                    'device_id': device_id
                },
                body=f'Reply on {event.message.body}'
            )
            sender = event.container.create_sender(self.connection, None, options=AtLeastOnce())
            sender.send(resp)
            sender.close()
            print("Reply sent")


class AmqpSender(Amqp):
    def __init__(self, server, messages, user, password, address=None, options=None):
        super(AmqpSender, self).__init__(server, address, user, password, options)
        self.messages = messages

    def on_start(self, event):
        self.create_connection(event)
        event.container.create_sender(context=self.connection, target=self.address)
        print("Sender created")

    def on_sendable(self, event):
        print("In Msg send")
        for msg in self.messages:
            event.sender.send(msg)
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

In the test script I use it as follows:

from __future__ import print_function, unicode_literals
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver


biz_app_uri = f'amqp://localhost:15672'
device_uri = f'amqp://localhost:5672'
tenantId = 'ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8'
deviceId = 'b932fb15-fdbd-4c12-9ed7-40aaa8763412'

biz_app_user = 'consumer@HONO'
biz_app_pw = 'verysecret'
device_user = f'{deviceId}@{tenantId}'
device_pw = 'my-secret-password'

correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'


print("Business application subscribing for command replies-------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, biz_app_user, biz_app_pw))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
time.sleep(2)

print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', device_user, device_pw))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
time.sleep(2)

print("Business application sending a command------------------------------------------------------------")
msg = Message(
    address=f'command/{tenantId}/{deviceId}',
    reply_to=command_reply_to,
    correlation_id=correlation_id,
    content_type="text/plain",
    subject="call",
    body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], biz_app_user, biz_app_pw, address=f'command/{tenantId}')).run()

time.sleep(10)
print("Device stops listening for commands---------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
#print("Business application stops listening ---------------------------------------")
#cr_container.stop()
#cr_thread.join(timeout=5)
#print("everything stopped")

When I run this code sample I get the logs below, and the code hangs because the command-reply receiver stays open.

Log of the Hono dispatch router:

2021-11-14 19:08:29.420176 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:29.429734 +0000 SERVER (info) [C115] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36742
2021-11-14 19:08:29.447479 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:29.448213 +0000 ROUTER (info) [C115] Connection Opened: dir=in host=10.42.0.70:36742 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=a782f51c-9679-41fb-a682-8ea603ccf1ac props=
2021-11-14 19:08:29.448316 +0000 ROUTER_CORE (info) [C115][L123] Link attached: dir=out source={command_response/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8/myCorrelationId expire:sess} target={<none> expire:sess}
2021-11-14 19:08:33.423325 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:33.430810 +0000 SERVER (info) [C116] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36868
2021-11-14 19:08:33.445574 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:33.446328 +0000 ROUTER (info) [C116] Connection Opened: dir=in host=10.42.0.70:36868 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=92cb7173-2940-4330-a995-f26eccef0905 props=
2021-11-14 19:08:33.446388 +0000 ROUTER_CORE (info) [C116][L124] Link attached: dir=in source={<none> expire:sess} target={command/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8 expire:sess}
2021-11-14 19:08:33.447762 +0000 ROUTER_CORE (info) [C116][L124] Link detached: del=1 presett=0 psdrop=0 acc=0 rej=0 rel=0 mod=0 delay1=0 delay10=0 blocked=no

Log of the AMQP adapter:

2021-11-14 19:08:31,511 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Connected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null
2021-11-14 19:19:29,875 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Disconnected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null

1 Answer


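The command response sent by your device seems to contain the wrong address. As pointed out in the AMQP Adapter User Guide, the response's address property must be set to the value of the command's reply-to property. This value usually differs from the reply-to value that your application set in the command message, because the protocol adapter needs to encode some additional information into the reply-to address in order to be able to determine the correct device ID when forwarding the command response downstream.

So in your code you need to inspect the command message on the device side and use its reply-to value as the address of the command response.

Apart from that, the AMQP adapter expects the status property in the command response to be of AMQP 1.0 type int (a 32-bit signed integer). With your code, however, the property value is encoded as an AMQP 1.0 long (a 64-bit signed integer) by default. To encode it correctly, you need to import int32 from proton._data and then set the property value to int32(200). The adapter then accepts the command response and forwards it downstream.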
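
A minimal sketch of both points, not a definitive implementation: the helper name command_response_fields is hypothetical, the reply-to string is a made-up placeholder (the real value is whatever the adapter delivers to the device), and the fallback to plain int only exists so the snippet can run where python-qpid-proton is not installed.

```python
from types import SimpleNamespace

try:
    # int32 is the wrapper type named above (part of python-qpid-proton).
    from proton._data import int32
except ImportError:
    # Assumption: proton is not installed here. Plain int keeps the sketch
    # runnable, but it would NOT give the 32-bit AMQP encoding.
    int32 = int


def command_response_fields(command, status=200, body=None):
    """Build keyword arguments for proton.Message(...) from a received command."""
    return {
        # Use the reply-to exactly as it arrived at the device, not the
        # value the business application originally put into the command.
        "address": command.reply_to,
        "correlation_id": command.correlation_id,
        "content_type": "text/plain",
        # Wrap the status so it is encoded as an AMQP 1.0 int (32 bit).
        "properties": {"status": int32(status)},
        "body": body,
    }


# Stand-in for event.message inside on_message(); the reply-to is made up.
command = SimpleNamespace(
    reply_to="command_response/my-tenant/hypothetical-adapter-suffix",
    correlation_id="myCorrelationId",
)
fields = command_response_fields(command, body="Hello Alice!")
print(fields["address"])
```

Inside the on_message handler from the question, the result could be passed on as Message(**command_response_fields(event.message, body=...)) and sent over the existing device connection.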

answered 2021-11-03T07:50:18.083