13

我有一种情况,我必须在 Web API 方法中接收请求,将这些请求排队,然后将批量发送到数据库(Solr 实例)。

我不太确定如何维护来自多个来源的一批请求。现在我正在将每个请求数据以 json 格式写入磁盘上的文件,稍后我将有一个 windows 服务,通过文件夹读取所有文件,更新数据库并删除这些文件。

这是我在我的 Web API 中所做的

public void Post(LogEntry value)
{
    value.EventID = Guid.NewGuid();
    value.ServerTime = DateTime.UtcNow;
    string json = JsonConvert.SerializeObject(value);
    using(StreamWriter sw = new StreamWriter(value.EventID.ToString()))
    {
        sw.Write(json);
    }
}

(这里EventID是 GUID)

这个过程看起来不对,一定有办法维护一个请求队列,但我不太确定如何在多个请求期间维护一个队列。

我这样做的原因是,在 solr 实例中批量插入比通过 SolrNet 插入单个记录要快。我期望在 Web API 上每秒至少收到 100 个请求。我想创建一批 1000 个请求并每 10 秒更新一次 solr 实例。请不要认为我需要代码,只需要知道我应该采用什么策略来维护请求/状态队列。

4

6 回答 6

20

如果您使用的是 .NET 4.0 或更高版本,则可以使用并发队列:

并发队列 (MSDN)

这是一种使用队列的线程安全方式,然后可以在所需时间访问该队列。

编辑:

例子:

这将是队列的包装器:

public static class RequestQueue
{
    private static ConcurrentQueue<int> _queue;

    public static ConcurrentQueue<int> Queue
    {
        get
        {
            if (_queue == null)
            {
                _queue = new ConcurrentQueue<int>();
            }

            return _queue;
        }
    }

}

然后你可以像这样设置你的 web api(为了简洁起见,这个例子存储了整数):

public class ValuesController : ApiController
{        
    public string Get()
    {
        var sb = new StringBuilder();
        foreach (var item in RequestQueue.Queue)
        {
            sb.Append(item.ToString());
        }

        return sb.ToString();
    }

    public void Post(int id)
    {
        RequestQueue.Queue.Enqueue(id);
    }        
}

如果你使用这个例子,你会看到队列包含多个请求的值。但是,由于它存在于内存中,如果应用程序池被回收(例如),那些排队的项目将会消失。

现在,您可以检查队列何时包含 10 个项目,然后将这些项目保存到数据库,同时创建另一个队列来存储传入值。

像这样:

public static class RequestQueue
{
    private static ConcurrentQueue<int> _queue;

    public static ConcurrentQueue<int> Queue
    {
        get
        {
            if (_queue == null)
            {
                _queue = new ConcurrentQueue<int>();
            }

            if (_queue.Count >= 10)
            {
                SaveToDB(_queue);
                _queue = new ConcurrentQueue<int>();
            }

            return _queue;
        }
    }

    public static void SaveToDB(ConcurrentQueue<int> queue)
    {
        foreach (var item in queue)
        {
            SaveItemToDB(item);
        }
    }
}

您需要稍微清理一下,但是这个设置应该可以工作。此外,您可能需要一些锁定机制来将队列转储到数据库并创建新实例。我会编写一个带有多个线程的控制台应用程序,这些线程可以访问这个队列来测试它。

于 2013-10-18T15:16:48.663 回答
5

这是使用MSMQ的一个很好的场景。对于每个请求,只需将项目发布到 MSMQ 队列。在同一个 webapp 或任何其他应用程序中,只需从队列中读取多个项目并将其发布到 solr。无论您的应用程序崩溃或被回收,MSMQ 都会安全地保存您的数据,供您以后检索。

MSMQ 是健壮、可靠和可扩展的。它非常适合您的问题。

于 2013-10-26T17:36:39.410 回答
0

另一种解决方案可能是将记录保存在与 WebApi 不在同一进程中的内存队列中。例如:MemcacheQueue https://github.com/coderrr/memcache_queue

其中一些队列实现具有持久性功能,因此在任何情况下都不会丢失数据。

于 2013-10-21T23:11:00.603 回答
0

您可以将请求排入内存中的队列,并且可以使用 Quartz .Net 定期将它们发送到数据库。您可以简单地在 Global.asax.cs 中进行如下操作:

public class RequestQueue
{
    private readonly Queue<HttpRequest> _requestHistory; 
    private RequestQueue()
    {
        _requestHistory = new Queue<HttpRequest>();
    }
    private static RequestQueue _singleton;

    public static RequestQueue Instance()
    {
        if (_singleton == null)
            _singleton = new RequestQueue();
        return _singleton;
    }

