首先是一些背景知识,我编写了一个名为 Duplitiy(在 github.com 上)的开源 .NET 库,它使用FileSystemWatcher
复制两个目录之间的所有文件更改。
我编写了一个FileSystemObservable
实现的类IObservable<FileSystemChange>
(它使用FSWatcher来包装实际的FileSystemWatcher
)。当创建、修改或删除文件或目录时,这些更改将通过Subject<FileSystemChange>
使用响应式扩展发布。
然后我使用以下订阅订阅这个 observable。
return observable
.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(2)).Timeout(TimeSpan.FromMinutes(1)))
.PrioritizeFileSystemChanges()
.SelectMany(x => x);
更改将被缓冲,直到至少有 2 秒的时间段没有任何更改,最长为 1 分钟。这是因为在删除目录时,FileSystemWatcher
会通知所有包含的文件和目录。我们可以通过吞下目录中包含的更改来优化行为,并简单地删除订阅者中的父级。这由PrioritizeFileSystemChanges
过滤器处理。它还允许我们忽略在缓冲区窗口中创建和随后删除的文件,再次减少目标上的 IO 操作。
这可行,尽管目前以一种幼稚的方式,不支持失败/重试。
但是我的问题是,这个 observable 的订阅者可能需要花费合理的时间来处理每个更改。例如,将大文件复制到慢速文件系统。当当前正在复制的同一文件发生新的文件系统更改时,我如何处理中止正在进行的操作。或者,如果文件包含在缓冲列表中但未完成,如何将其删除或排除?
我假设需要对原始 observable 进行另一个订阅,但不确定如何最好地共享状态或修改待处理的任务?必须按照接收到的顺序处理更改,这表示队列。但是,新的文件系统更改可能会应用于需要取消或删除的排队操作。队列不是为乱序删除而设计的。
例如,如果我们当前正在复制文件Foo\Bar.txt
并且Foo
目录已被删除。然后必须取消目录和所有子目录的任何正在进行或未决的更改。这可能是任务并行库的用例,还是我可以采取一些反应式方法?
也欢迎任何 github 拉取请求!