protected void consume(KafkaReadStream<String, JsonObject> consumer) {
consumer.handler(record -> {
// call a REST API to write a value to DB
doRestApi(record); // such as client.postAbs(...).send(...)
});
}
例如,主题中的记录,例如,
{"createtime":"2021-10-29 00:03:49.0"}
{"createtime":"2021-10-29 00:03:50.0"}
{"createtime":"2021-10-29 00:03:56.0"}
...
{"createtime":"2021-10-29 00:13:56.0"}
当记录{"createtime":"2021-10-29 00:03:49.0"}
到来时,处理程序执行了一个 REST api 调用来保存createtime=2021-10-29 00:03:49.0
到 mongodb。
当第二个记录到来时,createtime=2021-10-29 00:03:50.0
将被保存。然后将保存
第三条记录。
等等。我希望这些消息能够按顺序处理。因此,最后一条消息中的创建时间将是 DB 中的最终值。createtime=2021-10-29 00:03:56.0
2021-10-29 00:13:56.0
当我运行我的代码时,结果值是随机的,前一个值(例如2021-10-29 00:03:56.0
)可能是最终值。我怎样才能让处理程序按顺序运行?