
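I am testing the retry options of the Azure Service Bus publisher/subscriber clients, because after a sudden connection failure the client does not retry sending or receiving messages.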

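Below is the code for the publisher client's sendMessage() method. I have set the subscription's max delivery count to 1000. The client still appears to use the default retryPolicy values, and I don't see it retrying as configured in amqpRetryOptions.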

import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import java.time.Duration;

static void sendMessage() {
    // create Retry Options for the Service Bus client
    AmqpRetryOptions amqpRetryOptions = new AmqpRetryOptions();
    amqpRetryOptions.setDelay(Duration.ofSeconds(1));
    amqpRetryOptions.setMaxRetries(120);
    amqpRetryOptions.setMaxDelay(Duration.ofMinutes(5));
    amqpRetryOptions.setMode(AmqpRetryMode.EXPONENTIAL);
    amqpRetryOptions.setTryTimeout(Duration.ofSeconds(5));

    // create a Service Bus Sender client for the topic
    ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .retryOptions(amqpRetryOptions)
            .sender()
            .topicName(topicName)
            .buildClient();

    // send one message to the topic
    senderClient.sendMessage(new ServiceBusMessage("Hello, World! "));
    System.out.println("Sent a single message to the topic");
}
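
To get a feel for what the retry options above ask for, here is a minimal sketch of a plain exponential backoff schedule (base delay doubled on each retry, capped at the maximum delay). This is an assumption about the general technique, not the SDK's internal algorithm, which may differ in detail (for example by adding jitter). The class and method names (BackoffSketch, delayForAttempt, totalDelay) are hypothetical.

```java
import java.time.Duration;

// Illustrative only: a plain exponential backoff schedule.
// NOT the Azure SDK's implementation; names here are hypothetical.
final class BackoffSketch {

    // Delay before retry number `attempt` (0-based):
    // baseDelay * 2^attempt, capped at maxDelay.
    static Duration delayForAttempt(Duration baseDelay, Duration maxDelay, int attempt) {
        long baseMillis = baseDelay.toMillis();
        long maxMillis = maxDelay.toMillis();
        int shift = Math.max(0, Math.min(attempt, 62));
        // If doubling would overflow a long, the cap certainly applies.
        if (baseMillis > (Long.MAX_VALUE >> shift)) {
            return maxDelay;
        }
        long candidate = baseMillis << shift;
        return Duration.ofMillis(Math.min(candidate, maxMillis));
    }

    // Sum of all backoff delays if every one of `maxRetries` retries is used.
    static Duration totalDelay(Duration baseDelay, Duration maxDelay, int maxRetries) {
        Duration total = Duration.ZERO;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            total = total.plus(delayForAttempt(baseDelay, maxDelay, attempt));
        }
        return total;
    }

    public static void main(String[] args) {
        Duration base = Duration.ofSeconds(1);
        Duration max = Duration.ofMinutes(5);
        // Print the first few delays for delay=1s, maxDelay=5min.
        for (int attempt = 0; attempt < 12; attempt++) {
            System.out.println("retry " + attempt + ": "
                    + delayForAttempt(base, max, attempt).getSeconds() + "s");
        }
        System.out.println("total over 120 retries: "
                + totalDelay(base, max, 120).getSeconds() + "s");
    }
}
```

Under this simplified model, a delay of 1 second, a maximum delay of 5 minutes and 120 retries add up to 33811 seconds (roughly 9.4 hours) of waiting, not counting the 5-second try timeout on each attempt.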

Is my approach wrong?

  • If so, what is the standard approach?
  • If not, how should the retry mechanism be handled?



1 Answer


I was able to get this working by using ServiceBusSenderAsyncClient. In addition, I can catch the exception to check whether its cause is transient.

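import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import java.time.Duration;

static void sendMessage() {
    // create Retry Options for the Service Bus client
    AmqpRetryOptions amqpRetryOptions = new AmqpRetryOptions();
    amqpRetryOptions.setDelay(Duration.ofSeconds(1));
    amqpRetryOptions.setMaxRetries(5);
    amqpRetryOptions.setMaxDelay(Duration.ofSeconds(15));
    amqpRetryOptions.setMode(AmqpRetryMode.EXPONENTIAL);
    amqpRetryOptions.setTryTimeout(Duration.ofSeconds(5));

    // instantiate a client that will be used to call the service
    ServiceBusSenderAsyncClient serviceBusSenderAsyncClient = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .retryOptions(amqpRetryOptions)
        .sender()
        .topicName(topicName)
        .buildAsyncClient();

    // create a message
    ServiceBusMessage serviceBusMessage = new ServiceBusMessage("Hello, World!\n");

    // send the message to the topic; subscribe() does not block,
    // so this method may return before the send has finished
    serviceBusSenderAsyncClient.sendMessage(serviceBusMessage).subscribe(
        unused -> { /* sendMessage returns Mono<Void>, which emits no value, so this is never called */ },
        error -> {
            // the error is not guaranteed to be a ServiceBusException, so check before casting
            if (error instanceof ServiceBusException) {
                ServiceBusException serviceBusException = (ServiceBusException) error;
                System.out.println(serviceBusException.isTransient());
            } else {
                System.out.println("Unexpected error: " + error);
            }
        },
        () -> {
            System.out.println("Message sent successfully");
        }
    );
}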
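
If the built-in retries are exhausted, the isTransient() check above could also drive an application-level retry. The following is only a sketch of that idea under stated assumptions: SendFailedException is a hypothetical stand-in for ServiceBusException so that the example runs without the Azure SDK, and sendWithRetry is a hypothetical helper, not part of any library.

```java
// Illustrative only: retry a send while the failure is reported as transient.
// All names here are hypothetical; nothing in this class is Azure SDK API.
final class TransientRetrySketch {

    // Hypothetical stand-in for an exception type that exposes isTransient().
    static final class SendFailedException extends RuntimeException {
        private final boolean transientError;

        SendFailedException(String message, boolean transientError) {
            super(message);
            this.transientError = transientError;
        }

        boolean isTransient() {
            return transientError;
        }
    }

    // Runs `send` until it succeeds, rethrowing as soon as a failure is
    // non-transient or `maxAttempts` attempts have been made.
    // Returns the number of attempts that were needed.
    static int sendWithRetry(Runnable send, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                send.run();
                return attempt;
            } catch (SendFailedException e) {
                if (!e.isTransient() || attempt >= maxAttempts) {
                    throw e;
                }
                // A real implementation would wait here before the next attempt.
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated sender: fails twice with a transient error, then succeeds.
        int attempts = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new SendFailedException("simulated connection drop", true);
            }
        }, 5);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

The sketch retries immediately to stay short; in practice a delay between attempts would normally be added so that a struggling connection is not hammered.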

Answered 2021-07-26T04:23:28.550