我正在尝试使用此处所示的 AWS IoT Core 自定义授权方 ( https://docs.aws.amazon.com/iot/latest/developerguide/config-custom-auth.html )。我开发了 lambda 并能够使用 HTTP 端点 ( https://docs.aws.amazon.com/iot/latest/apireference/API_iotdata_Publish.html ) 进行发布,也能够通过 AWS CLI 通过运行来调用它aws iot test-invoke-authorizer --authorizer-name <name> --mqtt-context "username=***,password=***,clientId=***"
。但是,当我尝试使用任何其他客户端时都不起作用(我使用了第一个蚊子、MQTT Explorer 和 paho 客户端)。有了这些,我只能在客户端获得时间,而在服务器端什么也没有。我还尝试了 Java AWS IoT Core SDK,我在boolean sessionPresent = connected.get();
连接时遇到了 TLS 协商失败。作为端点,我使用IoT:Data-ATS
. 文档中有一条令人困惑的行(https://docs.aws.amazon.com/iot/latest/developerguide/protocols.html)在表格中显示应在端口 443 中使用带有 MQTT 的自定义身份验证,并带有脚注。脚注说端口 443 中的自定义身份验证不起作用。这没有任何意义。
任何想法或帮助?
Mosquitto_sub 截图:
mosquitto_sub -d -h **** -p 443 -u username?x-amz-customauthorizer-name=*** -P test -t test --cafile /etc/ssl/certs/Amazon_Root_CA_1.pem -i ***
帕霍斩钉截铁:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.nio.charset.StandardCharsets;
public class Paho implements MqttCallback {
public static void execute(){
try {
MqttClient client = new MqttClient("ssl://***:443","***",new MemoryPersistence());
client.connect();
client.setCallback(new Paho());
while (true){
client.publish("test","test".getBytes(StandardCharsets.UTF_8),0, false);
Thread.sleep(500);
}
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
AWS 客户端截图:
public class AWS implements MqttClientConnectionEvents {
public static void execute(){
try {
try(EventLoopGroup eventLoopGroup = new EventLoopGroup(1);
HostResolver resolver = new HostResolver(eventLoopGroup);
ClientBootstrap clientBootstrap = new ClientBootstrap(eventLoopGroup, resolver);
AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newDefaultBuilder()) {
builder.withBootstrap(clientBootstrap)
.withConnectionEventCallbacks(new AWS())
.withClientId("****")
.withEndpoint("***")
.withCleanSession(true);
try(MqttClientConnection connection = builder.build()) {
CompletableFuture<Boolean> connected = connection.connect();
try {
boolean sessionPresent = connected.get();
System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!");
} catch (Exception ex) {
ex.printStackTrace();
throw new RuntimeException("Exception occurred during connect", ex);
}
CountDownLatch countDownLatch = new CountDownLatch(10);
CompletableFuture<Integer> subscribed = connection.subscribe("test", QualityOfService.AT_LEAST_ONCE, (message) -> {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("MESSAGE: " + payload);
countDownLatch.countDown();
});
subscribed.get();
int count = 0;
while (count++ < 10) {
CompletableFuture<Integer> published = connection.publish(new MqttMessage("test", "test".getBytes(), QualityOfService.AT_LEAST_ONCE, false));
published.get();
Thread.sleep(1000);
}
countDownLatch.await();
CompletableFuture<Void> disconnected = connection.disconnect();
disconnected.get();
}
} catch (Exception ex) {
System.out.println("Exception encountered: " + ex.toString());
}
System.out.println("Complete!");
}
@Override
public void onConnectionInterrupted(int errorCode) {
}
@Override
public void onConnectionResumed(boolean sessionPresent) {
}
}