-1

我的 C# 4.0 应用程序中有一个对象列表。假设这个列表包含 100 个学生类的对象。Reactive Framework 中是否有任何方法可以一次并行执行 10 个对象?

每个学生对象运行一个方法,该方法耗时大约 10 到 15 秒。所以第一次,从列表中取出前 10 个学生对象并等待所有 10 个学生对象完成它的工作,然后取出接下来的 10 个学生对象,依此类推,直到它完成列表中的全部项目?

  1. 我有一个List<Student>100 计数。
  2. 首先从列表中取出 10 个项目并并行调用每个对象的长期运行方法。
  3. 接收每个对象的返回值并更新 UI [订阅部分]。
  4. 仅当前 10 轮完成并释放所有内存时,下一轮才开始。
  5. 对列表中的所有项目重复相同的过程。
  6. 如何捕捉每个过程中的错误?
  7. 如何从内存中释放每个学生对象的资源和其他资源?
  8. 在 Reactive Framework 中做所有这些事情的最佳方法是什么?
4

3 回答 3

1

此版本将始终有 10 名学生同时运行。当一个学生完成时,另一个将开始。当每个学生完成时,您可以处理它出现的任何错误,然后清理它(这将在下一个学生开始运行之前发生)。

students
    .ToObservable()
    .Select(student => Observable.Defer(() => Observable.Start(() =>
        {
            // do the work for this student, then return a Tuple of the student plus any error
            try
            {
                student.DoWork();
                return { Student = student, Error = (Exception)null };
            }
            catch (Exception e)
            {
                return { Student = student, Error = e };
            }
        })))
    .Merge(10) // let 10 students be executing in parallel at all times
    .Subscribe(studentResult =>
    {
        if (studentResult.Error != null)
        {
            // handle error
        }

        studentResult.Student.Dispose(); // if your Student is IDisposable and you need to free it up.
    });

这不完全是您所要求的,因为它在开始下一批之前没有完成第一批 10 个。这始终保持 10 并行运行。如果你真的想要 10 个批次,我会为此调整代码。

于 2013-02-27T16:53:45.337 回答
0

这对我来说听起来很像 TPL 的问题。您有一组已知的静止数据。您希望将一些繁重的处理划分为并行运行,并且您希望能够批处理负载。

我在您的问题的任何地方都看不到异步源、动态数据源或需要反应的消费者。这是我建议您改用 TPL 的理由。

在单独的说明中,为什么要并行处理 10 的幻数?这是业务需求,还是潜在的优化性能的尝试?通常,最佳实践是允许 TaskPool 根据内核数量和当前负载计算出最适合客户端 CPU 的内容。我想随着设备及其 CPU 结构(单核、多核、多核、低功耗/禁用内核等)的巨大变化,这一点变得越来越重要。

这是您可以在 LinqPad 中执行的一种方法(但请注意缺少 Rx)

void Main()
{
    var source = new List<Item>();
    for (int i = 0; i < 100; i++){source.Add(new Item(i));}

    //Put into batches of ten, but only then pass on the item, not the temporary tuple construct.
    var batches = source.Select((item, idx) =>new {item, idx} )
                        .GroupBy(tuple=>tuple.idx/10, tuple=>tuple.item);

    //Process one batch at a time (serially), but process the items of the batch  in parallel (concurrently).
    foreach (var batch in batches)
    {
        "Processing batch...".Dump();
        var results = batch.AsParallel().Select (item => item.Process());
        foreach (var result in results)
        {
            result.Dump();
        }
        "Processed batch.".Dump();
    }
}


public class Item
{
    private static readonly Random _rnd = new Random();
    private readonly int _id;
    public Item(int id)
    {
        _id = id;
    }

    public int Id { get {return _id;} }

    public double Process()
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        string.Format("Processing on thread:{0}", threadId).Dump(Id);
        var loopCount = _rnd.Next(10000,1000000);
        Thread.SpinWait(loopCount);
        return _rnd.NextDouble();
    }
    public override string ToString()
    {
        return string.Format("Item:{0}", _id);
    }
}

我很想知道您是否确实存在动态数据问题或反应性消费者问题,但只是“简化”了问题以使其更易于解释。

于 2013-03-19T10:50:31.460 回答
0

我的尝试……

var students = new List<Student>();
{....}
var cancel = students
    .ToObservable(Scheduler.Default)
    .Window(10)
    .Merge(1)
    .Subscribe(tenStudents =>
    {
        tenStudents.ObserveOn(Scheduler.Default)
            .Do(x => DoSomeWork(x))
            .ObserverOnDispatcher()
            .Do(tenStudents => UpdateUI(tenStudents))
            .Subscribe();               
    });
于 2013-02-25T15:01:18.063 回答