    public void Enqueue(HttpRequest request)
    {
        _requestHistory.Enqueue(request);
    }

    public void Flush()
    {
        while (_requestHistory.Count > 0)
        {
            var request = _requestHistory.Dequeue();
            try
            {
                //Write request To Db
            }
            catch (Exception)
            {
                _requestHistory.Enqueue(request);
            }
        }
    }
}

public class WebApiApplication : System.Web.HttpApplication
{
    public WebApiApplication()
    {
        base.BeginRequest += delegate
            {
                RequestQueue.Instance().Enqueue(HttpContext.Current.Request);
            };
    }

    private void InitializeQuartz()
    {
        ISchedulerFactory sf = new StdSchedulerFactory();
        IScheduler sched = sf.GetScheduler();

        DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTime.UtcNow);
        DateTimeOffset startTime = DateBuilder.NextGivenSecondDate(null, 5);

        IJobDetail job = JobBuilder.Create<QueueConsumer>()
            .WithIdentity("job1", "group1")
            .Build();
        ITrigger trigger = TriggerBuilder.Create()
            .WithIdentity("trigger1", "group1")
            .StartAt(runTime)
            .WithCronSchedule("5 0/1 * * * ?")
            .Build();

        sched.ScheduleJob(job, trigger);

        sched.Start();
    }

    public class QueueConsumer : IJob
    {
        public void Execute(IJobExecutionContext context)
        {
            RequestQueue.Instance().Flush();
        }
    }

    protected void Application_Start()
    {
        InitializeQuartz();
于 2013-10-21T14:22:56.003 回答
0
public class ThresholdBuffer<T>
{
    private ConcurrentBag<T> _buffer;

    private int _threshold;

    public ThresholdBuffer(int threshold)
    {
        _threshold = threshold;
        _buffer = new ConcurrentBag<T>();
    }

    public void Add(T item)
    {
        _buffer.Add(item);

        if(_buffer.Count >= _threshold)
        {
            Recycle();
        }
    }

    public void Recycle()
    {
        var value = Interlocked.Exchange<ConcurrentBag<T>>(ref _buffer, new ConcurrentBag<T>());
//flush value 
    }
}
  1. 创建刷新逻辑
  2. 在 Application_Start(Global.asax) 事件中创建 ThresholdBuffer 并将其存储在应用程序、静态字段等中
  3. 调用添加方法
  4. 在 Application_End 手动调用 Recycle

您可以在 Recycle 中添加锁定逻辑,以防止创建多个 ConcurrentBag 并刷新几乎空的包。但我的观点是,这不如锁那么邪恶。

更新。无需额外创建 ConcurrentBag 即可释放锁定

public class ThresholdBuffer<T>
{
    private ConcurrentBag<T> _buffer;

    private int _copacity;

    private int _threshold;

    public ThresholdBuffer(int threshold)
    {
        _threshold = threshold;
        _copacity = 0;
        _buffer = new ConcurrentBag<T>();
    }

    public void Add(T item)
    {
        _buffer.Add(item);
        if (Interlocked.Increment(ref _copacity) == _threshold)
        {
            Recycle();
        }
    }

    public void Recycle()
    {
        var value4flasshing = Interlocked.Exchange<ConcurrentBag<T>>(ref _buffer, new ConcurrentBag<T>());
        Thread.VolatileWrite(ref _copacity, 0);
    }
}

ps 你可以使用任何 ConcurrentCollection 代替 ConcurrentBag

于 2013-10-26T16:36:38.430 回答
0

您应该尝试实现NServiceBus具有调度消息和在未来发送消息的能力,从服务总线文档中您可以调度能力您可以调度一个任务或一个动作/lambda,以在给定的时间间隔内重复执行。

这意味着您可以拥有一个内存缓存并每隔 10 分钟将数组的内容写入您的 solr/lucene impl,这很容易:

Schedule.Every(TimeSpan.FromMinutes(10)).Action(() => { < task to be executed > })

如果您需要更大的灵活性来设置调度程序,您可以将其集成到quartz.net

情况应如下:

  • WCF 作为 Windows 服务和 NServiceBus 应该共享相同的上下文或实现可以在系统的这两个不同部分之间共享的 cacheManager。
  • 每次你发出请求时,你调用你的 wcf 传递参数并在你的 wcf 中添加一个内存行到数组(它可能是一个字符串数组,具有与你正在写入磁盘的相同 json 值)
  • ServiceBus 将处理管理数组操作的队列,并避免数组操作的任何冲突,例如:

    • 将项目添加到数组
    • 清空数组
    • 写入您的数据库
于 2013-10-22T15:07:09.457 回答