0

[注意] 问题是 Lagom 框架特有的!

在我当前的项目中,当上游高速并且看起来下游无法及时处理所有消息时,观察到将消息列表从 Source 剪切到 Kafka 主题发布者的问题。正如意识到的那样,切割与 PubSubRef.subscribe() 方法的行为有关https://github.com/lagom/lagom/blob/master/pubsub/javadsl/src/main/scala/com/lightbend/lagom/javadsl /pubsub/PubSubRef.scala#L85

完整的方法定义:

def subscriber(): Source[T, NotUsed] = {
scaladsl.Source.actorRef[T](bufferSize, OverflowStrategy.dropHead)
  .mapMaterializedValue { ref =>
    mediator ! Subscribe(topic.name, ref)
    NotUsed
  }.asJava
}

使用了 OverflowStrategy.dropHead。可以改成使用背压策略吗?

UPD#1: 用例非常简单,当查询请求发布到命令主题时,获取它并从数据库表中查询对象,结果列表被推送到结果 Kafka 主题中。代码片段:

objectsResultTopic = pubSub.refFor(TopicId.of(CustomObject.class, OBJECTS_RESULT_TOPIC));
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
    Flow.fromFunction(this::deserializeCommandAndQueryObjects)
        .mapAsync(concurrency, objects -> objects)
        .flatMapMerge(concurrency, objects -> objects)
        .alsoTo(Sink.foreach(event -> LOG.trace("Sending object {}", object)))
        .to(objectsResultTopic.publisher()),
    Source.repeat(Done.getInstance())
    )
)

如果函数生成的对象流deserializeCommandAndQueryObjects超过默认缓冲区大小 = 1000,它会开始切割元素(我们的例子是 ~ 2.5k 个对象)。

UPD#2: 对象数据的来源是:

// returns CompletionStage<Source<CustomObject, ?>>
jdbcSession.withConnection(
  connection -> Source.from(runQuery(connection, rowConverter))
)

还有一个 Kafka 订阅objectsResultTopic

TopicProducer.singleStreamWithOffset(
offset -> objectsResultTopic.subscriber().map(gm -> {
    JsonNode node = mapper.convertValue(gm, JsonNode.class);
    return Pair.create(node, offset);
}));
4

2 回答 2

4

听起来 Lagom 的分布式发布-订阅功能可能不是您工作的最佳工具。

您的问题提到了 Kafka,但此功能没有使用 Kafka。相反,它通过直接向集群中的所有订阅者广播消息来工作。这是一种“最多一次”消息传输,可能确实会丢失消息,并且适用于那些更关心跟上最新消息而不是处理每一条消息的消费者。溢出策略不可定制,您不希望在这些用例中使用背压,因为这意味着一个缓慢的消费者可能会减慢向所有其他订阅者的交付速度。

您还有其他一些选择:

  1. 如果你确实想使用 Kafka,你应该使用 Lagom 的消息代理 API。这支持“至少一次”传递语义,并可用于确保每个消费者处理每条消息(以可能增加延迟为代价)。

    在这种情况下,Kafka 充当了一个巨大的持久缓冲区,因此它甚至比背压更好:生产者和消费者可以以不同的速度进行,并且(当与分区一起使用时)您可以添加消费者以扩展和处理消息需要时更快。

    当生产者和消费者都在同一个服务中时,可以使用消息代理 API,但它特别适用于服务之间的通信。

  2. 如果您发送的消息是持久性实体事件,并且消费者是同一服务的一部分,那么持久性读取端处理器可能是一个不错的选择。

    这也提供了“至少一次”传递,如果处理消息的唯一效果是数据库更新,那么对Cassandra 读取端数据库关系读取端数据库的内置支持提供“有效一次”语义,其中数据库更新以事务方式运行,以确保在事件处理期间发生的故障不会导致部分更新。

  3. 如果您发送的消息是持久实体事件,消费者是同一服务的一部分,但您希望将事件作为流处理,则可以访问原始事件流

  4. 如果您的用例不适合 Lagom 明确支持的用例之一,您可以使用较低级别的 Akka API(包括分布式发布订阅)来实现更适合您需求的东西。

最佳选择将取决于您的用例的具体情况:消息的来源和您想要的消费者类型。如果您使用更多详细信息更新您的问题并对此答案添加评论,我可以使用更具体的建议编辑答案。

于 2017-07-04T01:04:43.477 回答
1

如果有人感兴趣,最后我们使用 Akka Producer API 解决了这个问题,例如:

ProducerSettings<String, CustomObject> producerSettings = ProducerSettings.create(system, new StringSerializer(), new CustomObjectSerializer());
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
    Flow.fromFunction(this::deserializeCommandAndQueryObjects)
        .mapAsync(concurrency, objects -> objects)
        .flatMapMerge(concurrency, objects -> objects)
        .alsoTo(Sink.foreach(object -> LOG.trace("Sending event {}", object)))
        .map(object -> new ProducerRecord<String, CustomObject>(OBJECTS_RESULT_TOPIC, object))
        .to(Producer.plainSink(producerSettings)),
    Source.repeat(Done.getInstance())));

它无需缓冲即可工作,只是推动进入 Kafka 主题。

于 2017-07-05T10:56:53.013 回答