0

首先是一些背景知识,我编写了一个名为 Duplitiy(在 github.com 上)的开源 .NET 库,使用FileSystemWatcher复制两个目录之间的所有文件更改。

我编写了一个FileSystemObservable实现的类IObservable<FileSystemChange>(它使用FSWatcher来包装实际的FileSystemWatcher)。当创建、修改或删除文件或目录时,这些更改将通过Subject<FileSystemChange>使用响应式扩展发布。

然后我使用以下订阅订阅这个 observable。

 return observable
          .Buffer(() => observable.Throttle(TimeSpan.FromSeconds(2)).Timeout(TimeSpan.FromMinutes(1)))     
          .PrioritizeFileSystemChanges()           
          .SelectMany(x => x);

更改将被缓冲,直到至少有 2 秒的时间段没有任何更改,最长为 1 分钟。这是因为在删除目录时,FileSystemWatcher会通知所有包含的文件和目录。我们可以通过吞下目录中包含的更改来优化行为,并简单地删除订阅者中的父级。这由PrioritizeFileSystemChanges过滤器处理。它还允许我们忽略在缓冲区窗口中创建和随后删除的文件,再次减少目标上的 IO 操作。

这可行,尽管目前以一种幼稚的方式,不支持失败/重试。

但是我的问题是,这个 observable 的订阅者可能需要花费合理的时间来处理每个更改。例如,将大文件复制到慢速文件系统。当当前正在复制的同一文件发生新的文件系统更改时,我如何处理中止正在进行的操作。或者,如果文件包含在缓冲列表中但未完成,如何将其删除或排除?

我假设需要对原始 observable 进行另一个订阅,但不确定如何最好地共享状态或修改待处理的任务?必须按照接收到的顺序处理更改,这表示队列。但是,新的文件系统更改可能会应用于需要取消或删除的排队操作。队列不是为乱序删除而设计的。

例如,如果我们当前正在复制文件Foo\Bar.txt并且Foo目录已被删除。然后必须取消目录和所有子目录的任何正在进行或未决的更改。这可能是任务并行库的用例,还是我可以采取一些反应式方法?

也欢迎任何 github 拉取请求!

4

1 回答 1

1

您似乎在这里有几个目标/问题:

  1. 删除由于后来的更改而不再需要的早期更改。 链表可能很适合这个。它为一般队列使用和良好的项目删除性能提供了良好的性能。
  2. 取消由于以后的更改而不再需要的正在进行的操作。这还将包括需要重新启动的操作。这将要求您找到一个允许您取消正在进行的操作的库。System.IO 类不提供这样的取消,因此您需要找到一个库或滚动您自己的库。
  3. 这可能是任务并行库的用例,还是我可以采取一些反应式方法? 你的措辞让我印象深刻,好像这里有一个或另一个选择,但你没有理由不能将两者混合在一起。您应该对文件更改进行观察是一个很好的起点(RX)。进行中的操作可能会被实现为采用 aCancellationToken并返回 a Task(TPL) 的方法。

这里缺少的步骤似乎是如何从更改“队列”到实际工作。基本上,订阅必须(快速)排队更改并启动(慢速,异步)方法,如果它尚未运行,则“递归”处理队列;就像是:

'changes is your returned observable
'toProcess is the "queue" of changes
'processor holds information about and the task of the in-progress operation
changes.Subscribe(Sub(c)
                     UpdateQueueWithChange(c, toProcess, processor)
                     If processor.Task.IsCompleted Then
                         ProcessNextChange(processor, toProcess)
                     End If
                  End Sub)

ProcessNextChange是一个方法,它将获取队列中的下一个更改,开始操作,设置操作任务的回调以重新调用 ProcessNextChange。如果没有留下任何更改,则processor应该给出一个不重新调用 ProcessNextChange 的已完成任务。

UpdateQueueWithChange将需要更新“队列”并在必要时取消进行中的操作,这将触发ProcessNextChange由于任务完成而开始下一个操作的调用。

如果您想在取消对可观察的更改的订阅时取消操作,我建议将订阅一次性放入 aCompositeDisposable以及SerialDisposable将存储 a CancellationDispoable(由 更新ProcessNextChange并另外存储在processor)中,这CancellationToken是操作方法。ProcessNextChange 将检查 SerialDisposable 以查看它是否已在启动操作之前被释放。CompositeDisposable 将是您存储在某处以结束整个事情的东西。

CompositeDisposable 'this is what your application keeps around
|- IDisposable from subscription to changes observable
|- SerialDisposable
   |- .Disposable property = CancellationDisposable 
      'changed each time ProcessNextChange is called
于 2012-06-20T04:46:00.887 回答