0

我提到这个通过订阅从我的 Azure 服务总线接收消息

我能够收到消息,但我会不断收到消息,直到我手动终止程序

我有一个超时选项,并且只想在超时之前接收消息。

如果您可以解释以下代码的工作原理以及如何修改以下代码以接收特定时间范围内的消息并在达到超时后停止接收,这将很有帮助。

static void registerMessageHandlerOnClient(SubscriptionClient receiveClient, ExecutorService executorService) throws Exception {
    // register the RegisterMessageHandler callback
    receiveClient.registerMessageHandler(
            new IMessageHandler() {
                // callback invoked when the message handler loop has obtained a message
                public CompletableFuture<Void> onMessageAsync(IMessage message) {
                    // receives message is passed to callback
                    if (message.getLabel() != null &&
                            message.getContentType() != null &&
                            message.getLabel().contentEquals("Scientist") &&
                            message.getContentType().contentEquals("application/json")) {

                        byte[] body = message.getBody();
                        Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);

                        System.out.printf(
                                "\n\t\t\t\t%s Message received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                                        "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\",  \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
                                receiveClient.getEntityPath(),
                                message.getMessageId(),
                                message.getSequenceNumber(),
                                message.getEnqueuedTimeUtc(),
                                message.getExpiresAtUtc(),
                                message.getContentType(),
                                scientist != null ? scientist.get("firstName") : "",
                                scientist != null ? scientist.get("name") : "");
                    }
                    return receiveClient.completeAsync(message.getLockToken());
                }

                // callback invoked when the message handler has an exception to report
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                }
            },
            // 1 concurrent call, messages are auto-completed, auto-renew duration
            new MessageHandlerOptions(1, false, Duration.ofMinutes(1)),
            executorService);
}
4

1 回答 1

2

这不能在您的订阅代码中完成。

您可以执行两种选择/解决方法:

  • 不要连续向主题发送消息,在那里有时间控制。
  • 创建一个计时器触发器,它可以调用 REST API订阅 - 创建或更新以制作EntityStatus=ReceiveDisabled并使用类似的函数来制作EntityStatus= Active
于 2021-05-20T12:04:33.903 回答