我正在使用 SCDF 开发一个流,允许将所有 MQTT 消息持久化到 SQL 数据库。
这是用于创建流的代码
stream create --name mqtt-to-jdbc --definition "mqtt --qos=2 --topics='#' --username=admin --password=******** --url='tcp://192.168.1.153:60065' | jdbc --username=sa --password=******** --driver-class-name=com.microsoft.sqlserver.jdbc.SQLServerDriver --url='jdbc:sqlserver://192.168.1.18;databaseName=test_db;schema=dbo' --table-name=mqtt_message --columns=\"headers:headers.toString(),payload:payload.toString(),created_at:new java.sql.Timestamp(T(System).currentTimeMillis()).toString()\"" --deploy
mqtt_message 表包含几列,其中包括 headers、payload、received_topic。流已成功部署并且数据被持久化,但是:headers列是使用 SpEL headers.toString() 提取的:
{b3=d4840635cb8c968c-381e88a613735a05-1, nativeHeaders={}, errorChannel=, id=e34b08d5-eafe-decd-4aa1-634cb187889a, timestamp=1622532820049}
使用SpEL payload.toString() 很好地提取了有效负载列:
Test payload
如您所见,标头中的值不包括假定的标头,包括消息的主题 (mqtt_receivedTopic)。
如果我为生产者和接收器提供实现,我可以访问以下消息头:
Headers:{
mqtt_id=0,
deliveryAttempt=1,
kafka_timestampType=CREATE_TIME,
kafka_receivedTopic=reaper.reaper-source,
mqtt_receivedRetained=false,
kafka_offset=31,
mqtt_duplicate=false,
scst_nativeHeadersPresent=true,
kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19da7b97,
id=cc423e5f-af1e-173a-a9ac-c229ab544738,
kafka_receivedPartitionId=0,
mqtt_receivedTopic=test/topic,
contentType=application/json,
kafka_receivedTimestamp=1622453609998,
mqtt_receivedQos=0,
kafka_groupId=reaper,
timestamp=1622453610004
}
我还测试了以下属性,但它们都没有改变结果:
制片人
- spring.cloud.stream.bindings.output.producer.headerMode=embeddedHeaders
- spring.cloud.stream.bindings.output.producer.useNativeEncoding=true
消费者
- spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
有没有办法在生产者和接收器之间传递本机标头并将它们写入目标列(从接收到的主题中提取值)。
谢谢。