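I am building a reactive pipeline with Kafka and Redis. The existing service consumes events from Apache Kafka, applies business logic, and finally updates a Redis set. I am refactoring the service to use the reactive APIs.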

Here is the sample code using the reactive APIs:

var options = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("topic_name"))
                .addAssignListener(receiverPartitions -> {
                    System.out.println("Receiver partitions::" + receiverPartitions);
                    receiverPartitions.forEach(ReceiverPartition::seekToBeginning);
                })
                .addRevokeListener(receiverPartitions -> System.out.println("Revoke listeners::" + receiverPartitions));

KafkaReceiver.create(options)
                        .receive()
                        .log()
                        .concatMap(record -> processRecord(dateFormat, record))
                        .subscribe();

And the processRecord method:

private Mono<ReceiverRecord<Object, Object>> processRecord(SimpleDateFormat dateFormat, ReceiverRecord<Object, Object> record) {
        ReceiverOffset offset = record.receiverOffset();
        System.out.printf(Thread.currentThread() + "::Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",
                offset.topicPartition(),
                offset.offset(),
                dateFormat.format(new Date(record.timestamp())),
                record.key(),
                record.value());
        Object value = record.value();
        var publisher = reactiveRedisTemplate
                .opsForZSet()
                .add("recent::users" + UUID.randomUUID(), value.toString(), (double) System.currentTimeMillis())
                .log()
                .flatMap(new Function<Boolean, Mono<ReceiverRecord<Object, Object>>>() {
                    @Override
                    public Mono<ReceiverRecord<Object, Object>> apply(Boolean aBoolean) {
                        System.out.println(Thread.currentThread() + "Got the response::"  + aBoolean);
                        if (aBoolean ==null) {
                            return Mono.empty();
                        }
                        record.receiverOffset().acknowledge();
                        System.out.println(Thread.currentThread()+ ":: Ack done");
                        return Mono.just(record);
                    }
                });
        System.out.println(Thread.currentThread() + "::Done..");
        return publisher;
    }

Logs after execution:

Thread[lettuce-nioEventLoop-4-1,5,main]Got the response::true
Thread[lettuce-nioEventLoop-4-1,5,main]:: Ack done
Thread[lettuce-nioEventLoop-4-1,5,main]::Received message: topic-partition=topic_name-101 offset=108 timestamp=17:10:54:509 IST 28 Jun 2021 key=null value={"log":{"log_date":1618255865000,"empId":1234}}
Thread[lettuce-nioEventLoop-4-1,5,main]::Done..
2021-06-28 20:11:14.158  INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2369                   : onSubscribe(MonoNext.NextSubscriber)
2021-06-28 20:11:14.158  INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2369                   : request(unbounded)
2021-06-28 20:11:14.158  INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2361                   : onComplete()
2021-06-28 20:11:14.158  INFO 21262 --- [ioEventLoop-4-1] reactor.Mono.Next.2362                   : onNext(true)
Thread[lettuce-nioEventLoop-4-1,5,main]Got the response::true
Thread[lettuce-nioEventLoop-4-1,5,main]:: Ack done

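I observe that the lettuce-nioEventLoop thread is used both for processing the Kafka events and for the Lettuce callbacks. I don't understand this behavior. Can someone shed some light on it?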
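
One possible explanation, offered as a guess rather than a confirmed diagnosis: in callback-style pipelines a continuation usually runs on whichever thread delivers the completion signal. If concatMap asks for the next record when the inner Redis Mono completes, the next call to processRecord would run on the Lettuce event loop thread. The sketch below is only an analogy using plain java.util.concurrent; it contains no Reactor or Lettuce code, and the class name, method name, and thread names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackThreadDemo {

    // Returns the name of the thread that ran the continuation attached to a pending reply.
    // hopToWorker = false: plain continuation, runs on the thread that completes the reply.
    // hopToWorker = true: continuation is explicitly handed to a separate worker thread.
    static String continuationThread(boolean hopToWorker) throws Exception {
        // "fake-io-loop" stands in for an I/O event loop thread (hypothetical name).
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io-loop"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            CompletableFuture<Boolean> reply = new CompletableFuture<>();
            // The continuation is registered before the reply arrives.
            CompletableFuture<String> seen = hopToWorker
                    ? reply.thenApplyAsync(ok -> Thread.currentThread().getName(), worker)
                    : reply.thenApply(ok -> Thread.currentThread().getName());
            // The "I/O thread" delivers the reply, which triggers the continuation.
            io.execute(() -> reply.complete(true));
            return seen.get();
        } finally {
            io.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("plain continuation ran on: " + continuationThread(false));
        System.out.println("async continuation ran on: " + continuationThread(true));
    }
}
```

In Reactor, the operator that plays the role of the explicit hop is publishOn with a Scheduler. Whether placing it after the Redis call changes the threads seen in the logs above is something I have not verified.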
