问题标签 [spring-integration-mqtt]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
227 浏览

activemq - 一个 MQTT 订阅者可以同时接收同一主题的多条消息吗?

我正在尝试在我的 Spring Boot 应用程序中创建订阅者。我的目标是发布者将向一个主题发送多条消息,我必须获取这些消息并处理它们。我注意到 Paho 和 Apache ActiveMq 的“handleMessage”一次将处理 1 条消息。可以并发吗??

我试过以下

  1. 用 ActiveMq 替换 Paho
  2. 在我的 listenercontainer 中提供并发
  3. 在我的订阅 URL 中提供预取

请让我知道是否有任何方法可以让我的 MQTT 订阅者同时接收多条消息。

谢谢你

0 投票
2 回答
599 浏览

google-cloud-iot - 谷歌云物联网核心设备注册 - 多个设备可以使用相同的公钥/私钥

我正在使用多个传感器设备进行内部项目。我不希望用户单独注册每台设备。从某种意义上说,我想为注册到注册表的所有设备使用相同的公钥/私钥对,但能够通过 mqtt/http 将设备信息传递到 pubsub,并使用名称或 ID 等唯一设备信息。有可能实现吗?

我假设我是否使用相同的密钥。我将所有设备注册为一个,但是否可以将设备信息作为正在发布的消息的一部分发送。这样做会以任何方式(如 API)抑制谷歌内置功能的使用。

云技术的新手,任何想法/建议都会有所帮助。

0 投票
1 回答
724 浏览

spring-integration - 使用 spring 集成网关向不同主题发送消息

我正在尝试使用 spring 集成将 mqtt 消息发送到代理,并且我正在尝试使用网关接口。

我的问题是:如果我想使用网关处理程序将消息发送到不同的主题,我将如何做到这一点而不必为每个主题创建一个适配器?

谢谢。

希望我清楚地提出了我的问题并且代码格式正确。

0 投票
1 回答
51 浏览

spring-integration - 暂停和恢复 MQTT 入站适配器

我们有:一个转换器,它从 mqtt 主题中获取消息,解析、处理这些消息,并在特定验证后将它们发送到其他消息传递系统。如果目标系统无法访问,我们创建一个特定事件并通过注入的org.springframework.context.ApplicationEventPublisher实例发送它。

我们希望:在目标消息传递系统不可用的情况下,能够暂停(然后恢复)入站 MQTT 适配器的使用。

我们确实有对这些适配器的引用集合,并且希望编写实现ApplicationListener并基于事件暂停或恢复消费的“Toggler”。

作为MessageProducerSupport的任何实例,MqttPahoMessageDrivenChannelAdapter 具有受保护的doStart()doStop()方法。是否应该尝试将它们用于暂停/恢复目的?

0 投票
0 回答
419 浏览

java - 在 Spring Boot 集成中的 MqttPahoMessageHandler 和 MqttPahoMessageDrivenChannelAdapter 之间共享相同的 MQTT 客户端实例

