1

我有以下工作流程:

  1. 服务总线接收消息。
  2. Azure 函数触发并尝试通过 HTTP 将此消息传递到某些服务。
  3. 如果交付失败 - 函数抛出异常(自定义)并通过以下代码禁用主题订阅:

在此处输入图像描述

  4. 与此同时,另一个函数会并行地 ping 该服务专用的健康检查端点;如果得到 200,它会尝试重新启用订阅,使流程恢复工作。
  5. 上述步骤可能会重复 N 次,因为健康检查返回 200,而第 2 步中的投递 URL 却返回 4xx 状态码。

在每次重新启用订阅并再次尝试传递消息之后,我预期消息的传递计数(DeliveryCount)会增加,最终(在 10 次传递尝试之后)消息会进入死信队列。但实际上,传递计数始终等于 1。

在此处输入图像描述

我猜测,当我调用 CreateOrUpdate 并更改订阅状态时,传递计数可能会被重置。如果确实如此,除了 Microsoft.Azure.Management 包之外,还有什么其他方式可以管理订阅状态,从而不会重置消息的传递计数?

更新:函数代码

/// <summary>
/// Service Bus topic-subscription trigger that forwards each received message to a Logic App over HTTP.
/// </summary>
/// <remarks>
/// When the Logic App answers with a non-success status code, the function asks the
/// ServiceBusTopicSubscriptionSwitcher endpoint to disable the subscription named after the
/// message's application id and then throws <c>LogicAppBadRequestException</c>.
/// On success, the "delivery error e-mail sent" marker for the application/event pair is removed from Redis.
/// </remarks>
public static class ESBTESTSubscriptionTrigger
{
    // Single shared client for all invocations; avoids creating a new HttpClient per message.
    private static readonly HttpClient Client = new HttpClient();

    // Created on the first invocation from the connection string stored in Key Vault, then reused.
    private static IDatabase redisCache;

    /// <summary>
    /// Posts the message body to the Logic App and, on failure, requests that the subscription be disabled.
    /// </summary>
    /// <param name="serviceBusMessage">Incoming message; must carry an "applicationId" property and may carry "Event-Name".</param>
    /// <param name="log">Function trace writer used for error reporting.</param>
    /// <param name="keyVaultSecretsManager">Source of the Logic App URL, Redis connection string and switcher URL.</param>
    /// <exception cref="ParameterIsMissingException">The Logic App URL secret or the "applicationId" message property is missing or blank.</exception>
    /// <exception cref="LogicAppBadRequestException">The Logic App request failed or returned a non-success status code.</exception>
    /// <exception cref="FunctionNotAvailableException">The subscription switcher returned a non-success status code.</exception>
    [FunctionName("ESBTESTSubscriptionTrigger")]
    [Singleton]
    public static async Task Run([ServiceBusTrigger("Notifications", "ESBTEST", AccessRights.Listen, Connection = "NotificationsBusConnectionString")]BrokeredMessage serviceBusMessage, TraceWriter log, [Inject]IKeyVaultSecretsManager keyVaultSecretsManager)
    {
        var logicAppUrl = await keyVaultSecretsManager.GetSecretAsync("NotificationsLogicAppUrl");

        // Fail fast on missing configuration before any other resource is touched.
        if (string.IsNullOrWhiteSpace(logicAppUrl))
        {
            log.Error("Logic App URL should be provided in Application settings of function App.");
            throw new ParameterIsMissingException("Logic App URL should be provided in Application settings of function App.");
        }

        if (redisCache == null)
        {
            // Awaited rather than blocked on with GetAwaiter().GetResult(): this method is already async.
            var redisConnectionString = await keyVaultSecretsManager.GetSecretAsync("RedisCacheConnectionString");
            redisCache = RedisCacheConnectionManager.GetRedisCacheConnection(redisConnectionString);
        }

        // TryGetValue instead of the indexer: a message without the property must reach the
        // ParameterIsMissingException guard below rather than fail with KeyNotFoundException.
        object applicationIdValue;
        serviceBusMessage.Properties.TryGetValue("applicationId", out applicationIdValue);
        var applicationId = applicationIdValue?.ToString();

        object eventNameValue;
        var eventName = serviceBusMessage.Properties.TryGetValue("Event-Name", out eventNameValue)
            ? eventNameValue?.ToString() ?? string.Empty
            : string.Empty;

        if (string.IsNullOrWhiteSpace(applicationId))
        {
            log.Error("ApplicationId should be present in service bus message properties.");
            throw new ParameterIsMissingException("Application id is missing in service bus message.");
        }

        string messageBody;
        using (var bodyStream = serviceBusMessage.GetBody<Stream>())
        using (var bodyReader = new StreamReader(bodyStream))
        {
            messageBody = await bodyReader.ReadToEndAsync();
        }

        HttpResponseMessage response;
        using (var content = new StringContent(messageBody, Encoding.UTF8, "application/json"))
        {
            content.Headers.Add("ApplicationId", applicationId);

            try
            {
                response = await Client.PostAsync(logicAppUrl, content);
            }
            catch (HttpRequestException e)
            {
                log.Error($"Logic App responded with {e.Message}");
                throw new LogicAppBadRequestException($"Logic App responded with {e.Message}", e);
            }
        }

        using (response)
        {
            if (!response.IsSuccessStatusCode)
            {
                log.Error($"Logic App responded with {response.StatusCode}");

                var serviceBusSubscriptionsSwitcherUrl = await keyVaultSecretsManager.GetSecretAsync("ServiceBusTopicSubscriptionSwitcherUri");

                // NOTE(review): applicationId is interpolated into JSON without escaping; a value
                // containing '"' or '\' would produce an invalid payload. Consider a JSON serializer.
                var disableRequest = new HttpRequestMessage(HttpMethod.Post, serviceBusSubscriptionsSwitcherUrl)
                {
                    Content = new StringContent(
                        $"{{\"Action\":\"Disable\",\"SubscriptionName\":\"{applicationId}\"}}",
                        Encoding.UTF8,
                        "application/json")
                };

                using (disableRequest)
                using (var sbSubscriptionSwitcherResponse = await Client.SendAsync(disableRequest))
                {
                    if (!sbSubscriptionSwitcherResponse.IsSuccessStatusCode)
                    {
                        throw new FunctionNotAvailableException($"ServiceBusTopicSubscriptionSwitcher responded with {sbSubscriptionSwitcherResponse.StatusCode}");
                    }
                }

                throw new LogicAppBadRequestException($"Logic App responded with {response.StatusCode}");
            }
        }

        // Delivery succeeded: remove the marker key for this application/event pair.
        if (!string.IsNullOrWhiteSpace(eventName))
        {
            redisCache.KeyDelete($"{applicationId}{eventName}DeliveryErrorEmailSent");
        }
    }
}
4

0 回答 0