我目前正在尝试通过 AMPQ 适配器向 Hono 沙箱发送遥测消息。尽管我接管了在Hono Noth 桥示例中看到的部分代码示例(这也适用于南桥),但我似乎对 SASL 有点挣扎。
这是我的代码
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
tenantId = 'xxxx'
deviceId = 'yyyyy'
devicePassword = 'my-secret-password'
class AmqpMessageSender(MessagingHandler):
def __init__(self, server, address):
super(AmqpMessageSender, self).__init__()
self.server = server
self.address = address
def on_start(self, event):
conn = event.container.connect(
self.server,
sasl_enabled=True,
allowed_mechs="PLAIN",
allow_insecure_mechs=True,
user=f'{deviceId}@{tenantId}',
password=devicePassword
)
event.container.create_sender(conn, self.address)
def on_sendable(self, event):
msg = Message(
address=f'{self.address}/{deviceId}',
content_type='application/json',
body={"temp": 5, "transport": "amqp"}
)
event.sender.send(self.msg)
event.sender.close()
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_transport_error(self, event):
print("Transport Error")
Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5671', f'telemetry/{tenantId}')).run()
如果我运行代码,我会收到上下文条件的传输错误
'预期的 SASL 协议头:未找到协议头(连接中止)'
我还尝试使用端口 5672 导致链接错误并使用端口 15672(实际上是北桥端口) - 令我惊讶的是,这并没有导致 SASL 错误,而是让我得到了预期的“未授权”错误(如设备不允许通过北桥连接)
======= 更新=======
再次感谢您的时间。
关于a),因为一旦将代码作为问题的答案,这里的评论就相当有限。我用来模拟设备的代码如下
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
tenantId = 'xxx'
deviceId = 'yyy'
devicePassword = 'my-secret-password'
class AmqpMessageSender(MessagingHandler):
def __init__(self, server):
super(AmqpMessageSender, self).__init__()
self.server = server
def on_start(self, event):
print("In start")
conn = event.container.connect(
self.server,
sasl_enabled=True,
allowed_mechs="PLAIN",
allow_insecure_mechs=True,
user=f'{deviceId}@{tenantId}',
password=devicePassword
)
print("connection established")
event.container.create_sender(context=conn, target=None)
print("sender created")
def on_sendable(self, event):
print("In Msg send")
event.sender.send(Message(
address=f'telemetry',
properties={
'to': 'telemetry',
'content-type': 'application/json'
},
content_type='application/json',
body={"temp": 5, "transport": "amqp"}
))
event.sender.close()
event.connection.close()
print("Sender & connection closed")
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_transport_error(self, event):
print("Transport Error")
Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()
为了模拟服务器,我不使用 java 客户端,而是使用 python 快速入门示例中的示例代码。我还有一个客户端类,它像 python 快速启动示例中那样执行 http 调用,服务器类会做出反应并打印消息 - 所以根据我的理解,下面概述的服务器实现应该没问题:
from __future__ import print_function, unicode_literals
import threading
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
amqpNetworkIp = "hono.eclipseprojects.io"
tenantId = 'xxx'
class AmqpReceiver(MessagingHandler):
def __init__(self, server, address, name):
super(AmqpReceiver, self).__init__()
self.server = server
self.address = address
self._name = name
def on_start(self, event):
conn = event.container.connect(self.server, user="consumer@HONO", password="verysecret")
event.container.create_receiver(conn, self.address)
def on_connection_error(self, event):
print("Connection Error")
def on_link_error(self, event):
print("Link Error")
def on_message(self, event):
print(self._name)
print("Got a message:")
print(event.message.body)
class CentralServer:
def listen_telemetry(self, name):
uri = f'amqp://{amqpNetworkIp}:15672'
address = f'telemetry/{tenantId}'
self.container = Container(AmqpReceiver(uri, address, name))
print("Starting (northbound) AMQP Connection...")
self.thread = threading.Thread(target=lambda: self.container.run(), daemon=True)
self.thread.start()
time.sleep(2)
def stop(self):
# Stop container
print("Stopping (northbound) AMQP Connection...")
self.container.stop()
self.thread.join(timeout=5)
CentralServer().listen_telemetry('cs1')
经过一天的尝试,我找不到我做错了什么,我真的希望你能看到我错过了什么:)
br 阿明