0

当使用 StartNew() 方法在新线程上启动进程时,我需要弄清楚如何在同一个线程中对该对象进行另一个调用(我假设这将是某种 Join 操作?)。

以下示例被简化以说明我正在尝试做的事情。我很清楚它严重缺乏基本的并发考虑。但是我不想用所有这些逻辑来混淆代码,所以请原谅我。

以下控制台应用程序显示了我要完成的工作。假设在 StartNew() 调用中创建了一个 ID 为 9976 的新线程并在那里调用了该方法。我希望在文件系统观察程序更改事件处理程序中对 ProcessImmediate() 的后续调用也可以在线程 9976 上进行。就目前而言,该调用将共享用于文件系统观察程序更改事件的同一线程。

可以做到这一点,如果可以,怎么做?

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            var runner = new Runner();
            runner.Run();

            Console.ReadKey();
        }
    }

    public class Runner
    {
        private Activity _activity = null;
        private FileSystemWatcher _fileSystemWatcher;

        public void Run()
        {
            _activity = new Activity();

            // start activity on a new thread
            Task.Factory.StartNew(() => _activity.Go());

            _fileSystemWatcher = new FileSystemWatcher();
            _fileSystemWatcher.Filter = "*.watcher";
            _fileSystemWatcher.Path = "c:\temp";
            _fileSystemWatcher.Changed += FileSystemWatcher_Changed;
            _fileSystemWatcher.EnableRaisingEvents = true;
        }

        private void FileSystemWatcher_Changed(object sender, FileSystemEventArgs e)
        {
            // WANT TO CALL THIS FOR ACTIVITY RUNNING ON PREVIOUSLY CALLED THREAD
            _activity.ProcessImmediate();
        }
    }

    public class Activity
    {
        public void Go()
        {
            while (!Stop)
            {
                // for purposes of this example, magically assume that ProcessImmediate has not been called when this is called
                DoSomethingInteresting();

                System.Threading.Thread.Sleep(2000);
            }
        }
        protected virtual void DoSomethingInteresting() { }

        public void ProcessImmediate()
        {
            // for purposes of this example, assume that Go is magically in its sleep state when ProcessImmediate is called
            DoSomethingInteresting();
        }

        public bool Stop { get; set; }
    }
}

* 更新 *

感谢您的出色回复。我接受了 Mike 的建议并为我的控制台应用程序实施了它。下面是完整的工作代码,其中还包括使用取消令牌。我发布这个以防其他人发现它有用。

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            var runner = new Runner();
            runner.Run();
            Console.ReadKey();
            runner.Stop();
            Console.ReadKey();
        }
    }

    public class Runner
    {
        private Activity _activity = null;
        private FileSystemWatcher _fileSystemWatcher;
        private CancellationTokenSource _cts = new CancellationTokenSource();

        public void Stop() { _cts.Cancel(); }

        public void Run()
        {
            _activity = new Activity();

            // start activity on a new thread
            var task = new Task(() => _activity.Go(_cts.Token), _cts.Token, TaskCreationOptions.LongRunning);
            task.Start();

            _fileSystemWatcher = new FileSystemWatcher();
            _fileSystemWatcher.Filter = "*.watcher";
            _fileSystemWatcher.Path = "C:\\Temp\\FileSystemWatcherPath";
            _fileSystemWatcher.Changed += FileSystemWatcher_Changed;
            _fileSystemWatcher.EnableRaisingEvents = true;
        }

        private void FileSystemWatcher_Changed(object sender, FileSystemEventArgs e)
        {
            // WANT TO CALL THIS FOR ACTIVITY RUNNING ON PREVIOUSLY CALLED THREAD
            _activity.ProcessImmediate();
        }
    }

    public class Activity : IDisposable
    {
        private AutoResetEvent _processing = new AutoResetEvent(false);

        public void Go(CancellationToken ct)
        {
            Thread.CurrentThread.Name = "Go";

            while (!ct.IsCancellationRequested)
            {
                // for purposes of this example, magically assume that ProcessImmediate has not been called when this is called
                DoSomethingInteresting();
                _processing.WaitOne(5000);
            }

            Console.WriteLine("Exiting");
        }
        protected virtual void DoSomethingInteresting()
        {
            Console.WriteLine(string.Format("Doing Something Interesting on thread {0}", Thread.CurrentThread.ManagedThreadId));
        }

        public void ProcessImmediate()
        {
            // for purposes of this example, assume that Go is magically in its sleep state when ProcessImmediate is called
            _processing.Set();
        }

        public void Dispose()
        {
            if (_processing != null)
            {
                _processing.Dispose();
                _processing = null;
            }
        }
    }
}
4

