2

我有以下形式的服务总线触发功能:

public static void ProcessJobs([ServiceBusTrigger("topicname", "subscriptionname", AccessRights.Listen)] BrokeredMessage input, [ServiceBus("topic2name", "subscription2name", AccessRights.Send)] out BrokeredMessage output)
{
    output = new BrokeredMessage(input.GetBody<String>());
}

我的用例是这个函数只是从一个主题中获取消息并将它们推送到另一个主题。我不想在此过程中从源主题中删除消息。

这有可能实现吗?

此外,我在哪里可以找到有关AccessRights以及它们如何影响消息访问的更多信息。例如:在上面的示例中,我使用AccessRights.Listen从输入主题获取消息,但是一旦对这些消息进行函数调用,它似乎仍然在“删除”消息。

4

2 回答 2

3

正如 Jambor 所说,默认行为是在函数成功完成时完成消息(参见文档),或者在函数失败时放弃。

您可以在SDK Repo中查看该类的代码来查看此行为的实现MessageProcessor

public virtual async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
{
    if (result.Succeeded)
    {
        if (!MessageOptions.AutoComplete)
        {
            // AutoComplete is true by default, but if set to false
            // we need to complete the message
            cancellationToken.ThrowIfCancellationRequested();
            await message.CompleteAsync();
        }
    }
    else
    {
        cancellationToken.ThrowIfCancellationRequested();
        await message.AbandonAsync();
    }
}

有趣的一点:这是一个虚函数。

ServiceBusConfiguration暴露一个MessagingProvider属性。

如果您查看SDK RepoMessagingProvider中默认类的代码,您会发现您可以覆盖负责创建 new 的方法:MessageProcessor

/// <summary>
/// Creates a <see cref="MessageProcessor"/> for the specified ServiceBus entity.
/// </summary>
/// <param name="entityPath">The ServiceBus entity to create a <see cref="MessageProcessor"/> for.</param>
/// <returns>The <see cref="MessageProcessor"/>.</returns>
public virtual MessageProcessor CreateMessageProcessor(string entityPath)
{
    if (string.IsNullOrEmpty(entityPath))
    {
        throw new ArgumentNullException("entityPath");
    }
    return new MessageProcessor(_config.MessageOptions);
}

这个函数也是虚拟的。

现在您可以创建自己的MessagingProvider实现MessageProcessor

public class CustomMessagingProvider : MessagingProvider
{
    private readonly ServiceBusConfiguration _config;

    public CustomMessagingProvider(ServiceBusConfiguration config) : base(config)
    {
        _config = config;
    }

    public override MessageProcessor CreateMessageProcessor(string entityPath)
    {
        if (string.IsNullOrEmpty(entityPath))
        {
            throw new ArgumentNullException("entityPath");
        }
        return new CustomMessageProcessor(_config.MessageOptions);
    }

    class CustomMessageProcessor : MessageProcessor
    {
        public CustomMessageProcessor(OnMessageOptions messageOptions) : base(messageOptions)
        {
        }

        public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken)
        {
            if (!result.Succeeded)
            {
                cancellationToken.ThrowIfCancellationRequested();
                await message.AbandonAsync();
            }
        }
    }
}

并像这样配置您的 JobHost:

public static void Main()
{
    var config = new JobHostConfiguration();
    var sbConfig = new ServiceBusConfiguration
    {
        MessageOptions = new OnMessageOptions
        {
            AutoComplete = false
        }
    };
    sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig);
    config.UseServiceBus(sbConfig);
    var host = new JobHost(config);
    host.RunAndBlock();
}

这是针对此问题的技术部分...

现在,如果您没有完成您的消息,该消息将一次又一次地用于相同的功能,直到您到达MaxDeliveryCount,然后您的消息将是死信。因此,即使将您的函数设计为幂等的,我也很确定这不是您想要的。

也许您应该多解释一下您要达到的目标?

如果您正在寻找父子队列通信(请参阅问题评论),有一篇很好的文章解释了如何使用 ASB 设计工作流:

否则,您可以查看DeferBrokerMessage 对象上的方法:

表示接收方希望推迟处理此消息。

它将允许您保留父消息,直到处理子消息。

于 2016-09-29T08:48:13.860 回答
0

我不想在此过程中从源主题中删除消息。

这个文档我们可以知道,ServiceBusTrigger 会在触发完成后调用完成函数。这是默认模式。这是那篇文章的片段:

SDK 在 PeekLock 模式下接收消息,如果函数执行成功,则对该消息调用 Complete,如果函数失败,则调用 Abandon。如果函数运行时间超过 PeekLock 超时时间,则会自动更新锁。

于 2016-09-28T09:44:34.677 回答