我正在尝试创建一个能够从 hono 接收命令并回复它的原型设备。
我已经安装了 hono 1.10.0 并运行以下 python 代码
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
from hono import tenantId, deviceId, devicePassword, device_uri, biz_app_uri
correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'
print("Business application subscribing for the command reply--------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, "consumer@HONO", "verysecret"))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
# Give it some time to link
time.sleep(5)
print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', f'{deviceId}@{tenantId}', devicePassword))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
# Give it some time to link
time.sleep(2)
print("Business application sending a command------------------------------------------------------------")
msg = Message(
address=f'command/{tenantId}/{deviceId}',
reply_to=command_reply_to,
correlation_id=correlation_id,
content_type="text/plain",
subject="call",
body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], "consumer@HONO", "verysecret", address=f'command/{tenantId}')).run()
time.sleep(2)
print("Device sending a command response-----------------------------------------------------------------")
resp = Message(
address=command_reply_to,
correlation_id=correlation_id,
content_type="text/plain",
properties={
'status': 200,
'device_id': deviceId,
'tenant_id': tenantId
},
subject="call",
body="Hello Alice!"
)
Container(AmqpSender(device_uri, [resp], f'{deviceId}@{tenantId}', devicePassword)).run()
time.sleep(2)
print("Device stops listeing for commands----------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
print("Business application stops listening for command responsets---------------------------------------")
cr_container.stop()
cr_thread.join(timeout=5)
print("everything stopped")
我在发送 AMQP 1.0 消息的难度的帮助下并根据我对https://www.eclipse.org/hono/docs/api/command-and-control/和https://www.eclipse .org/hono/docs/user-guide/amqp-adapter/#sending-a-response-to-a-command。
目前我似乎并没有那么错,因为设备收到了命令,并且发送消息也没有显示任何错误。然而,在接收端什么都没有到达。澄清一下,AmqpReceiver 实现适用于我侦听遥测数据的场景。因此,如果实现应该是相同的(除了不同的地址),那么这应该不是问题。
我非常相信我对消息中的地址/reply_to 做错了,但我无法确认,因为 hono pod 中的日志没有告诉我任何事情:(
br 阿明
======更新================================
我当前运行的代码如下
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import AtLeastOnce
class Amqp(MessagingHandler):
def __init__(self, server, address, user, password, options=None):
super(Amqp, self).__init__()
self.server = server
self.address = address
self.user = user
self.password = password
self.options = options
self.connection = None
def create_connection(self, event):
self.connection = event.container.connect(
self.server,
sasl_enabled=True,
allowed_mechs="PLAIN",
allow_insecure_mechs=True,
user=self.user,
password=self.password
)
print("Connection established")
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_transport_error(self, event):
print("Transport Error")
def on_link_opened(self, event):
if event.link.is_sender:
print("Opened sender link")
if event.link.is_receiver:
print("Opened receiver link for source address '{0}'".format(event.receiver.source.address))
class AmqpReceiver(Amqp):
def __init__(self, server, address, user, password, options=None):
super(AmqpReceiver, self).__init__(server, address, user, password, options)
self.server = server
self.user = user
self.password = password
def on_start(self, event):
self.create_connection(event)
event.container.create_receiver(context=self.connection, source=self.address, options=self.options)
print("Receiver created")
def on_message(self, event):
print(f'Receiver [{self.address}] got message:')
print(f' {event.message.reply_to}')
print(f' {event.message.correlation_id}')
print(f' {event.message.properties}')
print(f' {event.message.subject}')
print(f' {event.message.body}')
#just for test purposes - the device sends imediatelly the reply if a reply_to is given
if event.message.reply_to is not None:
reply_to = event.message.reply_to.split('/')
tenant_id = reply_to[1]
device_id = reply_to[2]
resp = Message(
address=event.message.reply_to,
correlation_id=event.message.correlation_id,
content_type="text/plain",
properties={
'status': 200,
'tenant_id': tenant_id,
'device_id': device_id
},
body=f'Reply on {event.message.body}'
)
sender = event.container.create_sender(self.connection, None, options=AtLeastOnce())
sender.send(resp)
sender.close()
print("Reply send")
class AmqpSender(Amqp):
def __init__(self, server, messages, user, password, address=None, options=None):
super(AmqpSender, self).__init__(server, address, user, password, options)
self.messages = messages
def on_start(self, event):
self.create_connection(event)
event.container.create_sender(context=self.connection, target=self.address)
print("Sender created")
def on_sendable(self, event):
print("In Msg send")
for msg in self.messages:
event.sender.send(msg)
event.sender.close()
event.connection.close()
print("Sender & connection closed")
在测试脚本中我使用如下
from __future__ import print_function, unicode_literals
import threading
import time
from proton import Message
from proton.reactor import Container
from amqp import AmqpSender, AmqpReceiver
biz_app_uri = f'amqp://localhost:15672'
device_uri = f'amqp://localhost:5672'
tenantId = 'ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8'
deviceId = 'b932fb15-fdbd-4c12-9ed7-40aaa8763412'
biz_app_user = 'consumer@HONO'
biz_app_pw = 'verysecret'
device_user = f'{deviceId}@{tenantId}'
device_pw = 'my-secret-password'
correlation_id = 'myCorrelationId'
command_reply_to = f'command_response/{tenantId}/{correlation_id}'
print("Business application subscribing for command replies-------------------------------------------")
cr_container = Container(AmqpReceiver(biz_app_uri, command_reply_to, biz_app_user, biz_app_pw))
cr_thread = threading.Thread(target=lambda: cr_container.run(), daemon=True)
cr_thread.start()
time.sleep(2)
print("Device subscribing for commands-------------------------------------------------------------------")
c_container = Container(AmqpReceiver(device_uri, f'command', device_user, device_pw))
c_thread = threading.Thread(target=lambda: c_container.run(), daemon=True)
c_thread.start()
time.sleep(2)
print("Business application sending a command------------------------------------------------------------")
msg = Message(
address=f'command/{tenantId}/{deviceId}',
reply_to=command_reply_to,
correlation_id=correlation_id,
content_type="text/plain",
subject="call",
body="Hello Bob!"
)
#as in example https://stackoverflow.com/questions/64698271/difficulty-in-sending-amqp-1-0-message
Container(AmqpSender(biz_app_uri, [msg], biz_app_user, biz_app_pw, address=f'command/{tenantId}')).run()
time.sleep(10)
print("Device stops listeing for commands----------------------------------------------------------------")
c_container.stop()
c_thread.join(timeout=5)
#print("Business application stops listening ---------------------------------------")
#cr_container.stop()
#cr_thread.join(timeout=5)
#print("everything stopped")
如果我运行该代码示例,我会得到以下日志(见下文)并且代码被卡住,因为命令回复接收器保持打开状态。
登录hono调度路由器:
2021-11-14 19:08:29.420176 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:29.429734 +0000 SERVER (info) [C115] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36742
2021-11-14 19:08:29.447479 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:29.448213 +0000 ROUTER (info) [C115] Connection Opened: dir=in host=10.42.0.70:36742 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=a782f51c-9679-41fb-a682-8ea603ccf1ac props=
2021-11-14 19:08:29.448316 +0000 ROUTER_CORE (info) [C115][L123] Link attached: dir=out source={command_response/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8/myCorrelationId expire:sess} target={<none> expire:sess}
2021-11-14 19:08:33.423325 +0000 SERVER (info) enabling remote authentication service hono-1635540280-service-auth:5671
2021-11-14 19:08:33.430810 +0000 SERVER (info) [C116] Accepted connection to 0.0.0.0:5672 from 10.42.0.70:36868
2021-11-14 19:08:33.445574 +0000 AUTHSERVICE (info) authenticated as consumer@HONO
2021-11-14 19:08:33.446328 +0000 ROUTER (info) [C116] Connection Opened: dir=in host=10.42.0.70:36868 vhost= encrypted=no auth=PLAIN user=consumer@HONO container_id=92cb7173-2940-4330-a995-f26eccef0905 props=
2021-11-14 19:08:33.446388 +0000 ROUTER_CORE (info) [C116][L124] Link attached: dir=in source={<none> expire:sess} target={command/ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8 expire:sess}
2021-11-14 19:08:33.447762 +0000 ROUTER_CORE (info) [C116][L124] Link detached: del=1 presett=0 psdrop=0 acc=0 rej=0 rel=0 mod=0 delay1=0 delay10=0 blocked=no
登录 amqp 适配器
2021-11-14 19:08:31,511 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Connected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null
2021-11-14 19:19:29,875 INFO [org.ecl.hon.ada.mon.LoggingConnectionEventProducer] (vert.x-eventloop-thread-1) Disconnected - ID: 100b1859-e8a0-4bff-ad91-a48dce4babb5, Protocol Adapter: hono-amqp, Device: device [device-id: b932fb15-fdbd-4c12-9ed7-40aaa8763412, tenant-id: ea8b6601-6fb7-4fb5-a097-2d9a3cdea0d8], Data: null