0

我正在尝试限制生成的事件,我正在创建一个合并两种类型事件的流。我正在监视一组记录,如果发生事件可能会触发

1. 添加/删除行时的 CollectionChangedEvent。
2.当一项的值发生变化时的PropertyChangedEvent

public IObservable<IEnumerable<ChangeDTO>> GetChangeStream()
{
     IObservable<IEnumerable<ChangeDTO>> allItems = 
     Observable.FromEventPattern<CollectionChangedEventArgs>(
                  x => source.CollectionChanged += x,
                  x => source.CollectionChanged-= x)
                  .Select(i => new ChangeDTO(source.SelectedItems, true, null)})
                  .Buffer(Timespan.FromMilliSeconds(300);                      

     IObservable<IEnumerable<ChangeDTO>> updatedItem = 
     Observable.FromEventPattern<PropertyChangedEventArgs>(
              x => source.PropertyChanged += x,
              x => source.PropertyChanged -= x).
              Select(i => new List<ChangeDTO>() {new ChangeDTO(new[] {source.Item}, false, source.UpdatedProperties)};

    return allItems.Merge(updatedItems);       
}

public class ChangeDTO
{
    ChangeDTO(IEnumerable<SourceDTO> items, bool recalculateAll, IEnumerable<string>   
     propertiesChanged)    
}

如果 CollectionChangedEvent 在 300 毫秒内触发,我正在尝试清除 PropertyChangedEvent 缓冲区,即对属性更改不感兴趣并想忽略它们。仅当没有触发 CollectionChangedEvent 时,才对属性更改感兴趣。

4

1 回答 1

2

这有点涉及,但我会尽量简洁地解释它!

核心思想是从 PropertyChangedEvents(PCE) 的缓冲流开始,然后每次有 CollectionChangedEvent (CCE) 时切换到新的缓冲流。最后,我们将 CCE 和 PCE 的扁平化缓冲流合并在一起。

我们通过将 CCE 转换为一个流Unit.DefaultStartWith一个初始值来开始 collectUpdateItems(这让我们可以在第一个 CCE 之前开始收集 PCE)。然后将 ( Select) 每个脉冲投射到新的缓冲 PCE 流中。这Switch将确保仅返回最新的缓冲区流。只是丢弃空Where缓冲区。

最后,我们Merge使用原始 CCE 流(投影到单个元素列表中以与 PCE 缓冲区兼容)。

希望这是有道理的!

稍微修改您的流以从普通的、无缓冲的 ChangeDTO 对象流开始:

IObservable<IEnumerable<ChangeDTO>> allItems = 
Observable.FromEventPattern<CollectionChangedEventArgs>(
    x => source.CollectionChanged += x,
    x => source.CollectionChanged-= x)
    .Select(i => new ChangeDTO(source.SelectedItems, true, null)});

IObservable<IEnumerable<ChangeDTO>> updatedItem = 
Observable.FromEventPattern<PropertyChangedEventArgs>(
    x => source.PropertyChanged += x,
    x => source.PropertyChanged -= x).
    .Select(i => new ChangeDTO(new[] {source.Item}, false, source.UpdatedProperties));

var collectUpdatedItems = allItems
    // We only need to know the CCE happened, not the details of it, so convert to Units
    .Select(_ => Unit.Default)
    // We must insert an event so PCEs are buffered before the first CCE
    .StartWith(Unit.Default)
    // Here we create a new PCE buffer stream at the start and after each CCE
    .Select(_ => updateItem.Buffer(TimeSpan.FromMilliseconds(300)))
    // On a CCE, switch to the new PCE buffer stream, dropping the current PCE buffer
    .Switch()
    // Finally drop any empty buffers
    .Where(i => i.Count > 0);

// merge the CollectionChangedEvents with the PropertyChangedEvent buffers
return collectionUpdateItems.Merge(allItems.Select(i => new List<ChangeDTO> {i}));
于 2013-10-02T11:02:01.543 回答