8

我编写了一些代码,将FileSystemWatcher'Changed事件转换为可观察的序列。

我的目标是将所有文件系统更改拆分为单独的流并限制它们。

例如,如果我有 10 个不同的文件,它们在半秒内更改了 3 次,那么每个文件我只会收到一次通知。

我担心的是GroupBy()运营商。为此,(我假设)它需要随着时间的推移不断建立组并消耗少量内存。

这会导致“泄漏”吗?如果是,我该如何预防?

FileSystemWatcher _watcher = new FileSystemWatcher("d:\\") {
    EnableRaisingEvents = true,
    NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size
};

void Main()
{
    var fileSystemEventStream = 
        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
            (
                _ => _watcher.Changed += _, 
                _ => _watcher.Changed -= _
            )
            .ObserveOn(ThreadPoolScheduler.Instance)
            .SubscribeOn(ThreadPoolScheduler.Instance)
            .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath)
            ;

    var res = 
        from fileGroup in fileSystemEventStream
        from file in fileGroup.Throttle(TimeSpan.FromSeconds(1))
        select file;

    res.Subscribe(
        ReceiveFsFullPath, 
        exception => {
            Console.WriteLine ("Something went wrong - " + exception.Message + " " + exception.StackTrace);
        });

    Console.Read();
}

void ReceiveFsFullPath(string s){
    Console.WriteLine ("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine(s);
}
4

2 回答 2

4

是的,对于每个新键,GroupBy 创建一个主题,并维护这些主题的字典。你正在订阅其中的每一个。所以这是一小块内存,它会随着时间的推移而增长,而不会释放旧条目。您真正需要的是在油门计时器到期时删除密钥。我想不出用内置运算符做到这一点的方法。所以你需要一个自定义运算符。这是一个刺。

public IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay)
{
    return Observable.Create(observer =>
    {
        var notifications = new Subject<IObservable<T>>();
        var subscription = notifications.Merge().Subscribe(observer);
        var d = new Dictionary<T, IObserver<T>>();
        var gate = new object();
        var sourceSubscription = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(subscription, sourceSubscription);
        sourceSubscription.Disposable = source.Subscribe(value =>
        {
           IObserver<T> entry;
           lock(gate)
           {
             if (d.TryGetValue(value, out entry))
             {
               entry.OnNext(value);
             }
             else
             {
               var s = new Subject<T>();
               var o = s.Throttle(delay).FirstAsync().Do(() =>
               {
                 lock(gate)
                 {
                   d.Remove(value);
                 }
               });
               notifications.OnNext(o);
               d.Add(value, s);
               s.OnNext(value);
             }
          }
        }, observer.OnError, notifications.OnCompleted);

        return subscriptions;
    });
}

...
Observable.FromEventPattern(...)
    .Select(e => e.EventArgs.FullPath)
    .ThrottleDistinct(TimeSpan.FromSeconds(1))
    .Subscribe(...);
于 2013-08-04T12:57:30.310 回答
1

根据 Brandon 的回复,受试者会成长并且无法被回收*。我对这里内存泄漏的主要担心是您没有捕获订阅!IE

res.Subscribe(...

必须替换为

subscription = res.Subscribe(...

如果您不捕获订阅,则永远无法处置订阅,因此您永远不会释放事件处理程序,因此您“内存泄漏”。显然,如果您没有在某个地方实际处理订阅,这将毫无用处。

*好吧,如果他们完成了,那么他们将被自动处置,这样就可以了。当 FileDeleted 事件发生时,您可能希望完成一个序列?

于 2013-08-05T09:55:53.297 回答