我正在使用 Kafka 和 Redis 构建反应式管道。现有服务使用来自 Apache Kafka 的事件,应用业务逻辑并最终更新 Redis 集。我正在重构服务以使用反应式 API。
这是使用反应式 API 的示例代码。
var options = ReceiverOptions.create(consumerProps)
.subscription(Collections.singleton("topic_name"))
.addAssignListener(receiverPartitions -> {
System.out.println("Receiver partitions::" + receiverPartitions);
receiverPartitions.forEach(ReceiverPartition::seekToBeginning);
})
.addRevokeListener(receiverPartitions -> System.out.println("Revoke listeners::" + receiverPartitions));
KafkaReceiver.create(options)
.receive()
.log()
.concatMap(record -> processRecord(dateFormat, record))
.subscribe();
以及过程记录
private Mono<ReceiverRecord<Object, Object>> processRecord(SimpleDateFormat dateFormat, ReceiverRecord<Object, Object> record) {
ReceiverOffset offset = record.receiverOffset();
System.out.printf(Thread.currentThread() + "::Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",
offset.topicPartition(),
offset.offset(),
dateFormat.format(new Date(record.timestamp())),
record.key(),
record.value());
Object value = record.value();
var publisher = reactiveRedisTemplate
.opsForZSet()
.add("recent::users" + UUID.randomUUID(), value.toString(), (double) System.currentTimeMillis())
.log()
.flatMap(new Function<Boolean, Mono<ReceiverRecord<Object, Object>>>() {
@Override
public Mono<ReceiverRecord<Object, Object>> apply(Boolean aBoolean) {
System.out.println(Thread.currentThread() + "Got the response::" + aBoolean);
if (aBoolean ==null) {
return Mono.empty();
}
record.receiverOffset().acknowledge();
System.out.println(Thread.currentThread()+ ":: Ack done");
return Mono.just(record);
}
});
System.out.println(Thread.currentThread() + "::Done..");
return publisher;
}
执行后的日志。
Thread[lettuce-nioEventLoop-4-1,5,main]Got the response::true
Thread[lettuce-nioEventLoop-4-1,5,main]:: Ack done
Thread[lettuce-nioEventLoop-4-1,5,main]::Received message: topic-partition=topic_name-101 offset=108 timestamp=17:10:54:509 IST 28 Jun 2021 key=null value={"log":{"log_date":1618255865000,"empId":1234}}
Thread[lettuce-nioEventLoop-4-1,5,main]::Done..
2021-06-28 20:11:14.158 INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2369 : onSubscribe(MonoNext.NextSubscriber)
2021-06-28 20:11:14.158 INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2369 : request(unbounded)
2021-06-28 20:11:14.158 INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2361 : onComplete()
2021-06-28 20:11:14.158 INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2362 : onNext(true)
Thread[lettuce-nioEventLoop-4-1,5,main]Got the response::true
Thread[lettuce-nioEventLoop-4-1,5,main]:: Ack done
我观察到lettuce-nioEventLoop线程甚至用于处理 kafka 事件和 lettuce 回调。我不明白这种行为。有人可以阐明一下吗?