我正在尝试将 ApacheSpark 结构化流连接到 MQTT 主题(在本例中为 IBM Bluemix 上的 IBM Watson IoT Platform)。
我正在创建结构化流,如下所示:
val df = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(aWuFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
到目前为止一切顺利,在 REPL 中我得到了这个 df 对象,如下所示:
df: org.apache.spark.sql.DataFrame = [值:字符串,时间戳:时间戳]
我从这个线程中了解到,每次连接时我都必须更改客户端 ID。所以这解决了,但如果我开始使用这一行从流中读取:
val 查询 = df.writeStream。输出模式(“追加”)。
格式(“控制台”).start()
然后生成的架构如下所示:
df: org.apache.spark.sql.DataFrame = [值:字符串,时间戳:时间戳]
数据如下:
这意味着我的 JSON 流被转换为包含 JSON 表示的字符串对象流。
这是 ApacheBahir 的限制吗?
提供模式也无济于事,因为以下代码类似于相同的结果:
import org.apache.spark.sql.types._
val schema = StructType(
StructField("count",LongType,true)::
StructField("flowrate",LongType,true)::
StructField("fluidlevel",StringType,true)::
StructField("frequency",LongType,true)::
StructField("hardness",LongType,true)::
StructField("speed",LongType,true)::
StructField("temperature",LongType,true)::
StructField("ts",LongType,true)::
StructField("voltage",LongType,true)::
Nil)
:paste
val df = spark.readStream
.schema(schema)
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("username","a-vy0z2s-q6s8r693hv")
.option("password","B+UX(a8GFPvX")
.option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf4")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
.load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")