2

我已经阅读了有关 TPL 的文档和许多教程,但没有一个涵盖我想要实现的模型。

某些算法总是有固定的迭代次数。

我需要不断运行线程(尽可能多):

而(真)

  • 从主线程获取数据
  • 执行繁重的耗时任务(在单独的线程中)
  • 更新主线程信息

另外,我需要能够设置闹钟(例如 5 秒)的机制。五秒钟后,所有工作必须暂停一段时间,然后再恢复。

我应该使用 Task.ContinueWith 相同的任务吗?但我不是在处理先前任务启动的结果,而是在主线程中更新数据结构,然后决定新任务迭代的输入是什么......

我如何让 TPL 决定应该创建多少任务以获得最佳效率?

不,我正在使用 BackgroundWorkers,因为它们有很好的 RunEventCompleted 事件 - 在它里面我在我的主线程上,所以我可以更新我的 MAIN 结构,检查时间限制,然后最终在完成的 BackgroundWorker 上再次调用 StartAsync。它很好很清晰,但可能非常无用。我需要让它在多处理器、多核服务器上高效。

一个问题是计算总是在线的,永远不会停止。还有一些网络,可以远程询问 MAIN 结构的当前状态。

第二个问题是关键时间控制(我必须有精确的计时器——当它停止时,没有线程可以重新启动)。然后是特殊的高优先级任务,结束后,所有工作都恢复了。

第三个问题是操作没有上限。

根据我的观察,这三个约束不能很好地遵循 TPL - 我不能使用像 Parallel.For 这样的东西,因为集合是由任务本身的结果实时修改的......我也不知道如何组合:

  • 让 TPL 决定应该创建多少线程的能力
  • 具有线程的生命周期运行(在连续重新启动之间具有暂停和同步点)
  • 在开始时只创建一次线程(它们应该只用不断的新参数重新启动)

有人可以给我线索吗?我知道如何做坏事,效率低下。我描述了一些小要求,这使我无法做到这一点。我有点困惑。

4

2 回答 2

3

您需要使用消息传递 + 演员 + 调度程序 imo。然后你需要使用一种能够胜任它的语言。查看从 Azure 服务总线异步接收、在共享队列中排队并通过参与者管理运行时状态的代码。

排队:

我应该使用 Task.ContinueWith 相同的任务吗?

不,ContinueWith 将根据每个延续传递内部的异常处理来终止您的程序;TPL 中没有将失败状态编组到调用端/主线程的好方法。

但我不是在处理先前任务启动的结果,而是在主线程中更新数据结构,然后决定新任务迭代的输入是什么......

为此,您需要超越线程,除非您愿意在问题上花费大量时间。

我如何让 TPL 决定应该创建多少任务以获得最佳效率?

这由运行您的异步工作流的框架处理。

不,我正在使用 BackgroundWorkers,因为它们有很好的 RunEventCompleted 事件 - 在它里面我在我的主线程上,所以我可以更新我的 MAIN 结构,检查时间限制,然后最终在完成的 BackgroundWorker 上再次调用 StartAsync。它很好很清晰,但可能非常无用。我需要让它在多处理器、多核服务器上高效。

一个问题是计算总是在线的,永远不会停止。还有一些网络,可以远程询问 MAIN 结构的当前状态。第二个问题是关键时间控制(我必须有精确的计时器——当它停止时,没有线程可以重新启动)。

如果您以异步方式运行所有内容,则可以将消息传递给暂停它的参与者。你的调度actor负责用他们的调度消息调用它的所有订阅者;查看paused链接代码中的状态。如果您有未完成的请求,您可以向他们传递取消令牌并以这种方式处理“硬”取消/套接字中止。

然后是特殊的高优先级任务,结束后,所有工作都恢复了。根据我的观察,这两个约束不能很好地遵循 TPL - 我不能使用类似 Parallel.For 之类的东西,因为集合是由任务本身的结果实时修改的......

您可能需要一种称为管道和过滤器的模式。您将输入输入到一系列工人(演员)中;每个工人从其他工人的产出中消费。使用控制通道(在我的情况下是参与者的收件箱)完成信号发送。

于 2012-04-05T10:41:59.620 回答
0

我认为你应该阅读

MSDN:如何实现生产者/消费者数据流模式

我遇到了同样的问题:一个生产者生产商品,而几个消费者消费了它们并决定将它们发送给其他消费者。每个消费者都在异步工作,并且独立于其他消费者。

你的主要任务是生产者。他生产您的其他任务应该处理的项目。带有主要任务代码的类有一个功能:

public async Task ProduceOutputAsync(...)

您的主程序使用以下命令启动此任务:

var producerTask = Task.Run( () => MyProducer.ProduceOutputAsync(...)

一旦这被调用,生产者任务开始产生输出。同时你的主程序可以继续做其他事情,比如启动消费者。

但让我们首先关注 Producer 任务。

生产者任务产生类型为 T 的项目以供其他任务处理。使用实现 ITargetBlock 的对象将它们转移到其他任务。

每次生产者任务完成创建类型 T 的对象时,它都会使用 ITargetBlock.Post 或最好是异步版本将其发送到目标块:

while (continueProducing())
{
    T product = await CreateProduct(...)
    bool accepted = await this.TargetBlock(product)
    // process the return value
}
// if here, nothing to produce anymore. Notify the consumers:
this.TargetBlock.Complete();

生产者需要一个 ITargetBlock <T>。在我的应用程序中,一个 BufferBlock <T> 就足够了。检查 MSDN 以了解其他可能的目标。

无论如何,数据流块也应该实现 ISourceBlock <T>。您的接收器等待输入到达源,获取并处理它。完成后,它可以将结果发送到自己的目标块,并等待下一个输入,直到不再有预期的输入。当然,如果您的消费者不产生输出,它就不必向目标发送任何内容。

等待输入的过程如下:

ISourceBlock`<T`> mySource = ...;
while (await mySource.ReceiveAsync())
{   // a object of type T is available at the source
    T objectToProcess = await mySource.ReceiveAsync();
    // keep in mind that someone else might have fetched your object
    // so only process it if you've got it.
    if (objectToProcess != null)
    {
        await ProcessAsync(objectToProcess);

        // if your processing produces output send the output to your target:
        var myOutput = await ProduceOutput(objectToprocess);
        await myTarget.SendAsync(myOutput);
    }
}
// if here, no input expected anymore, notify my consumers:
myTarget.Complete();
  • 构建你的生产者
  • 构建所有消费者
  • 给生产者一个 BufferBlock 以将其输出发送到
  • 启动生产者 MyProducer.ProduceOutputAsync(...)
  • 当生产者产生输出并将其发送到缓冲区块时:
  • 给消费者相同的 BufferBlock
  • 将消费者作为单独的任务启动
  • await Task.WhenAll(...) 等待所有任务完成。

每个消费者将在听到不再需要输入时立即停止。完成所有任务后,您的 main 函数可以读取结果并返回

于 2015-07-29T08:38:06.053 回答