0

我正在尝试从 Eclipse Paho MQTT 客户端订阅一个简单的主题“foo”。

该代理由 Eclipse Kapua 管理,可通过 tcp://localhost:1883 使用凭据“kapua-broker”和“kapua-password”访问。

我以这种方式发布一个值:

send(new Payload.Builder().put("testKey","testVal"),"foo");

这基本上发送了一个主题为“foo”的地图(“testKey”,“testVal”)。要订阅这个主题,我有以下代码(host="localhost", port=1883):

    String topic = "foo";
    String broker = "tcp://"+host+":"+Integer.toString(port);
    String clientId = "supply-chain-control-simulation-listener";
    String username = "kapua-broker";
    String password = "kapua-password";

    try {
        MqttClient client = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setUserName(username);
        connOpts.setPassword(password.toCharArray());
        connOpts.setCleanSession(true);
        logger.info("Connecting to broker: "+broker);
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                logger.info("Subscriptions stopped");
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                logger.info(s);
                logger.info(mqttMessage.getPayload().toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });
        client.connect(connOpts);
        if (client.isConnected())
            logger.info("Connected");
        else
            logger.error(client.getDebug().toString());
        client.subscribe(topic,2);
    } catch(MqttException me) {
        logger.error("reason "+me.getReasonCode());
        logger.error("msg "+me.getMessage());
        logger.error("loc "+me.getLocalizedMessage());
        logger.error("cause "+me.getCause());
        logger.error("excep "+me);
        me.printStackTrace();
    }

连接有效,但订阅输出此错误:

15:40:03.240 [ActiveMQ NIO Worker 0] WARN oekbcpKapuaSecurityBrokerFilter - 用户 1:kapua-broker (supply-chain-control-simulation-listener - tcp://172.17.0.1:40888 - conn id 1734706196170193882) 无权读取来自:topic://VirtualTopic.foo

4

2 回答 2

2

在 Kapua 中,您可以根据您的用户权限发布/订阅。

如果您的用户只有broker:connect权限,您只能在主题上发布/订阅:

{account-name}/{connectionClientId}/{semanticTopic}

在您的特定情况下,您应该发布/订阅主题:

kapus-sys/supply-chain-control-simulation-listener/foo

kapua-sys是用户kapua-broker所属的账户名,而是supply-chain-control-simulation-listener用于创建连接的clientId。

请注意,用于连接的用户名和帐户名在 Kapua 中是两个不同的东西。一个帐户有多个用户。

于 2017-07-25T07:26:37.840 回答
1

不要subscribe在调用之后立即调用connect,而是将该调用移到connectComplete回调中:

IMqttAsyncClient client = new MqttAsyncClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
connOpts.setCleanSession(true);
logger.info("Connecting to broker: "+broker);
client.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String brokerAddress) {
        logger.info("Connected");
        client.subscribe(topic,2);
    }
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("Subscriptions stopped");
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        logger.info(s);
        logger.info(mqttMessage.getPayload().toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
});
client.connect(connOpts);

也就是说,您的错误可能来自您正在使用的 MQTT 代理,您需要对其进行配置以允许访问该主题。

于 2017-07-14T07:06:09.127 回答