在使用 Spring Cloud Stream 的流处理应用程序中,我正在获取输入流(以整数为键)并调用selectKey
它以创建具有相同值但具有不同键(字符串)的新主题。输入主题中包含正确 JSON 格式的记录,例如:
"key": {
"id": 1
},
"value": {
"id": 1,
"public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...
问题是流处理应用程序创建的主题具有value
包含 JSON 的字符串,而不是正确的 JSON,即:
"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"
代码如下:
@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
stream.selectKey { _, value -> value.publicId }
上面的函数所做的是消耗输入流,并生成一个输出流(被发送到output
)。该输出流与输入流具有相同的值,但只是一个不同的键。(在这种情况下,键来自值的publicId
属性。)
application.yml
如下:
spring.cloud.stream:
bindings:
input:
destination: input-topic
output:
destination: output-output
kafka:
streams:
binder:
application-id: test-app-id-1
bindings:
input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
有什么我想念的吗?这实际上是一个问题,还是可以将 JSON 作为字符串存储在 Spring Cloud Stream 生成的消息中?
我尝试过的其他没有什么影响的事情:
- 使用本机解码/编码
- 设置
spring.cloud.stream.bindings.output.content-type
为application/json
- 使用
map
代替selectKey