我现在得到的是一个每 5000 毫秒触发一次的计时器:

static Timer _aTimer = new System.Timers.Timer();

    static void Main(string[] args)
        _aTimer.Elapsed += new ElapsedEventHandler(OnTimedEvent);

        _aTimer.Interval = 5000;
        _aTimer.Enabled = true;

        Console.WriteLine("Press \'q\' to quit the sample.");
        while (Console.Read() != 'q') ;



        private static void OnTimedEvent(object source, ElapsedEventArgs e)
        // stop the timer so we dont reprocess files we already have in the queue

        // setup a list of queues
        var lists = new List<IncomingOrderQueue>();
        //get the accounts in which the files we are looking in
        var accounts = new List<string>() { "Account1", "Account2" };
        //loop through the accounts and set up the queue 
        foreach (var acc in accounts)
            // create the queue
            var tmp = new IncomingOrderQueue();
            // for each file in the folders add it to be processed in the queue
            foreach (var orderFile in OrderFiles(acc))
                tmp.EnqueueSweep(new QueueVariables() { Account = acc, File = orderFile });
            // add the queue to the list of queues
        // for each of the queues consume all the contents of them
        Parallel.ForEach(lists, l => l.Consume());

        //start the timer back up again because we have finished all the files we have in the current queue

        public static void StopTimer()
        Console.WriteLine("Stop Timer");
        _aTimer.Enabled = false;

    public static void StartTimer()
        Console.WriteLine("Start Timer");
        _aTimer.Enabled = true;


 public class IncomingOrderQueue 
    BlockingCollection<QueueVariables> _orderQ = new BlockingCollection<QueueVariables>();

    public void EnqueueSweep(QueueVariables incoming)
        // add items to the queue

    public void Consume()
        // stop anything been adding to the queue
        // consume all the objects in the blocking collection
        Parallel.ForEach(_orderQ.GetConsumingEnumerable(), Processor.Order.Object);

    public int QueueCount
            return _orderQ.Count;


我不禁认为有更好的方法来做我正在做的事情,特别是当将为帐户创建的队列数量为 200 - 400 时。



1 回答 1



我也可能从一个开始BlockingCollection,直到分析表明我需要另一个。根据生产者和消费者的相对速度,您可能需要调整他们的数量。如果它们是 IO 绑定的,它们应该是异步的,你可以有很多,如果它们是 CPU 绑定的,你可能不需要超过可用处理器的数量。

假设 IO 绑定的生产者和消费者,我重做了你的例子,希望它能给你一些想法。它以 10 秒的间隔触发生产者,并且可以继续运行,直到您通过CanellationToken. 只有在您取消并完成生产后,您CompleteAdding才能释放被阻止的消费者。

public class QueueVariables
    public string Account {get;set;}
    public string File {get;set;}

public static ConcurrentQueue<string> GetACcounts()
    return new ConcurrentQueue<string>(new []

public static List<string> GetFiles(string acct)
    return new List<string>

public static async Task StartPeriodicProducers(int numProducers, TimeSpan period, CancellationToken ct)
        var producers = StartProducers(numProducers, ct);

        // wait for production to finish
        await Task.WhenAll(producers.ToArray());

        // wait before running again
        Console.WriteLine("***Waiting " + period);
        await Task.Delay(period, ct);

public static List<Task> StartProducers(int numProducers, CancellationToken ct)
    List<Task> producingTasks = new List<Task>();
    var accounts = GetACcounts();

    for (int i = 0; i < numProducers; i++)
        producingTasks.Add(Task.Run(async () =>
            string acct;
            while(accounts.TryDequeue(out acct) && !ct.IsCancellationRequested)
                foreach (var file in GetFiles(acct))
                    _orderQ.Add(new UserQuery.QueueVariables{ Account = acct, File = file });
                    Console.WriteLine("Produced Account:{0} File:{1}", acct, file);
                    await Task.Delay(50, ct); // simulate production delay

            Console.WriteLine("Finished producing");

    return producingTasks;

public static List<Task> StartConsumers(int numConsumers)
    List<Task> consumingTasks = new List<Task>();

    for (int j = 0; j < numConsumers; j++)
        consumingTasks.Add(Task.Run(async () =>
                    var queueVar = _orderQ.Take();
                    Console.WriteLine("Consumed Account:{0} File:{1}", queueVar.Account, queueVar.File);
                    await Task.Delay(200); // simulate consumption delay
                Console.WriteLine("Finished Consuming");

    return consumingTasks;

private static async Task MainAsync()
    CancellationTokenSource cts = new CancellationTokenSource();
    var periodicProducers = StartPeriodicProducers(2, TimeSpan.FromSeconds(10), cts.Token);
    var consumingTasks = StartConsumers(4);

    await Task.Delay(TimeSpan.FromSeconds(120));

    // stop production

        // wait for producers to finish producing
        await periodicProducers;
        // operation was cancelled

    // complete adding to release blocked consumers

    // wait for consumers to finish consuming
    await Task.WhenAll(consumingTasks.ToArray());

// maximum size 10, after that capaicity is reached the producers block
private static BlockingCollection<QueueVariables> _orderQ = new BlockingCollection<QueueVariables>(10);

void Main()

// Define other methods and classes here
于 2015-05-30T01:18:20.190 回答