[编辑:我重新制定并简化了我原来的问题]
我在会话中使用 Azure 服务总线主题/订阅,并且在关闭会话时遇到问题。
对于背景信息,我的应用程序从我大部分时间保持活动状态的主题订阅会话(FIFO 要求)接收数据。只有偶尔我们需要暂时“暂停”数据流。
当请求此数据流“暂停”时,我们退出订阅会话并等待被要求再次打开会话。
// pseudo code
public class Test
{
public static async Task Me()
{
var client = new SubscriptionClient(
EndPoint,
Path,
Name,
TokenProvider,
TransportType.Amqp,
ReceiveMode.PeekLock,
new RetryExponential(
minimumBackoff: TimeSpan.FromSeconds(1),
maximumBackoff: TimeSpan.FromSeconds(30),
maximumRetryCount: 10));
// Setup consumer options
var sessionOptions = new SessionHandlerOptions(OnHandleExceptionReceived)
{
AutoComplete = false,
MessageWaitTimeout = TimeSpan.FromSeconds(10),
MaxConcurrentSessions = 1,
};
// Registration 1 - Start data flow
client.RegisterSessionHandler(OnMessageSessionAsync, sessionOptions);
// Wait 1 - Artificially wait for 'data flow pause' to kick in.
// For the sake of this example, we artificially give plenty
// of time to the message session handler to receive something
// and close the session.
Task.Wait(TimeSpan.FromSeconds(30));
// Registration 2 - Artificially 'unpause' data flow
client.RegisterSessionHandler(OnMessageSessionAsync, sessionOptions);
// Wait 2 - Artificially wait for 'pause' to kick in again
Task.Wait(TimeSpan.FromSeconds(30));
// Finally close client
await client.CloseAsync();
}
private static async Task OnMessageSessionAsync(IMessageSession session, Message message, CancellationToken cancellationToken)
{
try
{
await client.CompleteAsync(message.SystemProperties.LockToken);
// Process message .. It doesn't matter what it is,
// just that at some point I want to break away from session
if (bool.TryParse(message.UserProperties["SessionCompleted"] as string, out bool completed) && completed)
await session.CloseAsync(); // <-- This never works
}
catch (Exception e)
{
Console.WriteLine("OnMessageSessionAsync exception: {0}", e);
// Indicates a problem, unlock message in subscription.
await client.AbandonAsync(message.SystemProperties.LockToken);
}
}
private static Task OnHandleExceptionReceived(ExceptionReceivedEventArgs e)
{
var context = e.ExceptionReceivedContext;
Options.Logger?.LogWarning(e.Exception, new StringBuilder()
.AppendLine($"Message handler encountered an exception {e.Exception.GetType().Name}.")
.AppendLine("Exception context for troubleshooting:")
.AppendLine($" - Endpoint: {context.Endpoint}")
.AppendLine($" - Entity Path: {context.EntityPath}")
.Append($" - Executing Action: {context.Action}"));
return Task.CompletedTask;
}
}
问题 :
如前所述,我在退出会话时遇到问题,因为调用session.CloseAsync()
似乎不起作用。即使我明确要求会话停止,消息也会不断出现。
- 不能直接关闭主题会话是正常行为吗?如果是这样,为什么要公开电话
session.CloseAsync()
? - 我真的可以独立于订阅连接关闭会话吗?
ps1:我的代码基于微软在 github.com 上提供的官方示例。尽管此示例基于队列会话而不是主题会话,但在我看来,行为应该相同是合乎逻辑的。
ps2:我在Microsoft.Azure.ServiceBus存储库上深入研究了可能是什么原因,我想知道属性的底层是否缺少变量初始化MessageSession.OwnsConnection
..