我们尝试使用 smallrye 响应式消息传递发布和订阅 MQTT 协议。我们设法通过以下简单代码将消息实际发布到特定主题/频道
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
@ApplicationScoped
public class Publish {
@Outgoing("pao")
public Multi<String> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> "A Message in here");
}
}
我们想要做的是,只要我们想以generate()
某种方式使用动态主题调用该方法,用户将在其中定义它。那是我们的问题,但后来我们从github的那个repo中找到了这些类。包裹名字io.smallrye.reactive.messaging.mqtt
例如,我们发现有一个类说它对 MQTT 代理(Mosquitto 服务器启动)进行发布调用。
在该语句中,SendingMqttMessage<String> message = new SendingMqttMessage<String>("myTopic","A message in here",0,false);
我们在“SendingMqttMessage(java.lang.String, java.lang.String, io.netty.handler.codec.mqtt.MqttQoS, boolean)”下面的红色下划线SendingMqttMessage<String>
在“io. smallrye.reactive.messaging.mqtt.SendingMqttMessage'。无法从外部包访问
更新(发布完成)
最后向 mqtt 代理(一个 mosquitto 服务器)发出了一个发布请求,所有这些都使用用户配置的动态主题。正如我们发现的,以前的 ClassSendingMqttMessage
根本不应该被使用。我们发现我们还需要和发射器来实际发出带有动态主题的发布请求。
@Inject
@Channel("panatha")
Emitter<String> emitter;
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response createUser(Device device) {
System.out.println("New Publish request: message->"+device.getMessage()+" & topic->"+device.getTopic());
emitter.send(MqttMessage.of(device.getTopic(), device.getMessage()));
return Response.ok().status(Response.Status.CREATED).build();
}
现在我们需要了解如何动态订阅主题。