1

我有一个返回我的业务对象的 IEnumerable 的方法。在这种方法中,我将大型文本文件的内容解析为业务对象模型。里面没有穿线的东西。

在我的 ViewModel (WPF) 中,我需要存储和显示该方法的结果。Store 是一个 ObservableCollection。

这是可观察的代码:

private void OpenFile(string file)
{
    _parser = new IhvParser();
    App.Messenger.NotifyColleagues(Actions.ReportContentInfo, new Model.StatusInfoDisplayDTO { Information = "Lade Daten...", Interval = 0 });

    _ihvDataList.Clear();

    var obs = _parser.ParseDataObservable(file)
                     .ToObservable(NewThreadScheduler.Default)
                     .ObserveOnDispatcher()
                     .Subscribe<Ihv>(AddIhvToList, ReportError, ReportComplete);
}

private void ReportComplete()
{
    App.Messenger.NotifyColleagues(Actions.ReportContentInfo, new Model.StatusInfoDisplayDTO { Information = "Daten fertig geladen.", Interval = 3000 });
    RaisePropertyChanged(() => IhvDataList);
}

private void ReportError(Exception ex)
{
    MessageBox.Show("...");
}

private void AddIhvToList(Ihv ihv)
{
    _ihvDataList.Add(ihv);
}

这是解析器代码:

public IEnumerable<Model.Ihv> ParseDataObservable(string file)
{
    using (StreamReader reader = new StreamReader(file))
    {
        var head = reader.ReadLine(); //erste Zeile ist Kopfinformation

        if (!head.Contains("BayBAS") || !head.Contains("2.3.0"))
        {
            _logger.ErrorFormat("Die Datei {0} liegt nicht im BayBAS-Format 2.3.0 vor.");
        }
        else
        {
            while (!reader.EndOfStream)
            {
                var line = reader.ReadLine();
                if (line.Length != 1415)
                {
                    _logger.ErrorFormat("Die Datei {0} liegt nicht im BayBAS-Format 2.3.0 vor.");
                    break;
                }

                var tempIhvItem = Model.Ihv.Parse(line);
                yield return tempIhvItem;
            }
            reader.Close();

        }
    }

}

为什么我没有得到异步结果?在我的 DataGrid 中看到结果之前,所有项目都已解析并交付。

有人可以帮忙吗?

安德烈亚斯

4

1 回答 1

3

你确定这不是异步发生的吗?您是根据您在 UI 中的感知来假设这一点,还是您设置了断点并确定这实际上是这种情况?

请注意,WPFDispatcher使用优先级队列,并DispatcherScheduler安排具有Normal优先级的项目,这优于用于输入、布局和呈现的优先级。如果结果来得足够快,那么 UI 可能直到处理完最后一个结果后才会更新:调度程序可能太忙于处理结果而无法执行 UI 的布局和呈现。

DispatcherScheduler您可以尝试以自定义优先级覆盖 to 的行为,如下所示:

public class PriorityDispatcherScheduler : DispatcherScheduler
{
    private readonly DispatcherPriority _priority;

    public PriorityDispatcherScheduler(DispatcherPriority priority)
        : this(priority, Dispatcher.CurrentDispatcher) {}

    public PriorityDispatcherScheduler(DispatcherPriority priority, Dispatcher dispatcher)
        : base(dispatcher)
    {
        _priority = priority;
    }

    public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
    {
        if (action == null)
            throw new ArgumentNullException("action");

        var d = new SingleAssignmentDisposable();

        this.Dispatcher.BeginInvoke(
            _priority,
            (Action)(() =>
                     {
                         if (d.IsDisposed)
                             return;
                         d.Disposable = action(this, state);
                     }));

        return d;
    }
}

然后通过替换来修改您的可观察序列ObserveOnDispatcher()ObserveOn(new PriorityDispatcherScheduler(p))其中p是适当的优先级(例如,Background)。

此外,这看起来非常可疑ToObservable(NewThreadScheduler.Default):我相信这将导致每次结果进入时都会创建一个新线程,其唯一目的是将其传递给调度程序,之后新线程将终止。这几乎肯定不是你想要的。我假设您只是希望在单独的线程上处理文件;如所写,如果您产生 1,000 个项目,您的代码实际上最终会创建 1,000 个短期线程IEnumerable,而这些项目实际上都不会执行读取文件的工作。

最后,是OpenFile()在调度线程上调用吗?如果是这样,我相信会发生如下情况:

  1. Dispatcher(在 UI 线程上)将调用Subscribe(),它将处理可观察操作符链一直返回到ParseDataObservable(file).
  2. Dispatcher 将遍历您的IEnumerable序列,将每个结果触发到由ToObservable().
  3. 传递到可观察序列的每个结果都将安排在调度程序(当前正在运行的同一调度程序)上交付。

如果是这种情况,那么将在任何结果传递给之前AddIhvToList()读取整个文件,因为调度程序一直在读取文件,并且在完成之前不会处理其队列中的结果。如果发生这种情况,您可以尝试按如下方式更改代码:

var obs = _parser.ParseDataObservable(file)
                 .ToObservable()
                 .SubscribeOn(/*NewThread*/Scheduler.Default)
                 .ObserveOnDispatcher() // consider using PriorityDispatcherScheduler
                 .Subscribe<Ihv>(AddIhvToList, ReportError, ReportComplete);

注入SubscribeOn()应确保您的迭代IEnumerable(即文件的读取)发生在单独的线程上。 Scheduler.Default在这里应该足够了,但NewThreadScheduler如果你真的需要(你可能不需要),你可以使用 a。调度程序线程将Subscribe()在一切设置完成后返回,将其释放以继续处理其队列,即将结果传递给AddIhvToList()他们。这应该会给您所需的异步行为。

于 2013-10-25T15:41:28.950 回答