0

我有一个 Datastream[ObjectNode],我从 kafka 主题中读取为反序列化的 json。这个 ObjectNode 的一个元素实际上是一个事件数组。这个数组有不同的长度。传入的 json 流如下所示:

{
    "eventType": "Impression",
    "deviceId": "359849094258487",
    "payload": {
        "vertical_name": "",
        "promo_layout_type": "aa",
        "Customer_Id": "1011851",
        "ecommerce": {
            "promoView": {
                "promotions": [{
                    "name": "/-category_icons_all",
                    "id": "300275",
                    "position": "slot_5_1",
                    "creative": "Central/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
                }, {
                    "name": "/-category_icons_all",
                    "id": "300276",
                    "position": "slot_6_1",
                    "creative": "Lifestyle/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
                }, {
                    "name": "/-category_icons_all",
                    "id": "413002",
                    "position": "slot_7_1",
                    "creative": "Uber/Deals/00000001B890D1739913DDA956AB5C79775991EC"
                }]
            }
        }
    }
}

我希望能够分解数组,以便内部的promotions每个元素都成为可以写入接收器 kafka 主题的单独消息。Flink 是否在 DataStream 和/或 Table API 中提供了爆炸功能?

我试图在这个流上做一个 RichFlatMap 以便能够收集单独的行,但这也只是返回一个 DataStream[Seq[GenericRecord]],如下所示:

class PromoMapper(schema: Schema) extends RichFlatMapFunction[node.ObjectNode,Seq[GenericRecord]] {

  override def flatMap(value: ObjectNode, out: Collector[Seq[GenericRecord]]): Unit = {
    val promos = value.get("payload").get("ecommerce").get("promoView").get("promotions").asInstanceOf[Seq[node.ObjectNode]]

    val record = for{promo <- promos} yield {
      val processedRecord: GenericData.Record = new GenericData.Record(schema)
      promo.fieldNames().asScala.foreach(f => processedRecord.put(f,promo.get(f)))
      processedRecord
    }

    out.collect(record)
  }
}

请帮忙。

4

1 回答 1

0

使用平面地图是正确的想法(不知道您为什么要使用 RichFlatMap,但这是一个细节)。

似乎您应该out.collect(processedRecord)在 for 循环中调用每个元素,而不是在该循环产生的 Seq 上调用一次。

于 2019-05-31T13:31:28.553 回答