2

请找到我们需要实现的用例。

首先,我们需要调用 Kafka 生产者一条消息作为休息服务,他们将在另一个主题中处理并返回响应。

对我们来说,这是一个请求-回复主题,我们需要为相同的请求回复响应,使用replykafka模板工作正常,但我们可以在标题中设置关联 ID 。

作为主题消息元数据,有属性发送,有没有办法将关联ID与请求主题消息和回复主题消息映射。

给你更好的解释。

一个微服务期望有效负载如下所示,有效负载中包含相关ID。

{
  "operationDate": "2020-09-16T11:58:25",
  "correlationId": "-5544538377183901824042719876882142227",
  "birthDate": "2013-12-12",
  "firstNameEn": "boby",
  "firstNameAr": "الشيخ",
}

微服务将处理有效负载,并将在另一个主题中给出响应。

{
  "correlationId": -5544538377183901824042719876882142227,
  "consumerId": null,
  "userid": 123456,
  "statusCode": "SUCCESS",
  "errors": null
}

现在,我们需要使用 spring ReplyingKafkaTemplate 来实现。

由于ReplyingKafkaTemplate 将仅在标头中使用correlationId

4

2 回答 2

1

假设您的意思是要在相关 ID 中包含主题,请参阅

/**
 * Set a function to be called to establish a unique correlation key for each request
 * record.
 * @param correlationStrategy the function.
 * @since 2.3
 */
public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {

ProducerRecord您可以根据(具有)创建自己的相关 ID topic()

你只需要确保它是独一无二的。如果您手动设置KafkaHeaders.REPLY_TOPIC,它将对策略可见。

编辑

使用有效负载中的相关 id,用于setCorrelationIdStrategy从有效负载中提取相关 ID,并添加 aRecordInterceptor以在回复端执行相同操作。

于 2020-09-03T16:17:25.000 回答
0

感谢您的提示。

我已经用 Kafka 标头相关 ID 覆盖了有效负载。

@Override
    protected ListenableFuture<SendResult> doSend(ProducerRecord producerRecord) {
        if(producerRecord.value()!=null){
           // i have appeneded the header correlationId in th payload
        }
        return super.doSend(producerRecord);
    }

在 Replay onMessage 中,我已将响应有效负载相关性 ID 填充到标题中。

@Override
    public void onMessage(List<ConsumerRecord<K, R>> data) {
        data.forEach(
            krConsumerRecord -> //update each record header
        );
        super.onMessage(data);
    }

通过这种方式,在请求和响应负载中成功地将请求-响应语义与correlationId 集成在一起。

于 2020-09-17T13:52:56.997 回答