0

我正在尝试获取一些 Spring-Boot kafka 流“动作”工作的样本,但我似乎最终完全糊涂了 :)

我正在通过网络接收 JSON 数据。我在avro中构建了一个模式,用于序列化数据:


{
  "UID": "XJ3_112",
  "type": "11X",
  "state": "PLATFORM_INITIALIZED",
  "fuelremaining": 0,
  "latitude": 50.1232,
  "longitude": -119.257,
  "altitude": 0,
  "time": "2018-07-18T00:00:13.9966Z"
}

{
  "platformUID": "BSG_SS_1_4",
  "type": "OB_334_11",
  "state": "ON_STATION",
  "fuelremaining": -1,
  "latitude": 56.1623,
  "longitude": -44.5614,
  "altitude": 519174,
  "time": "2018-07-18T00:01:43.0871Z"
}

这是据我所知:

@Component
class KStreamTransformer {

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @StreamListener(MyKafkaStreams.INPUT)
    @SendTo(MyKafkaStreams.OUTPUT)
    fun process(input: KStream<*, TestEntity>) : KStream<*, TestEntity> {

        return input.flatMapValues{
            value ->
            val out = Arrays.asList(value)

            out
        }.groupBy() ???
    }
}

我希望创建一个看起来像这样的 KTable:

|platformUID|状态|纬度|经度|Alt| |-----------|-----|---|---|---|

这就是我让自己感到困惑的地方。

我假设我想GroupByPlatformUID球场上做一个,但我不清楚如何实际前进。

有人可以指出我正确的方向吗?

我认为我正在寻找的是获取input流并将其转换为 KTable,其键为value.getUID(),值为之前的值

4

1 回答 1

2

如果platformUID已经是数据生产者使用的密钥,则可以使用

KTable ktable = kstream
                 .groupByKey()
                 .reduce((oldValue, newValue) -> newValue)

如果没有,则应放入 KeyValueMapper .groupBy(),它看起来像

KTable ktable = kstream
                 .groupBy((k, v) -> v.getPlatformUID())
                 .reduce((oldValue, newValue) -> newValue)

它将创建一个内部主题,该主题使用新密钥对源主题进行重新分区。

对于 java 7,KeyValueMapper 的语法如下:

KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
        public K apply(K key, V1 value) {
            return key;
        }
    };
于 2019-06-21T14:04:47.017 回答