0

我正在开发一个项目,该项目需要从 GCP Pub/Sub 接收事件、下载文件、解析和处理文件,最后将结果发布到 Kafka 代理,但我收到SRMSG00034: Insufficient downstream requests to emit item

在我第一次尝试发布到 Kafka 时,我使用流迭代 msgList,但我读到 Mutiny(https://quarkus.io/blog/mutiny-back-pressure/)可以控制背压,但我得到了同样的错误.

在我的场景中,我必须发布两个不同的列表,其中一个有大约 10k 条消息。我读到我可以使用@OnOverflow控制溢出配置,但我更喜欢保持默认配置,除非有必要进行一些更改。

Multi.createFrom().iterable(msgList)
    .onItem().transform(item -> {
        ... some transformation ...
    })
    .onItem().invoke(emitter::send)
    .subscribe().with(
            item -> Uni.createFrom().voidItem(),
            Throwable::printStackTrace,
            () -> System.out.println("Done!")
    );

你能为我指出正确的方向来解决这个问题吗?

提前致谢

4

1 回答 1

1

emitter.send是一种异步方法。在您的代码中,它会忽略结果并send立即返回。这可能不是你想要的。我建议使用 a MutinyEmitterand do onItem().call(emitter::send)。在这种情况下,您将等待消息发送。请注意,如果发送消息失败,它将传播失败。这样,将应用背压,因为它只会在前一条消息被确认时才会收到一条新消息。

如果要发送一批消息,请使用.group().asList().of(...)并发送列表中的所有消息。但是,和以前一样,您需要等待所有消息的确认。Uni.join会让你这样做。

如果您想同时发送消息但并发性受限,请使用:

.onItem().transformToUni(m -> emitter.send(m)).merge(concurrency)

它将concurrency同时发送消息。但是,在这种情况下不能保证顺序。

于 2021-11-03T06:52:02.860 回答