1

我的团队一直在尝试让我们的一些 WebJobs自定义队列处理器具有单例行为,但我们还没有真正通过[Singleton(Mode = SingletonMode.Listener)]属性获得这种行为,也没有通过设置来获得这种行为QueueProcessorFactoryContext.BatchSize = 1。这导致每晚的进程一次猛击数据库——其中许多超时——并且已经变得有点令人头疼。

这或多或少是我们的 CustomQueueProcessorFactory 的样子:

    public class CustomQueueProcessorFactory : IQueueProcessorFactory
    {
        public QueueProcessor Create(QueueProcessorFactoryContext context)
        {
            if (context == null)
                throw new ArgumentNullException(nameof(context));

            if (context.Queue.Name == Constants.UploadQueueName 
                || context.Queue.Name == Constants.BuildQueueName)
            {
                context.BatchSize = 1;
            }

            return new QueueProcessor(context);
        }
    }

这是在配置我们的 JobHost 时引用的:

    var config = new JobHostConfiguration();
    config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();

我们还用QueueTriggers 设置了一些函数,例如:

    public static async Task ExecuteBackgroundRequest([QueueTrigger(Constants.BackgroundQueueName)] BackgroundRequest background, TextWriter logger)
    {
        await ExecuteRequest(background, logger);
    }

    [Singleton(Mode = SingletonMode.Listener)]
    public static async Task ExecuteUploadRequest([QueueTrigger(Constants.UploadQueueName)] BackgroundRequest background, TextWriter logger)
    {
        await ExecuteRequest(background, logger);
    }

    [Singleton(Mode = SingletonMode.Listener)]
    public static async Task ExecuteBuildRequest([QueueTrigger(Constants.BuildQueueName)] BackgroundRequest background, TextWriter logger)
    {
        await ExecuteRequest(background, logger);
    }

我们正在使用包Microsoft.Azure.WebJobs(+ .Core, .Extensions) v2.0.0 和WindowsAzure.Storagev8.0.0,它们有些过时了,所以我一直在探索的一个潜在解决方案是更新 WebJobs 的最新稳定版本 (v3.0.4)。这打开了一个全新的蠕虫罐,因为配置已经完全重做,所有的类都被移动了。文档似乎稀疏/分散,所以我还没有确定在哪里(或者即使)我可以在每个 QueueProcessor 的基础上自定义属性,比如将某些队列的 BatchSize 设置为 1,而将其他队列设置为更高。

是否有一些 WebJobs 版本可以使用上述 CustomQueueProcessorFactory 逻辑来限制 BatchSize?或者 Singleton 属性实际上会确保一次只有一个后台进程访问特定队列?可以在最新版本的 WebJobs 中配置 QueueProcessorFactories 吗?

对任何这些问题的帮助将不胜感激!

4

2 回答 2

1

WebJobs SDK 通过其 SingletonAttribute 促进常见的分布式锁定方案。您可以简单地将 SingletonAttribute 应用于作业函数,以确保该函数的所有调用都将被序列化,即使跨扩展实例也是如此。如果您的函数需要访问其他分布式资源或执行不应该/不能同时执行的其他操作,这将非常有用。

[Singleton]
public static async Task ProcessImage([BlobTrigger("images")] Stream image)
{
     // Process the image
}

就像在这个例子中一样,只有一个 ProcessImage 函数实例将在任何给定时间运行。当函数被添加到图像容器的新图像触发时,运行时将首先尝试获取锁(blob 租约)。一旦获得锁,就会在函数执行期间持有锁(并且更新 blob 租约),确保不会运行其他实例。如果在此函数运行时触发了另一个函数实例,它将等待锁,并定期轮询它。

Singleton 在幕后使用Azure Blob Leases来实现分布式锁定。管理 blob 租约、租约续订等的所有复杂性都由 SDK 处理。

单例锁的详细信息也显示在 WebJobs Dashboard 中,包括正在进行的函数执行的当前锁状态,以及函数在获取锁之前等待锁的时间。您可以使用这些详细信息来查看和管理锁争用。

在此处输入图像描述

非并发场景将需要使用 Singleton。一些触发器通过其配置设置对并发管理具有固有的支持。在这种情况下,使用内置支持可能会更有效。您可以使用这些设置来确保您的函数在单个实例上运行单例。为确保函数的单个实例在横向扩展的实例中运行,此外,您可以在函数上应用侦听器级别的单例锁(例如 [Singleton(Mode = SingletonMode.Listener)])。一些触发器的配置旋钮是:

QueueTrigger - 您可以将 JobHostConfiguration.Queues.BatchSize 设置为 1 ServiceBusTrigger - 您可以将 ServiceBusConfiguration.MessageOptions.MaxConcurrentCalls 设置为 1 FileTrigger - 您可以将 FileProcessor.MaxDegreeOfParallelism 设置为 1 但是,对于本质上不支持并发控制的触发器,或者如果您想要通过 Singleton 范围进行更高级的锁定(见下文),Singleton 是正确的方法。

我还要说使用双锁实现您的 CustomQueueProcessorFactory 如下所示:-

 if (_instance == null)
                {
                    lock (SyncObject)
                    {
                        if (_instance == null)
                        {
                            _instance = new CustomQueueProcessor();
                        }
                    }
                }
                return _instance;

于 2019-02-06T09:29:34.520 回答
0

因此,如果我理解正确,即使在横向扩展之后,您也希望多个函数按顺序处理它们的触发消息。我对吗?

我认为您可以使用 SingletonAttribute 和范围参数来强制执行此操作:

public static async Task ExecuteBackgroundRequest([QueueTrigger(Constants.BackgroundQueueName)] BackgroundRequest background, TextWriter logger)
{
    await ExecuteRequest(background, logger);
}

[Singleton("DatabaseSync", SingletonScope.Host)]
public static async Task ExecuteUploadRequest([QueueTrigger(Constants.UploadQueueName)] BackgroundRequest background, TextWriter logger)
{
    await ExecuteRequest(background, logger);
}

[Singleton("DatabaseSync", SingletonScope.Host)]
public static async Task ExecuteBuildRequest(
    [QueueTrigger(Constants.BuildQueueName)] BackgroundRequest background, TextWriter logger)
{
    await ExecuteRequest(background, logger);
}
于 2020-10-30T11:29:48.873 回答