我是 Spark 的初学者,想将数据帧从我的 spark-structured-streaming 应用程序发送到 ActiveMQ 中定义的主题。我怎样才能做到这一点?
编辑:我的版本:activeMQ-5.16
, spark-2.4.0
, bahir-2.4.0
,scala-2.11.11
目前,我能够从一个主题读取数据并将其发送到另一个主题:
val df = spark
.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("brokerUrl","tcp://localhost:1883")
.option("topic","sample_topic")
.option("persistence","memory")
.option("cleanSession", "true")
.load()
.select("payload")
.as[Array[Byte]]
.map(payload => new String(payload))
.toDF("payload")
val checkpointLocation : String = "/home/wintersoldier/Desktop/dump/checkpoint"
val df2 = df
.writeStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider")
.option("brokerUrl","tcp://localhost:1883")
.option("topic","sample_topic4")
.option("checkpointLocation", checkpointLocation)
.start
df2.awaitTermination()
但这仅用于测试目的,我想将自定义字符串发送到 ActiveMQ 主题,数据帧模式是id : integer, topic: String, payload : binary, timestamp : timestamp
如何将我的字符串转换为binary
格式的有效负载并发送它?