3 回答 3

1

首先,TaskCreationOptions.LongRunning如果您正在创建一个无法快速完成的任务,您应该使用它。其次,使用 anAutoResetEvent来通知等待线程唤醒。请注意,下面ProcessImmediateDoSomethingInteresting在另一个线程上完成运行之前返回。例子:

using System.Threading;

public class Activity : IDisposable
{
    private AutoResetEvent _processing = new AutoResetEvent(false); 

    public void Go()
    {
        while (!Stop)
        {
            // for purposes of this example, magically assume that ProcessImmediate has not been called when this is called
            DoSomethingInteresting();

            _processing.WaitOne(2000);
        }
    }
    protected virtual void DoSomethingInteresting() { }

    public void ProcessImmediate()
    {
        _processing.Set();
    }

    public bool Stop { get; set; }

    public void Dispose() 
    {
        if (_processing != null)
        {
            _processing.Dispose();
            _processing = null;
        }
    }
}
于 2013-09-25T19:15:16.233 回答
1

用户 mike 给出了一个更好的解决方案,当您想立即调用相同的方法时,这将是合适的。如果您想立即调用不同的方法,我将扩展 mike 的答案来实现这一点。

using System.Threading;

public class Activity : IDisposable
{
    private AutoResetEvent _processing = new AutoResetEvent(false);
    private ConcurrentQueue<Action> actionsToProcess = new ConcurrentQueue<Action>();

    public void Go()
    {
        while (!Stop)
        {
            // for purposes of this example, magically assume that ProcessImmediate has not been called when this is called
            DoSomethingInteresting();

            _processing.WaitOne(2000);
             while(!actionsToProcess.IsEmpty)
             {
                 Action action;
                 if(actionsToProcess.TryDeque(out action))
                     action();
             }
        }
    }
    protected virtual void DoSomethingInteresting() { }

    public void ProcessImmediate(Action action)
    {
        actionsToProcess.Enqueue(action);
        _processing.Set();
    }

    public bool Stop { get; set; }

    public void Dispose() 
    {
        if (_processing != null)
        {
            _processing.Dispose();
            _processing = null;
        }
    }
}
于 2013-09-25T19:30:10.787 回答
0

要在同一线程上执行不同的方法,您可以使用分派传入请求的消息循环。一个简单的选择是使用响应式扩展的事件循环调度程序并“递归”调度您的 Go() 函数 - 如果同时安排了不同的操作,它将在下一个 Go() 操作之前处理。

这是一个示例:

class Loop
    : IDisposable
{
    IScheduler scheduler = new EventLoopScheduler();
    MultipleAssignmentDisposable stopper = new MultipleAssignmentDisposable();

    public Loop()
    {
        Next();
    }

    void Next()
    {
        if (!stopper.IsDisposed)
            stopper.Disposable = scheduler.Schedule(Handler);
    }

    void Handler()
    {
        Thread.Sleep(1000);
        Console.WriteLine("Handler: {0}", Thread.CurrentThread.ManagedThreadId);
        Next();
    }

    public void Notify()
    {
        scheduler.Schedule(() =>
        {
            Console.WriteLine("Notify: {0}", Thread.CurrentThread.ManagedThreadId);
        });
    }

    public void Dispose()
    {
        stopper.Dispose();
    }
}

static void Main(string[] args)
{
    using (var l = new Loop())
    {
        Console.WriteLine("Press 'q' to quit.");
        while (Console.ReadKey().Key != ConsoleKey.Q)
            l.Notify();
    }
}
于 2013-09-25T19:47:36.227 回答