3

我有一个要并行处理的可观察集合,然后在过滤时观察处理后的值,最后订阅一个接收过滤值的处理程序。

我的示例在语法上是正确的并且编译得很好,当我运行代码时,Where会评估执行过滤的语句。但是没有数据通过订阅。如果我删除AsParallel以便通过常规完成处理,则IEnumerable数据会通过并且一切都按预期工作。

这是我的示例,对字符串进行了一些处理:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

下一个奇怪的事情是,如果我使用TakeWhile运算符,在我看来,它在概念上类似于 Where,观察 ParallelQuery 按预期工作:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

向订阅添加一些日志记录代码表明在ToObservable转换之前接收到数据,但不是在转换之后:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

第 4 行的 lambda 中的断点被命中,而第 6 行的 lambda 中的断点从未被命中。

为什么将TakeWhile数据传递给订阅者而Where不会?

如果它很重要,我会在 Visual Studio 2010 RC 中使用针对 .Net 4.0 Framework Client Profile 的项目开发我的代码。

更新:根据@Sergeys 的回答,我重新设计了Where过滤器的位置。以下代码按预期工作:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

processedStrings必须首先将初始的 observable转换为 enumerable 以使其并行化,然后将其转换回 observable 以订阅最终结果,这仍然感觉有点尴尬。

4

2 回答 2

2

TakeWhile在概念上不等同于Where,因为它取决于排序。我怀疑查询实际上是按顺序执行的(请参阅此博客文章)。尝试.WithExecutionMode(ParallelExecutionMode.ForceParallelism)在您的TakeWhile示例中调用,我怀疑您会看到相同的结果。

我不知道为什么它在并行情况下不起作用...我可以建议您进行一些日志记录以查看数据到达多远?例如,您可以使用 Select 执行有用的记录,该 Select 在记录后返回原始项目。

于 2010-02-18T10:59:28.530 回答
2

简而言之,来自C# 4.0


目前 PLINQ 可以并行化的内容存在一些实际限制。这些限制可能会随着后续的服务包和框架版本而放松。以下查询运算符可防止查询被并行化,除非源元素位于其原始索引位置:

  • Take、TakeWhile、Skip 和 SkipWhile
  • Select、SelectMany 和 ElementAt 的索引版本

大多数查询运算符会更改元素的索引位置(包括那些删除元素的,例如 Where)。这意味着如果您想使用前面的运算符,它们通常需要位于查询的开头


所以,事实上,使用 TakeWhile 可以防止 .AsParallel() 并行化。很难说为什么Where 会杀死订阅,但将它放在 AsParallel 之前可能会解决问题。

于 2010-02-18T17:13:47.510 回答