[注意] 问题是 Lagom 框架特有的!
在我当前的项目中,当上游高速并且看起来下游无法及时处理所有消息时,观察到将消息列表从 Source 剪切到 Kafka 主题发布者的问题。正如意识到的那样,切割与 PubSubRef.subscribe() 方法的行为有关https://github.com/lagom/lagom/blob/master/pubsub/javadsl/src/main/scala/com/lightbend/lagom/javadsl /pubsub/PubSubRef.scala#L85
完整的方法定义:
def subscriber(): Source[T, NotUsed] = {
scaladsl.Source.actorRef[T](bufferSize, OverflowStrategy.dropHead)
.mapMaterializedValue { ref =>
mediator ! Subscribe(topic.name, ref)
NotUsed
}.asJava
}
使用了 OverflowStrategy.dropHead。可以改成使用背压策略吗?
UPD#1: 用例非常简单,当查询请求发布到命令主题时,获取它并从数据库表中查询对象,结果列表被推送到结果 Kafka 主题中。代码片段:
objectsResultTopic = pubSub.refFor(TopicId.of(CustomObject.class, OBJECTS_RESULT_TOPIC));
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
Flow.fromFunction(this::deserializeCommandAndQueryObjects)
.mapAsync(concurrency, objects -> objects)
.flatMapMerge(concurrency, objects -> objects)
.alsoTo(Sink.foreach(event -> LOG.trace("Sending object {}", object)))
.to(objectsResultTopic.publisher()),
Source.repeat(Done.getInstance())
)
)
如果函数生成的对象流deserializeCommandAndQueryObjects
超过默认缓冲区大小 = 1000,它会开始切割元素(我们的例子是 ~ 2.5k 个对象)。
UPD#2: 对象数据的来源是:
// returns CompletionStage<Source<CustomObject, ?>>
jdbcSession.withConnection(
connection -> Source.from(runQuery(connection, rowConverter))
)
还有一个 Kafka 订阅objectsResultTopic
:
TopicProducer.singleStreamWithOffset(
offset -> objectsResultTopic.subscriber().map(gm -> {
JsonNode node = mapper.convertValue(gm, JsonNode.class);
return Pair.create(node, offset);
}));