正如 Jambor 所说,默认行为是在函数成功完成时完成消息(参见文档),或者在函数失败时放弃。
您可以在SDK Repo中查看该类的代码来查看此行为的实现MessageProcessor
:
public virtual async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
{
if (result.Succeeded)
{
if (!MessageOptions.AutoComplete)
{
// AutoComplete is true by default, but if set to false
// we need to complete the message
cancellationToken.ThrowIfCancellationRequested();
await message.CompleteAsync();
}
}
else
{
cancellationToken.ThrowIfCancellationRequested();
await message.AbandonAsync();
}
}
有趣的一点:这是一个虚函数。
ServiceBusConfiguration
暴露一个MessagingProvider
属性。
如果您查看SDK RepoMessagingProvider
中默认类的代码,您会发现您可以覆盖负责创建 new 的方法:MessageProcessor
/// <summary>
/// Creates a <see cref="MessageProcessor"/> for the specified ServiceBus entity.
/// </summary>
/// <param name="entityPath">The ServiceBus entity to create a <see cref="MessageProcessor"/> for.</param>
/// <returns>The <see cref="MessageProcessor"/>.</returns>
public virtual MessageProcessor CreateMessageProcessor(string entityPath)
{
if (string.IsNullOrEmpty(entityPath))
{
throw new ArgumentNullException("entityPath");
}
return new MessageProcessor(_config.MessageOptions);
}
这个函数也是虚拟的。
现在您可以创建自己的MessagingProvider
实现MessageProcessor
:
public class CustomMessagingProvider : MessagingProvider
{
private readonly ServiceBusConfiguration _config;
public CustomMessagingProvider(ServiceBusConfiguration config) : base(config)
{
_config = config;
}
public override MessageProcessor CreateMessageProcessor(string entityPath)
{
if (string.IsNullOrEmpty(entityPath))
{
throw new ArgumentNullException("entityPath");
}
return new CustomMessageProcessor(_config.MessageOptions);
}
class CustomMessageProcessor : MessageProcessor
{
public CustomMessageProcessor(OnMessageOptions messageOptions) : base(messageOptions)
{
}
public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
{
if (!result.Succeeded)
{
cancellationToken.ThrowIfCancellationRequested();
await message.AbandonAsync();
}
}
}
}
并像这样配置您的 JobHost:
public static void Main()
{
var config = new JobHostConfiguration();
var sbConfig = new ServiceBusConfiguration
{
MessageOptions = new OnMessageOptions
{
AutoComplete = false
}
};
sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
config.UseServiceBus(sbConfig);
var host = new JobHost(config);
host.RunAndBlock();
}
这是针对此问题的技术部分...
现在,如果您没有完成您的消息,该消息将一次又一次地用于相同的功能,直到您到达MaxDeliveryCount
,然后您的消息将是死信。因此,即使将您的函数设计为幂等的,我也很确定这不是您想要的。
也许您应该多解释一下您要达到的目标?
如果您正在寻找父子队列通信(请参阅问题评论),有一篇很好的文章解释了如何使用 ASB 设计工作流:
否则,您可以查看Defer
BrokerMessage 对象上的方法:
表示接收方希望推迟处理此消息。
它将允许您保留父消息,直到处理子消息。