2

我一个接一个地运行许多时间\CPU 密集型进程(TimeExpensive 类型)。主线程 (A) 在另一个线程 (B) 中异步启动 TimeExpensive 进程并变为非活动状态。在完成时,线程 B 同步触发调用者的完成处理程序并在线程 B 中启动下一个 TimeExpensive 进程。创建了一个新线程 (C),但在启动 C 后,B 完成。因此,对于 n 个进程,创建了 n 个线程,并且大多数时候,它们并不共存。

有人可能希望它以线性单线程方式实现,但 TimeExpensive 是由第三方实现的,并且在运行时会使用所有系统内核并运行数小时。

//This will run as console app 
class Program
{
    static void Main(string[] args)
    {
        new Program().StartJobs();
    }

    void StartJobs()
    {
        Main mainJob = new Main();
        mainJob.MainCompletionEvent += 
                     new Action<object, EventArgs>(mainJob_MainCompletionEvent);
        mainJob.Start();
    }

    void mainJob_MainCompletionEvent(object sender, EventArgs e)
    {
        //if(success) Environment.Exit(0);
    }

}

class Main
{
    int processCounter = 0;
    public event Action<object, EventArgs> MainCompletionEvent;
    public void Start()
    {
        //...do other important tasks here...
        processCounter++;
        TimeExpensive te = new TimeExpensive();
        te.CompletionEvent += new Action(TimeExpensive_CompletionHandler);
        Thread aThread = new Thread(te.Run);
        aThread.IsBackground = false;
        aThread.Name = "TimeExpensive Thread: " + processCounter;
        aThread.Start();
    }

    void TimeExpensive_CompletionHandler()
    {
        Console.WriteLine("current Thread Name: " + Thread.CurrentThread.Name);
        //Start another Process In Background if
        if (processCounter < 5)
        {
            Start();
        }
        else
        {
            Console.ReadKey();
            if (JobCompletionEvent != null)
                JobCompletionEvent(this, new EventArgs());
        }
    }
}

class TimeExpensive
{
    public event Action CompletionEvent;

    public void Run()
    {
        //doing time expensive task
        //...

        //when finish Notify completion Handler...
        if (CompletionEvent != null)
        {
            CompletionEvent();
        }
    }
}

//Output
current Thread Name: TimeExpensive Thread: 1
current Thread Name: TimeExpensive Thread: 2
current Thread Name: TimeExpensive Thread: 3
current Thread Name: TimeExpensive Thread: 4
current Thread Name: TimeExpensive Thread: 5

上面的实现模仿了我描述的行为。困扰我的事情是事件处理程序同步运行,直到下一个线程启动,在此期间,它正在执行许多它不是为之设计的任务。

不确定这是否好,有没有办法可以在线程 B 的完成处理程序中返回线程 A?还是我应该更好地使用另一个 delegate.BeginInvoke 启动事件处理程序执行?

我希望用简单而安全的方法来做到这一点。任何帮助是极大的赞赏。

PS 我读了很多帖子,但没有人能很好地处理这种情况。

编辑

添加了静态 main 以显示如何在控制台应用程序中启动此代码。请记住,还可以创建 UI 来启动“主要”工作。它肯定会创建 BackgroundWorker 线程来创建 mainJob 对象并运行它。谢谢!

4

2 回答 2

7

有没有办法在线程 B 的完成处理程序中返回线程 A?

不,您没有合适的管道将呼叫从一个线程编组到另一个线程。这种管道由 GUI 应用程序的主线程提供。出于必要,用户界面基本上是线程不安全的。支持这种封送处理的 UI 线程有几个实现细节。它在生产者/消费者线程模型的典型实现中充当消费者。

这需要一个线程安全队列和一个读取队列的消费者循环。您可能会将其识别为 Windows GUI 应用程序中的消息队列。使用调用 GetMessage 的消息循环来读取通知并对其采取行动。编组调用现在很简单,您只需将消息发布到消息队列,UI 线程读取它并执行请求。发布由 Winforms 的 Control.BeginInvoke 和 WPF 的 Dispatcher.BeginInvoke 实现。

你当然可以自己实现这个同步机制。.NET 4 BlockingCollection 类使它变得容易。但记住,您必须从根本上改变线程 A 的执行方式。对发布到队列的请求保持响应很重要。像 BackgroundWorker 这样的类试图解决的问题。请记住,GUI 消息循环的存在是因为它是必要的,UI 不是线程安全的。控制台应用程序(通常)没有相同的负担,控制台是线程安全的。

于 2012-05-04T11:30:23.513 回答
0

您遇到的问题是由于正确穿线的难度。我用演员编写了你的​​例子:

type Actor<'a> = MailboxProcessor<'a>

type SupMsg = WaitForDone of AsyncReplyChannel<string>
type ProgramState = RunNumber of int * Actor<WorkerMsg> option
and WorkerMsg = Begin of Id * AsyncReplyChannel<string>
and Id = int

let startComputation () = Actor.Start(fun inbox ->
  async { 
    let! Begin(id, chan) = inbox.Receive()
    printfn "Running Computation"
    do! Async.Sleep(20) // zZz
    chan.Reply(sprintf "'%i is done!'" id) })

let sup () = Actor.Start(fun inbox ->
  let rec loop state =
    async {
      match state with
      | RunNumber(_, None) -> return! loop <| RunNumber(1, Some(startComputation ()))
      | RunNumber(run, Some(active)) ->
        let! completed = active.PostAndAsyncReply(fun chan -> Begin(run, chan))
        printfn "sup observed: %s" completed
        let active' = Some(startComputation ())
        if run <> 5 then return! loop <| RunNumber(run + 1, active')
        else return! isDone () }
  and isDone () =
    async {
      let! WaitForDone(chan) = inbox.Receive()
      return chan.Reply("all done") }

  loop <| RunNumber(0, None))

[<EntryPoint>]
let main args =
  printfn "%s" <| (sup ()).PostAndReply(fun chan -> WaitForDone(chan))
  0

作为输出:

> main();;
Running Computation
sup observed: '1 is done!'
Running Computation
sup observed: '2 is done!'
Running Computation
sup observed: '3 is done!'
Running Computation
sup observed: '4 is done!'
Running Computation
sup observed: '5 is done!'
all done
val it : int = 0

如您所见,跨线程通信变得轻而易举。如果您的库是第三方库,那么很容易替换Async.Sleep(20)为对该库的调用。

于 2012-05-04T10:14:06.800 回答