我有一个要并行处理的可观察集合,然后在过滤时观察处理后的值,最后订阅一个接收过滤值的处理程序。
我的示例在语法上是正确的并且编译得很好,当我运行代码时,Where
会评估执行过滤的语句。但是没有数据通过订阅。如果我删除AsParallel
以便通过常规完成处理,则IEnumerable
数据会通过并且一切都按预期工作。
这是我的示例,对字符串进行了一些处理:
// Generate some data every second
var strings = Observable.Generate(() =>
new TimeInterval<Notification<string>>(
new Notification<string>
.OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
select "Parallel " + value;
// Filter and observe
var data = String.Empty;
parallelStrings
.Where(value => !String.IsNullOrEmpty(value))
.ToObservable()
.Subscribe(value => data = value);
下一个奇怪的事情是,如果我使用TakeWhile
运算符,在我看来,它在概念上类似于 Where,观察 ParallelQuery 按预期工作:
// Filter and observe
var data = String.Empty;
parallelStrings
.TakeWhile(cs => !String.IsNullOrEmpty(cs))
.ToObservable()
.Subscribe(value => data = value);
向订阅添加一些日志记录代码表明在ToObservable
转换之前接收到数据,但不是在转换之后:
1. var data = String.Empty;
2. parallelStrings
3. .Where(value => !String.IsNullOrEmpty(value))
4. .Select(value => value)
5. .ToObservable()
6. .Select(value => value)
7. .Subscribe(value => data = value);
第 4 行的 lambda 中的断点被命中,而第 6 行的 lambda 中的断点从未被命中。
为什么将TakeWhile
数据传递给订阅者而Where
不会?
如果它很重要,我会在 Visual Studio 2010 RC 中使用针对 .Net 4.0 Framework Client Profile 的项目开发我的代码。
更新:根据@Sergeys 的回答,我重新设计了Where
过滤器的位置。以下代码按预期工作:
var processedStrings = from value in strings
let processedValue = "Parallel " + value
where !String.IsNullOrEmpty(processedValue)
select processedValue;
var data = String.Empty;
processedStrings
.ToEnumerable()
.AsParallel()
.ToObservable()
.Subscribe(value => data = value );
processedStrings
必须首先将初始的 observable转换为 enumerable 以使其并行化,然后将其转换回 observable 以订阅最终结果,这仍然感觉有点尴尬。