0

我正在尝试将 ApacheSpark 结构化流连接到 MQTT 主题(在本例中为 IBM Bluemix 上的 IBM Watson IoT Platform)。

我正在创建结构化流,如下所示:

val df = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("username","a-vy0z2s-q6s8r693hv")
    .option("password","B+UX(aWuFPvX")
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
    .option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")

到目前为止一切顺利,在 REPL 中我得到了这个 df 对象,如下所示:

df: org.apache.spark.sql.DataFrame = [值:字符串,时间戳:时间戳]

我从这个线程中了解到,每次连接时我都必须更改客户端 ID。所以这解决了,但如果我开始使用这一行从流中读取:

val 查询 = df.writeStream。输出模式(“追加”)。
格式(“控制台”).start()

然后生成的架构如下所示:

df: org.apache.spark.sql.DataFrame = [值:字符串,时间戳:时间戳]

数据如下:

在此处输入图像描述

这意味着我的 JSON 流被转换为包含 JSON 表示的字符串对象流。

这是 ApacheBahir 的限制吗?

提供模式也无济于事,因为以下代码类似于相同的结果:

import org.apache.spark.sql.types._
val schema = StructType(
    StructField("count",LongType,true)::
    StructField("flowrate",LongType,true)::
    StructField("fluidlevel",StringType,true)::
    StructField("frequency",LongType,true)::
    StructField("hardness",LongType,true)::
    StructField("speed",LongType,true)::
    StructField("temperature",LongType,true)::
    StructField("ts",LongType,true)::
    StructField("voltage",LongType,true)::
Nil)

:paste
val df = spark.readStream
    .schema(schema)
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("username","a-vy0z2s-q6s8r693hv")
    .option("password","B+UX(a8GFPvX")
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf4")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")
4

2 回答 2

1

许多DataSources,包括但不限于 MQTTStreamSource,具有固定的模式,由消息和时间戳组成。Schema 不会丢失,根本不会被解析,这是一种预期的行为。

如果架构是固定的并且预先知道,您应该能够使用from_json函数:

import org.apache.spark.sql.functions.from_json

df.withColumn("value", from_json($"value", schema))
于 2017-02-06T17:40:54.873 回答
0

对于解析(因为我不再使用四个“from_json”方法)我使用过

导入 org.apache.spark.sql.functions.json_tuple

和以下代码,它也可以工作:

df.withColumn("value",json_tuple($"value","myColumnName"))

于 2017-02-07T08:39:15.007 回答