0

将以下代码复制粘贴到新的 C# 控制台应用程序中。

class Program
{
    static void Main(string[] args)
    {
        var enumerator = new QueuedEnumerator<long>();
        var listenerWaitHandle = Listener(enumerator);

        Publisher(enumerator);
        listenerWaitHandle.WaitOne();
    }

    private static AutoResetEvent Listener(IEnumerator<long> items)
    {
        var @event = new AutoResetEvent(false);
        ThreadPool.QueueUserWorkItem((o) =>
        {
            while (items.MoveNext())
            {
                Console.WriteLine("Received : " + items.Current);
                Thread.Sleep(2 * 1000);
            }
            (o as AutoResetEvent).Set();
        }, @event);
        return @event;
    }

    private static void Publisher(QueuedEnumerator<long> enumerator)
    {
        for (int i = 0; i < 10; i++)
        {
            enumerator.Set(i);
            Console.WriteLine("Sended : " + i);
            Thread.Sleep(1 * 1000);
        }
        enumerator.Finish();
    }

    class QueuedEnumerator<T> : IEnumerator<T>
    {
        private Queue _internal = Queue.Synchronized(new Queue());
        private T _current;
        private bool _finished;
        private AutoResetEvent _setted = new AutoResetEvent(false);

        public void Finish()
        {
            _finished = true;
            _setted.Set();
        }

        public void Set(T item)
        {
            if (_internal.Count > 3)
            {
                Console.WriteLine("I'm full, give the listener some slack !");
                Thread.Sleep(3 * 1000);
                Set(item);
            }
            else
            {
                _internal.Enqueue(item);
                _setted.Set();
            }
        }

        public T Current
        {
            get { return _current; }
        }

        public void Dispose()
        {
        }


        object System.Collections.IEnumerator.Current
        {
            get { return _current; }
        }

        public bool MoveNext()
        {
            if (_finished && _internal.Count == 0)
                return false;
            else if (_internal.Count > 0)
            {
                _current = (T)_internal.Dequeue();
                return true;
            }
            else
            {
                _setted.WaitOne();
                return MoveNext();
            }
        }

        public void Reset()
        {
        }
    }
}

2 线程 (A,B)

一个线程可以一次提供一个实例并调用Set方法B线程想要接收一系列实例(由线程A提供)

所以从字面上将 Add(item), Add(item), .. 转换为不同线程之间的 IEnumerable

当然也欢迎其他解决方案!

4

1 回答 1

1

当然 - 这段代码可能不是最好的方法,但这是我最初的尝试:

Subject<Item> toAddObservable;
ListObservable<Item> buffer;

void Init()
{
    // Subjects are an IObservable we can trigger by-hand, they're the 
    // mutable variables of Rx
    toAddObservable = new Subject(Scheduler.TaskPool);

    // ListObservable will hold all our items until someone asks for them
    // It will yield exactly *one* item, but only when toAddObservable
    // is completed.
    buffer = new ListObservable<Item>(toAddObservable);
}

void Add(Item to_add)
{
    lock (this) {
        // Subjects themselves are thread-safe, but we still need the lock
        // to protect against the reset in FetchResults
        ToAddOnAnotherThread.OnNext(to_add);
    }
}

IEnumerable<Item> FetchResults()
{
    IEnumerable<Item> ret = null;
    buffer.Subscribe(x => ret = x);

    lock (this) {
        toAddObservable.OnCompleted();
        Init();     // Recreate everything
    }

    return ret;
}
于 2010-08-24T03:36:48.283 回答