目前我在同一个 Spring Boot 应用程序中配置了生产者和消费者,但是很奇怪,触发消息的 Spring 云流没有通过 Kafka(我正在使用 kafka-console-consumer 监视消息),但是消费者仍然收到消息(使用与生产者相同的线程)。
如果我删除应用程序中的 consumerHandler (@StreamListener),生产者会成功地将消息发送到 Kafka。
这有什么配置吗?默认情况下,我需要 Spring 云流向 Kafka 发送消息。
生产者和消费者配置:
@Component
public interface NotificationProcessor {
String EMAIL_NOTIFICATION = "email-notification";
@Input(EMAIL_NOTIFICATION)
SubscribableChannel receiveEmail();
@Output(EMAIL_NOTIFICATION)
MessageChannel sendEmail();
}
这是我的一些配置:
spring:
cloud:
stream:
kafka:
binder:
autoAddPartitions: true
brokers: ${KAFKA_BROKERS:localhost:9092}
auto-create-topics: true
configuration:
auto.offset.reset: latest
bindings:
email-notification:
group: ${EMAIL_GROUP:email-group-notification}
destination: ${EMAIL_TOPIC:email-notification}
contentType: application/json
producer:
partitionCount: 9
consumer:
partitioned: true
concurrency: 3
instance-count: 1
instance-index: 0
触发发送消息的 API:
@RestController
@RequestMapping("/api")
public class TestResource {
private final Logger log = LoggerFactory.getLogger(TestResource.class);
private final NotificationProcessor notificationProcessor;
public TestResource(NotificationProcessor notificationProcessor) {
this.notificationProcessor = notificationProcessor;
}
@ApiOperation(value = "Test api")
@GetMapping(value = "/send-email2", produces = APPLICATION_JSON_VALUE)
public ResponseEntity<Boolean> test2() {
EmailMessage test = EmailMessage.builder()
.to(Arrays.asList(Receiver.builder().email("test@nomail.com").build())
).type(EContentType.JSON)
.build();
log.info("send email message to kafka");
notificationProcessor.sendEmail().send(MessageBuilder.withPayload(test).build());
return ResponseEntity.ok(Boolean.TRUE);
}
}
和消费者处理程序:
@EnableBinding(NotificationProcessor.class)
public class NotificationProducer {
private final Logger log = LoggerFactory.getLogger(NotificationProducer.class);
public NotificationProducer(){}
@StreamListener(NotificationProcessor.EMAIL_NOTIFICATION)
public void receiveEmail(@Payload Message<EmailMessage> message) {
log.info("Receive email message from kafka");
EmailMessage emailMessage = message.getPayload();
}
}