7

我想注意到我将描述的场景很少发生,并且在大多数情况下,一切都按预期工作。

我在 Pub/Sub 端有 1 个主题和 1 个订阅。

我的 java 应用程序侦听订阅,进行一些处理并发送回确认。由于 google Pub/Sub 保证至少一次交付,我们根据objectGeneration标头和“objectId”标头在我们这边进行消息重复数据删除。

有时我们会看到我们的应用程序一次又一次地接受已确认的消息,这是意想不到的行为。

日志示例:

//first
2019-12-17 20:51:57.375 INFO 1 --- [sub-subscriber3] bucketNotificationFlow : Received new message from pub-sub: GenericMessage [payload={....}, headers={.....objectGeneration=1576615916875106, eventTime=2019-12-17T20:51:56.874940Z, objectId=Small_files_bunch/100_12_1.csv, ....
....
2019-12-17 20:51:57.698 INFO 1 --- [sub-subscriber3] .i.g.PubSubMessageAcknowledgementHandler : Acknowledged message - 1576615916875106
...
//duplicate 1
2019-12-17 20:51:59.663 INFO 1 --- [sub-subscriber4] bucketNotificationFlow : Received new message from pub-sub: GenericMessage [payload={...}, headers={ objectGeneration=1576615916875106, eventTime=2019-12-17T20:51:56.874940Z, objectId=Small_files_bunch/100_12_1.csv", ....
...
2019-12-17 20:51:59.704 INFO 1 --- [sub-subscriber4] c.b.m.i.DiscardedMessagesHandler : Duplicate message received GenericMessage [ headers={idempotent.keys=[objectGeneration.1576615916875106, objectId.Small_files_bunch/100_12_1.csv], ...
....
//duplicate 2
2019-12-17 22:52:02.239 INFO 1 --- [sub-subscriber1] bucketNotificationFlow : Received new message from pub-sub: GenericMessage [payload={...}, headers={objectGeneration=1576615916875106, eventTime=2019-12-17T20:51:56.874940Z, objectId=Small_files_bunch/100_12_1.csv, ...
...
2019-12-17 22:52:02.339 INFO 1 --- [sub-subscriber1] c.b.m.i.DiscardedMessagesHandler : Duplicate message received GenericMessage [ headers={idempotent.keys=[objectGeneration.1576615916875106, objectId.Small_files_bunch/100_12_1.csv], ...

// and so on each 2 hours

确认代码:

var generation = message.getHeaders().get("objectGeneration");
pubSubMessage = message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class)
pubSubMessage.ack().addCallback(
        v -> {
            removeFromIdempotentStore(targetMessage, false);
            log.info("Acknowledged message - {}", generation); //from logs we see that this line was invoked
        },
        e -> {
            removeFromIdempotentStore(targetMessage, false);
            log.error("Failed to acknowledge message - {}", generation, e);
        }
);

GCP订阅页面包含下图:

在此处输入图像描述

StackDriver 确认图: 在此处输入图像描述

任何想法发生了什么,如何解决和修复它?

4

2 回答 2

1

尝试检查 Stackdriver 以查看您是否错过了确认截止日期

重复之间的两小时等待时间非常有趣。您之前是否尝试过延长您的消息截止日期?(这方面的信息在上面的链接中。)

于 2019-12-21T16:12:31.720 回答
0

在此处查看更多信息:如何清理 JdbcMetadataStore?

根据我们的结论,最好不要在处理后立即从元数据存储表中删除条目。一些外部工作应该不时发挥作用,并且只针对那些已经足够老可以删除的条目,我们确信 Pub/Sub 不会再向我们重新传递相同的消息。

于 2019-12-26T17:23:58.630 回答