假设我有一个 Spring Boot 应用程序需要实例化多个 MQTT 客户端 (# ~10^4) 以将物理设备虚拟化为代理 (Mosquitto)。他们每个人都有一个唯一的客户端 ID,以便从/向专用主题获取/发送消息。

我正在尝试使用 Spring Boot 中的 MQTT 入站/出站集成流(spring-integration-mqtt-5.1.6.RELEASE)来实现这一点,但是每个流(MqttPahoMessageDrivenChannelAdapter/MqttPahoMessageHandler)都会实例化自己的客户端来管理传入/传出消息。

我的担忧是:

  • 根据定义,由于 Spring Boot MQTT 集成流的结构,我不能只使用一个客户端进行输入/输出流。
  • 它可能效率不高,因为我需要两个不同的客户端,因此资源加倍。
  • 我不能为每个“逻辑”通道使用相同的客户端 ID,因为我正在使用两个不同的 MQTT 客户端,并且如您所知,如果您尝试使用相同的客户端 ID 连接到同一个代理,那么最旧的一个会断开连接。因此,为了发送和接收消息,我需要为每个逻辑输入/输出流提供不同的配置(和服务器端授权)。
  • Spring Boot 应用程序向服务器生成 2 次连接,因此服务器必须调整大小以接收 2 次传入连接,这可能是一个问题。

我的总体想法是重新实现MqttPahoMessageDrivenChannelAdapterMqttPahoMessageHandler让它们为相同的客户端 ID 共享相同的 MqttClient 实例。为此,我必须在 Adapter 和 Handler 中重新实现连接/断开/订阅方法,以使它们变得健壮。此外,我必须使用专用的客户端工厂才能为相同的客户端 ID 返回相同的客户端(这是最简单的部分)。

现在的问题是:这种方法正确吗?我是在强奸 Spring Boot 试图这样做吗?

我发现的唯一选择是直接实现原始 MqttClients,管理它们的整个生命周期。这是可行的,但我正在重新发明轮子,我没有利用 Spring Boot 的消息传递设计模式。

0 投票
1 回答
224 浏览

spring-integration - 由于令牌现已过期,处理与 MqttPahoMessageHandler 重新连接的最佳方法是什么?

我使用以下示例作为我自己的代码发布到 MQTT 服务器的基础:https ://github.com/spring-projects/spring-integration-samples/blob/master/basic/mqtt/src/main/ java/org/springframework/integration/samples/mqtt/Application.java

我有一个特殊的用例,其中密码是一个令牌,特别是一个会过期的 keycloak 令牌。如果出于某种原因,spring 应用程序失去与 MQTT 服务器的连接并尝试重新连接,则令牌将过期,并且将抛出 MqttSecurityException: Not authorized to connect 异常。我尝试在 MqttPahoMessageHandler 中扩展 connectionLost 方法,但由于 MqttPahoClientFactory 和 IMqttAsyncClient 是私有的 final,所以我无能为力。想知道是否还有其他我没有想到的方法,或者图书馆只是不打算像这样使用???

感谢您的任何回复。

0 投票
1 回答
1652 浏览

java - MQTT 和 SpringBoot 集成连接丢失

我目前在 SpringBoot 中有一个 API,我想添加一个 MQTT 客户端来订阅一个或多个主题。我尝试了几个 Paho,Hive 客户端,但没有成功,我目前使用的是使用 Paho 的 SpringBoot 的默认 MQTT,但即使使用基本配置,我也无法让它工作。我一启动应用程序就收到“连接丢失”错误...你能告诉我修复或其他可行的方法吗?谢谢! Mosquitto 在 windows 上运行进行测试

马文:

运行时的错误:

更多日志

0 投票
0 回答
136 浏览

spring - 我可以在 Spring Boot 应用程序中运行 MQTT 代理吗

是否可以让像 Mosquitto 这样的 mqqt 代理在 Spring Boot 应用程序中运行?或者任何其他mqqt经纪人?我发现了 eclipse paho 库,它可以是 Spring boot 应用程序中的一个客户端。我想知道我们是否有类似的 mqqt 经纪人?

0 投票
0 回答
18 浏览

spring-kafka - 如何访问 SCDF 流中的源本机标头?

我正在使用 SCDF 开发一个流,允许将所有 MQTT 消息持久化到 SQL 数据库。

这是用于创建流的代码

mqtt_message 表包含几列,其中包括 headers、payload、received_topic。流已成功部署并且数据被持久化,但是:headers列是使用 SpEL headers.toString() 提取的:

使用SpEL payload.toString() 很好地提取了有效负载列:

如您所见,标头中的值不包括假定的标头,包括消息的主题 (mqtt_receivedTopic)。

如果我为生产者和接收器提供实现,我可以访问以下消息头:

我还测试了以下属性,但它们都没有改变结果:

制片人

  • spring.cloud.stream.bindings.output.producer.headerMode=embeddedHeaders
  • spring.cloud.stream.bindings.output.producer.useNativeEncoding=true

消费者

  • spring.cloud.stream.default.consumer.headerMode=embeddedHeaders

有没有办法在生产者和接收器之间传递本机标头并将它们写入目标列(从接收到的主题中提取值)。

谢谢。

0 投票
0 回答
31 浏览

spring-boot - 即使在 MqttConnectOptions 中将自动重新连接设置为 true,Eclipse Paho MqttClient 也无法重新连接

场景一: 当 SpringBoot 应用程序在没有启动 MQTT 代理的情况下启动时,MqttClient 永远不会在 MQTT 代理启动时尝试重新连接。

场景 2: 但是如果 MQTT 代理在启动 SpringBoot 应用程序之前启动,然后如果 MQTT 代理停止并再次启动,它会尝试重新连接并成功连接回来。

期望 即使对于场景 1,我也期待与场景 2 相同的行为

需要进行哪些更改才能使其适用于场景 1?

MqttClient bean 的片段

服务等级