-1

我目前正在尝试通过 AMPQ 适配器向 Hono 沙箱发送遥测消息。尽管我接管了在Hono Noth 桥示例中看到的部分代码示例(这也适用于南桥),但我似乎对 SASL 有点挣扎。

这是我的代码

from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

tenantId = 'xxxx'
deviceId = 'yyyyy'
devicePassword = 'my-secret-password'


class AmqpMessageSender(MessagingHandler):
    def __init__(self, server, address):
        super(AmqpMessageSender, self).__init__()
        self.server = server
        self.address = address

    def on_start(self, event):
        conn = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword
        )    
        event.container.create_sender(conn, self.address)

    def on_sendable(self, event):   
        msg = Message(
            address=f'{self.address}/{deviceId}',
            content_type='application/json',
            body={"temp": 5, "transport": "amqp"}
        )
        event.sender.send(self.msg)
        event.sender.close()

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")


Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5671', f'telemetry/{tenantId}')).run()

如果我运行代码,我会收到上下文条件的传输错误

'预期的 SASL 协议头:未找到协议头(连接中止)'

我还尝试使用端口 5672 导致链接错误并使用端口 15672(实际上是北桥端口) - 令我惊讶的是,这并没有导致 SASL 错误,而是让我得到了预期的“未授权”错误(如设备不允许通过北桥连接)

======= 更新=======

再次感谢您的时间。

关于a),因为一旦将代码作为问题的答案,这里的评论就相当有限。我用来模拟设备的代码如下

from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

tenantId = 'xxx'
deviceId = 'yyy'
devicePassword = 'my-secret-password'


class AmqpMessageSender(MessagingHandler):
    def __init__(self, server):
        super(AmqpMessageSender, self).__init__()
        self.server = server

    def on_start(self, event):
        print("In start")
        conn = event.container.connect(
            self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword
        )
        print("connection established")
        event.container.create_sender(context=conn, target=None)
        print("sender created")

    def on_sendable(self, event):
        print("In Msg send")
        event.sender.send(Message(
            address=f'telemetry',
            properties={
                'to': 'telemetry',
                'content-type': 'application/json'
            },
            content_type='application/json',
            body={"temp": 5, "transport": "amqp"}
        )) 
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")

Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()

为了模拟服务器,我不使用 java 客户端,而是使用 python 快速入门示例中的示例代码。我还有一个客户端类,它像 python 快速启动示例中那样执行 http 调用,服务器类会做出反应并打印消息 - 所以根据我的理解,下面概述的服务器实现应该没问题:

from __future__ import print_function, unicode_literals
import threading
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container

amqpNetworkIp = "hono.eclipseprojects.io"
tenantId = 'xxx'


class AmqpReceiver(MessagingHandler):
    def __init__(self, server, address, name):
        super(AmqpReceiver, self).__init__()
        self.server = server
        self.address = address
        self._name = name

    def on_start(self, event):
        conn = event.container.connect(self.server, user="consumer@HONO", password="verysecret")
        event.container.create_receiver(conn, self.address)

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_message(self, event):
        print(self._name)
        print("Got a message:")
        print(event.message.body)


class CentralServer:
    def listen_telemetry(self, name):
        uri = f'amqp://{amqpNetworkIp}:15672'
        address = f'telemetry/{tenantId}'
        self.container = Container(AmqpReceiver(uri, address, name))

        print("Starting (northbound) AMQP Connection...")
        self.thread = threading.Thread(target=lambda: self.container.run(), daemon=True)
        self.thread.start()
        time.sleep(2)

    def stop(self):
        # Stop container
        print("Stopping (northbound) AMQP Connection...")
        self.container.stop()
        self.thread.join(timeout=5)


CentralServer().listen_telemetry('cs1')

经过一天的尝试,我找不到我做错了什么,我真的希望你能看到我错过了什么:)

br 阿明

4

1 回答 1

0

AMQP 协议适配器要求设备通过匿名终端发送消息

在您的代码中,这意味着on_start需要将方法更改为包含event.container.create_sender(context=conn, target=None).

无论如何,AMQP 适配器的非 TLS 端口是 5672,所以你应该使用amqp://hono.eclipseprojects.io:5672它作为服务器地址。构造函数 ( ) 的第二个参数telemetry无关紧要,可以删除。

还要确保你有一个为你的租户运行的消费者。否则,发件人将不会因实际发送消息而获得任何信用......

2021 年 10 月 21 日已编辑

这段代码对我有用......

class AmqpMessageSender(MessagingHandler):
    def __init__(self, server):
        super(AmqpMessageSender, self).__init__()
        self.server = server

    def on_start(self, event):
        print("In start")
        conn = event.container.connect(
            url=self.server,
            sasl_enabled=True,
            allowed_mechs="PLAIN",
            allow_insecure_mechs=True,
            user=f'{deviceId}@{tenantId}',
            password=devicePassword
        )
        print("connection established")
        event.container.create_sender(context=conn, target=None)
        print("sender created")

    def on_sendable(self, event):
        print("In Msg send")
        event.sender.send(Message(
            address=f'telemetry',
            content_type='application/json',
            body="{\"temp\": 5, \"transport\": \"amqp\"}"
        )) 
        event.sender.close()
        event.connection.close()
        print("Sender & connection closed")

    def on_connection_error(self, event):
        print("Connection Error")

    def on_link_error(self, event):
        print("Link Error")

    def on_transport_error(self, event):
        print("Transport Error")
        print(event)

Container(AmqpMessageSender(f'amqp://hono.eclipseprojects.io:5672')).run()
于 2021-10-18T07:41:31.133 回答