3

我们尝试使用 smallrye 响应式消息传递发布和订阅 MQTT 协议。我们设法通过以下简单代码将消息实际发布到特定主题/频道

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;

@ApplicationScoped
public class Publish {
    
    @Outgoing("pao")
    public Multi<String> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> "A Message in here");
    }
}

我们想要做的是,只要我们想以generate()某种方式使用动态主题调用该方法,用户将在其中定义它。那是我们的问题,但后来我们从github的那个repo中找到了这些类。包裹名字io.smallrye.reactive.messaging.mqtt

例如,我们发现有一个类说它对 MQTT 代理(Mosquitto 服务器启动)进行发布调用。

在该语句中,SendingMqttMessage<String> message = new SendingMqttMessage<String>("myTopic","A message in here",0,false); 我们在“SendingMqttMessage(java.lang.String, java.lang.String, io.netty.handler.codec.mqtt.MqttQoS, boolean)”下面的红色下划线SendingMqttMessage<String>“io. smallrye.reactive.messaging.mqtt.SendingMqttMessage'。无法从外部包访问

更新(发布完成) 最后向 mqtt 代理(一个 mosquitto 服务器)发出了一个发布请求,所有这些都使用用户配置的动态主题。正如我们发现的,以前的 ClassSendingMqttMessage根本不应该被使用。我们发现我们还需要和发射器来实际发出带有动态主题的发布请求。

    @Inject
    @Channel("panatha")
    Emitter<String> emitter;

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createUser(Device device) {
        System.out.println("New Publish request: message->"+device.getMessage()+" & topic->"+device.getTopic());
        emitter.send(MqttMessage.of(device.getTopic(), device.getMessage()));
        return Response.ok().status(Response.Status.CREATED).build();
    }

现在我们需要了解如何动态订阅主题。

4

1 回答 1

0

首先将我们设置到同一页面:
响应式消息传递不适用于主题,但适用于渠道。这一点很重要,因为您可以专门读取写入通道。所以如果要两者都提供,需要配置两个channel指向同一个topic,一个传入,一个传出

要回答您的问题:

您在 Emitters 方面取得了不错的开端,但您仍然缺乏您想要的动态特性。在您的示例中,您通过 CDI 获得了该发射器。
现在这就是我们所需要的,使其动态化,因为我们不能在运行时使用 CDI 动态注入 Bean,如下所示:

发送消息

private Emitter<byte[]> dynamicEmitter(String topic){
        return CDI.current().select(new TypeLiteral<Emitter<byte[]>>() {}, new ChannelAnnotation(topic)).get();
    }

另请注意,我正在创建一个字节 [] 类型的发射器,因为根据其文档,这是 smallrye-mqtt 连接器(版本 3.4.0)的唯一当前支持类型。

接收消息

要从反应式消息传递通道读取消息,您可以使用 Emitter 的对应物,即Publisher
它可以用于模拟:

private Publisher<byte[]> dynamicReceiver(String topic){
        return CDI.current().select(new TypeLiteral<Publisher<byte[]>>() {}, new ChannelAnnotation(topic)).get();
    }

然后,您可以以任何您喜欢的方式处理这些日期。作为演示,它将它挂在一个简单的 REST 端点上

@GET
    @Produces(MediaType.SERVER_SENT_EVENTS) 
    public Multi<String> stream(@QueryParam("topic") String topic) {
        return Multi.createFrom().publisher(dynamicReceiver(topic)).onItem().transform(String::new); 
    }
    
    @GET
    @Path("/publish")
    public boolean publish(@QueryParam("msg") String msg, @QueryParam("topic") String topic) {
        dynamicEmitter(topic).send(msg.getBytes());
        return true; 
    }

还有一件事

在创建此解决方案时,我遇到了一些您应该知道的陷阱:

  1. Quarkus 删除任何“未使用”的 CDI-Bean。因此,如果您想动态注入它们,则需要排除它们,或关闭该功能。
  2. 必须配置以这种方式注入的所有通道。否则注入将失败。
  3. 出于某种原因,(即使完全禁用删除)我无法动态注入发射器,除非它们曾经注入到其他地方。
于 2022-01-10T12:55:03.750 回答