我正在使用 Spark 2.1.1 (kafka 0.10+) 阅读 Kafka 主题,并且有效负载是 JSON 字符串。我想用模式解析字符串并推进业务逻辑。
每个人似乎都建议我应该使用它from_json
来解析 JSON 字符串,但是,它似乎不适用于我的情况。错误是
not found : value from_json
.select(from_json($"json", txnSchema) as "data")
当我在 spark shell 中尝试以下几行时,它工作得很好 -
val df = stream
.select($"value" cast "string" as "json")
.select(from_json($"json", txnSchema) as "data")
.select("data.*")
任何想法,我在代码中做错了什么才能看到这部分在 shell 中工作,而不是在 IDE/编译时?
这是代码:
import org.apache.spark.sql._
object Kafka10Cons3 extends App {
val spark = SparkSession
.builder
.appName(Util.getProperty("AppName"))
.master(Util.getProperty("spark.master"))
.getOrCreate
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
.option("subscribe", src_topic)
.load
val txnSchema = Util.getTxnStructure
val df = stream
.select($"value" cast "string" as "json")
.select(from_json($"json", txnSchema) as "data")
.select("data.*")
}