0

以下我的数据框架构

root
 |-- name: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- city: string (nullable = true)

我想输出名称和城市。以下是我的 spark 流应用程序,它输出名称和地址,但我想要输出中的名称和城市。感谢你的帮助。谢谢。

object PersonConsumer {
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args : Array[String]) {

    val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka").
      option("kafka.bootstrap.servers","localhost:9092").
      option("subscribe","person").load()

    val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Person.parseFrom(_)).select($"name", $"addresses")

    ds2.printSchema()

    val query = ds2.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
4

2 回答 2

0

谢谢桑迪普。select("name","addresses.element.city") 给我错误,因为地址是 Seq[Address] 并且我想要输出中的所有城市。

最后我编写了以下函数来获取所有城市..

    def getCities(addresses: Seq[Address]) : String = {
      var cities:String = ""
      if (addresses.size > 0) {
        cities = (for(a <- addresses) yield a.city.getOrElse("")).mkString(",")
//        cities = addresses.foldLeft("")((str,addr) => str  + addr.city.getOrElse(""))
      }
      cities
    }
于 2016-11-20T21:48:42.073 回答
0

您可以简单地获取名称和城市的数据框,然后您就可以使用它,对于获取名称和城市的数据框,您可以选择两者如下

ds1.select("name","addresses.element.city")
于 2016-11-20T18:13:10.540 回答