1

我是 Spark 的初学者,想将数据帧从我的 spark-structured-streaming 应用程序发送到 ActiveMQ 中定义的主题。我怎样才能做到这一点?

编辑:我的版本:activeMQ-5.16, spark-2.4.0, bahir-2.4.0,scala-2.11.11

目前,我能够从一个主题读取数据并将其发送到另一个主题:

    val df = spark
        .readStream
        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
        .option("brokerUrl","tcp://localhost:1883")
        .option("topic","sample_topic")
        .option("persistence","memory")
        .option("cleanSession", "true")
        .load()
        .select("payload")
        .as[Array[Byte]]
        .map(payload => new String(payload))
        .toDF("payload")
    
    val checkpointLocation : String = "/home/wintersoldier/Desktop/dump/checkpoint"
  
    val df2 = df
        .writeStream
        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider")
        .option("brokerUrl","tcp://localhost:1883")
        .option("topic","sample_topic4")
        .option("checkpointLocation", checkpointLocation)
        .start
    df2.awaitTermination()

但这仅用于测试目的,我想将自定义字符串发送到 ActiveMQ 主题,数据帧模式是id : integer, topic: String, payload : binary, timestamp : timestamp如何将我的字符串转换为binary格式的有效负载并发送它?

4

0 回答 0