1

[编辑:我重新制定并简化了我原来的问题]

我在会话中使用 Azure 服务总线主题/订阅,并且在关闭会话时遇到问题。

对于背景信息,我的应用程序从我大部分时间保持活动状态的主题订阅会话(FIFO 要求)接收数据。只有偶尔我们需要暂时“暂停”数据流。

当请求此数据流“暂停”时,我们退出订阅会话并等待被要求再次打开会话。

// pseudo code
public class Test
{
    public static async Task Me()
    {
        var client = new SubscriptionClient(
          EndPoint,
          Path,
          Name,
          TokenProvider,
          TransportType.Amqp,
          ReceiveMode.PeekLock,
          new RetryExponential(
            minimumBackoff: TimeSpan.FromSeconds(1),
            maximumBackoff: TimeSpan.FromSeconds(30),
            maximumRetryCount: 10));
            
        // Setup consumer options
        var sessionOptions = new SessionHandlerOptions(OnHandleExceptionReceived)
        {
            AutoComplete = false,
            MessageWaitTimeout = TimeSpan.FromSeconds(10),
            MaxConcurrentSessions = 1,
        };

        // Registration 1 - Start data flow
        client.RegisterSessionHandler(OnMessageSessionAsync, sessionOptions);

        // Wait 1 - Artificially wait for 'data flow pause' to kick in.
        //          For the sake of this example, we artificially give plenty 
        //          of time to the message session handler to receive something
        //          and close the session.
        Task.Wait(TimeSpan.FromSeconds(30));

        // Registration 2 - Artificially 'unpause' data flow
        client.RegisterSessionHandler(OnMessageSessionAsync, sessionOptions);

        // Wait 2 - Artificially wait for 'pause' to kick in again
        Task.Wait(TimeSpan.FromSeconds(30));

        // Finally close client
        await client.CloseAsync();
    }

    private static async Task OnMessageSessionAsync(IMessageSession session, Message message, CancellationToken cancellationToken)
    {
        try
        {
            await client.CompleteAsync(message.SystemProperties.LockToken);

            // Process message .. It doesn't matter what it is, 
            // just that at some point I want to break away from session
            if (bool.TryParse(message.UserProperties["SessionCompleted"] as string, out bool completed) && completed)
                await session.CloseAsync(); // <-- This never works
        }
        catch (Exception e)
        {
            Console.WriteLine("OnMessageSessionAsync exception: {0}", e);

            // Indicates a problem, unlock message in subscription.
            await client.AbandonAsync(message.SystemProperties.LockToken);
        }
    }

    private static Task OnHandleExceptionReceived(ExceptionReceivedEventArgs e)
    {
        var context = e.ExceptionReceivedContext;

        Options.Logger?.LogWarning(e.Exception, new StringBuilder()
            .AppendLine($"Message handler encountered an exception {e.Exception.GetType().Name}.")
            .AppendLine("Exception context for troubleshooting:")
            .AppendLine($" - Endpoint: {context.Endpoint}")
            .AppendLine($" - Entity Path: {context.EntityPath}")
            .Append($" - Executing Action: {context.Action}"));

        return Task.CompletedTask;
    }
}

问题 :

如前所述,我在退出会话时遇到问题,因为调用session.CloseAsync()似乎不起作用。即使我明确要求会话停止,消息也会不断出现。

  • 不能直接关闭主题会话是正常行为吗?如果是这样,为什么要公开电话session.CloseAsync()
  • 我真的可以独立于订阅连接关闭会话吗?

ps1:我的代码基于微软在 github.com 上提供的官方示例。尽管此示例基于队列会话而不是主题会话,但在我看来,行为应该相同是合乎逻辑的。

ps2:我在Microsoft.Azure.ServiceBus存储库上深入研究了可能是什么原因,我想知道属性的底层是否缺少变量初始化MessageSession.OwnsConnection..

4

0 回答 0