What would be the cleanest way to await the creation of a file by an external application?
// Sketch of the desired API: asynchronously wait until the file at `filepath`
// exists, then process it. The body is intentionally left as pseudo-code.
async Task doSomethingWithFile(string filepath)
{
// 1. Await until a file exists at `filepath`
// 2. Do something with the file
}
What would be the cleanest way to await the creation of a file by an external application?
// Sketch of the desired API: asynchronously wait until the file at `filepath`
// exists, then process it. The body is intentionally left as pseudo-code.
async Task doSomethingWithFile(string filepath)
{
// 1. Await until a file exists at `filepath`
// 2. Do something with the file
}
所以第一个关键点是,当特定路径下发生文件系统事件时,您可以使用 FileSystemWatcher 得到通知。例如,如果您希望在特定位置创建文件时收到通知,就可以用它来实现。
接下来,我们可以创建一个方法,使用 TaskCompletionSource,在文件系统观察者触发相关事件时触发任务的完成。
/// <summary>
/// Returns a task that completes once a file exists at <paramref name="path"/>.
/// </summary>
/// <param name="path">
/// Path of the file to wait for. A relative path (including a bare file name) is
/// resolved against the current directory. The containing directory must already exist.
/// </param>
/// <returns>
/// An already-completed task when the file exists at call time; otherwise a task that
/// completes when a file with that name is created in, or renamed into, the directory.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="path"/> is null.</exception>
/// <exception cref="ArgumentException">
/// Thrown by <see cref="FileSystemWatcher"/> when the containing directory does not exist.
/// </exception>
/// <remarks>
/// The file name is compared ordinally (case-sensitively) against the event's name.
/// NOTE(review): on case-insensitive file systems a file created with different casing
/// will not match; confirm whether that matters for callers.
/// </remarks>
public static Task WhenFileCreated(string path)
{
    if (path == null)
    {
        throw new ArgumentNullException(nameof(path));
    }

    // Fast path: nothing to watch when the file is already there.
    if (File.Exists(path))
    {
        return Task.FromResult(true);
    }

    // Resolve to an absolute path first: for a bare file name Path.GetDirectoryName
    // returns an empty string, which FileSystemWatcher rejects.
    var fullPath = Path.GetFullPath(path);
    var fileName = Path.GetFileName(fullPath);

    // Run continuations asynchronously so the awaiting code does not execute inline
    // on the watcher's event thread.
    var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var watcher = new FileSystemWatcher(Path.GetDirectoryName(fullPath));

    void Complete()
    {
        // Remove both handlers and release the watcher before signalling completion.
        watcher.Created -= OnFileAppeared;
        watcher.Renamed -= OnFileAppeared;
        watcher.Dispose();
        tcs.TrySetResult(true);
    }

    // Shared by Created and Renamed (RenamedEventArgs derives from FileSystemEventArgs);
    // for a rename, Name is the new name. The watcher reports every file in the
    // directory, so the name has to be filtered here.
    void OnFileAppeared(object sender, FileSystemEventArgs e)
    {
        if (string.Equals(e.Name, fileName, StringComparison.Ordinal))
        {
            Complete();
        }
    }

    watcher.Created += OnFileAppeared;
    watcher.Renamed += OnFileAppeared;

    try
    {
        watcher.EnableRaisingEvents = true;
    }
    catch
    {
        watcher.Dispose();
        throw;
    }

    // Re-check now that the watcher is live: a file created between the first check
    // and EnableRaisingEvents would otherwise be missed and the task would never complete.
    if (File.Exists(fullPath))
    {
        Complete();
    }

    return tcs.Task;
}
请注意,这首先检查文件是否存在,以便在适用时立即返回。它还同时使用 Created 和 Renamed 两个处理程序,因为这两种情况都可能使文件在将来某个时刻出现。另外,FileSystemWatcher 只能监视目录,因此重要的是先获取指定路径所在的目录,然后在事件处理程序中检查每个受影响文件的文件名。
另请注意,代码完成后会删除事件处理程序。
这允许我们写:
/// <summary>
/// Usage example: waits for <c>C:\Temp\test.txt</c> to exist, then writes a
/// message to the console.
/// </summary>
public static async Task Foo()
{
    const string watchedFile = @"C:\Temp\test.txt";

    Task fileReady = WhenFileCreated(watchedFile);
    await fileReady;

    Console.WriteLine("It's aliiiiiive!!!");
}
这是Servy解决方案的功能更丰富的版本。它允许监视特定的文件系统状态和事件,以涵盖不同的场景。它也可以通过超时(timeout)和 CancellationToken 取消。
/// <summary>
/// Conditions that <c>WatchFile</c> can wait for. The values are bit flags and can be
/// combined; the value returned by <c>WatchFile</c> is the single condition that was met.
/// </summary>
[Flags]
public enum WatchFileType
{
    /// <summary>The watcher raised a Created event for the file.</summary>
    Created = 1 << 0,

    /// <summary>The watcher raised a Deleted event for the file.</summary>
    Deleted = 1 << 1,

    /// <summary>The watcher raised a Changed event for the file.</summary>
    Changed = 1 << 2,

    /// <summary>The watcher raised a Renamed event for the file.</summary>
    Renamed = 1 << 3,

    /// <summary>The file already exists when it is checked.</summary>
    Exists = 1 << 4,

    /// <summary>The file already exists and has a length greater than zero when it is checked.</summary>
    ExistsNotEmpty = 1 << 5,

    /// <summary>The file does not exist when it is checked.</summary>
    NotExists = 1 << 6,
}
/// <summary>
/// Asynchronously waits until the file at <paramref name="filePath"/> is in one of the
/// requested states, or until one of the requested file-system events is raised for it.
/// </summary>
/// <param name="filePath">
/// Path of the file to watch. A relative path (including a bare file name) is resolved
/// against the current directory. The containing directory must already exist.
/// </param>
/// <param name="watchTypes">Combination of states and events to wait for.</param>
/// <param name="timeout">
/// Maximum time to wait, in milliseconds, or <see cref="Timeout.Infinite"/> for no limit.
/// </param>
/// <param name="cancellationToken">Token that cancels the wait.</param>
/// <returns>
/// A task whose result is the single <see cref="WatchFileType"/> that was satisfied.
/// The task is canceled when the timeout elapses or the token is canceled, and faulted
/// when the underlying watcher reports an error.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="filePath"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="timeout"/> is negative and not <see cref="Timeout.Infinite"/>.
/// </exception>
public static Task<WatchFileType> WatchFile(string filePath,
    WatchFileType watchTypes,
    int timeout = Timeout.Infinite,
    CancellationToken cancellationToken = default)
{
    if (filePath == null)
    {
        throw new ArgumentNullException(nameof(filePath));
    }

    // Validate before the watcher is created so an invalid timeout cannot leak it.
    if (timeout < Timeout.Infinite)
    {
        throw new ArgumentOutOfRangeException(nameof(timeout));
    }

    // Resolve to an absolute path first: for a bare file name Path.GetDirectoryName
    // returns an empty string, which FileSystemWatcher rejects.
    var fullPath = Path.GetFullPath(filePath);

    // Run continuations asynchronously so awaiting code does not execute inline on the
    // watcher's event thread or inside the cancellation callback.
    var tcs = new TaskCompletionSource<WatchFileType>(TaskCreationOptions.RunContinuationsAsynchronously);
    var fsw = new FileSystemWatcher(Path.GetDirectoryName(fullPath));
    fsw.Filter = Path.GetFileName(fullPath);

    // The first completion wins; every path releases the watcher.
    void Complete(WatchFileType result)
    {
        fsw.Dispose();
        tcs.TrySetResult(result);
    }

    void Handler(object sender, FileSystemEventArgs e)
    {
        switch (e.ChangeType)
        {
            case WatcherChangeTypes.Created: Complete(WatchFileType.Created); break;
            case WatcherChangeTypes.Deleted: Complete(WatchFileType.Deleted); break;
            case WatcherChangeTypes.Changed: Complete(WatchFileType.Changed); break;
            case WatcherChangeTypes.Renamed: Complete(WatchFileType.Renamed); break;
            default:
                // Surface the problem through the task instead of throwing on the
                // watcher's event thread, where nobody could observe it.
                fsw.Dispose();
                tcs.TrySetException(new NotImplementedException(e.ChangeType.ToString()));
                break;
        }
    }

    if (watchTypes.HasFlag(WatchFileType.Created)) fsw.Created += Handler;
    if (watchTypes.HasFlag(WatchFileType.Deleted)) fsw.Deleted += Handler;
    if (watchTypes.HasFlag(WatchFileType.Changed)) fsw.Changed += Handler;
    if (watchTypes.HasFlag(WatchFileType.Renamed)) fsw.Renamed += Handler;

    fsw.Error += (object sender, ErrorEventArgs e) =>
    {
        fsw.Dispose();
        tcs.TrySetException(e.GetException());
    };

    // Disposing the watcher (from any completion path) also drops the cancellation callback.
    CancellationTokenRegistration cancellationTokenReg = default;
    fsw.Disposed += (object sender, EventArgs e) =>
    {
        cancellationTokenReg.Dispose();
    };

    try
    {
        // Start watching before the state checks so no event is missed in between.
        fsw.EnableRaisingEvents = true;
    }
    catch
    {
        fsw.Dispose();
        throw;
    }

    // State checks, in the same priority order as before: Exists, ExistsNotEmpty, NotExists.
    var fileInfo = new FileInfo(fullPath);
    if (watchTypes.HasFlag(WatchFileType.Exists) && fileInfo.Exists)
    {
        Complete(WatchFileType.Exists);
    }
    else if (watchTypes.HasFlag(WatchFileType.ExistsNotEmpty)
        && fileInfo.Exists && fileInfo.Length > 0)
    {
        Complete(WatchFileType.ExistsNotEmpty);
    }
    else if (watchTypes.HasFlag(WatchFileType.NotExists) && !fileInfo.Exists)
    {
        Complete(WatchFileType.NotExists);
    }

    // Already done: skip the cancellation registration, which nothing would dispose.
    if (tcs.Task.IsCompleted)
    {
        return tcs.Task;
    }

    if (cancellationToken.CanBeCanceled)
    {
        cancellationTokenReg = cancellationToken.Register(() =>
        {
            fsw.Dispose();
            tcs.TrySetCanceled(cancellationToken);
        });

        // The watcher may have completed (and raised Disposed) before the registration
        // was stored; release it here in that case.
        if (tcs.Task.IsCompleted)
        {
            cancellationTokenReg.Dispose();
        }
    }

    if (tcs.Task.IsCompleted || timeout == Timeout.Infinite)
    {
        return tcs.Task;
    }

    // Handle timeout
    var timeoutCts = new CancellationTokenSource();
    var timeoutToken = timeoutCts.Token;
    var delayTask = Task.Delay(timeout, timeoutToken);
    return Task.WhenAny(tcs.Task, delayTask).ContinueWith(_ =>
    {
        // Stops the delay timer when the watch finished first; in the timeout case it
        // puts the token into the canceled state that TrySetCanceled reports.
        timeoutCts.Cancel();

        // TrySetCanceled only succeeds when nothing else completed the task, so a
        // result that arrives at the same moment as the timeout is not lost.
        if (tcs.TrySetCanceled(timeoutToken))
        {
            fsw.Dispose();
        }

        timeoutCts.Dispose();
        return tcs.Task;
    }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
使用示例:
// Complete as soon as the file already exists or gets created; the third
// argument (5000) is the timeout passed to WatchFile.
var result = await WatchFile(@"..\..\_Test.txt",
WatchFileType.Exists | WatchFileType.Created, 5000);
在此示例中,结果通常为 WatchFileType.Exists 或 WatchFileType.Created。在文件不存在且 5000 毫秒内仍未创建的例外情况下,将抛出 TaskCanceledException。
场景
• WatchFileType.Exists | WatchFileType.Created:用于一次性创建的文件。
• WatchFileType.ExistsNotEmpty | WatchFileType.Changed:用于先创建为空、随后才填充数据的文件。
• WatchFileType.NotExists | WatchFileType.Deleted:用于即将被删除的文件。
使用自定义 Reactive Extensions 运算符 WaitIf 的完整解决方案。这需要通过 NuGet 获取 Genesis.RetryWithBackoff 包。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
/// <summary>
/// Demo: waits for the first file created in <c>C:\test</c> that is no longer locked
/// by its writer, then prints its full path.
/// </summary>
public class TestWatcher
{
    // Win32 error codes carried in the low 16 bits of an IOException's HResult.
    private const int ErrorSharingViolation = 32;
    private const int ErrorLockViolation = 33;

    /// <summary>
    /// Blocks until a file is created in <c>C:\test</c> and can be opened, then writes
    /// its full path to the console.
    /// </summary>
    public static void Test()
    {
        // Events are enabled only after the pipeline is subscribed (see below), so a
        // file created during setup is not lost.
        var watcher = new FileSystemWatcher("C:\\test");

        // IsFileLocked is an instance method, so the static entry point needs an instance.
        var probe = new TestWatcher();

        var created = Observable
            .FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                h => watcher.Created += h,
                h => watcher.Created -= h)
            .Select(e => e.EventArgs.FullPath);

        // The scheduler must be passed by name: the fourth positional parameter of
        // WaitIf is retryOnError, not the scheduler.
        var createdAndNotLocked = created.WaitIf(
            probe.IsFileLocked,
            100,
            attempt => TimeSpan.FromMilliseconds(100),
            scheduler: Scheduler.Default);

        var firstCreatedAndNotLocked = createdAndNotLocked
            .Take(1)
            .Finally(watcher.Dispose);

        // ToTask subscribes to the pipeline; start raising events only afterwards.
        var task = firstCreatedAndNotLocked.ToTask();
        watcher.EnableRaisingEvents = true;

        // Blocking is deliberate here: this demo entry point is synchronous.
        task.Wait();
        Console.WriteLine(task.Result);
    }

    /// <summary>
    /// Reports whether the file is currently locked by another handle.
    /// </summary>
    /// <param name="filePath">Path of the file to probe.</param>
    /// <returns>
    /// <c>true</c> when opening the file fails with a sharing or lock violation;
    /// <c>false</c> when it opens, or fails with any other <see cref="IOException"/>.
    /// </returns>
    public bool IsFileLocked(string filePath)
    {
        try
        {
            // Open and immediately close; only whether the open succeeds matters.
            using (File.Open(filePath, FileMode.Open)) { }
            return false;
        }
        catch (IOException e)
        {
            // Exception.HResult exposes the HRESULT directly, so no interop call
            // (and no extra namespace import) is needed.
            var errorCode = e.HResult & ((1 << 16) - 1);
            return errorCode == ErrorSharingViolation || errorCode == ErrorLockViolation;
        }
    }
}
/// <summary>
/// Rx helper operators.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Raised inside <see cref="WaitIf{T}"/> for an element whose predicate returned
    /// <c>true</c>, to trigger another attempt.
    /// </summary>
    public class NotReadyException : Exception
    {
        public NotReadyException (string message) : base(message)
        {
        }
    }

    /// <summary>
    /// Holds back each element for as long as <paramref name="predicate"/> returns
    /// <c>true</c> for it, re-evaluating the predicate through <c>RetryWithBackoff</c>.
    /// </summary>
    /// <typeparam name="T">Element type.</typeparam>
    /// <param name="this">Source sequence.</param>
    /// <param name="predicate">Returns <c>true</c> while the element is not ready yet.</param>
    /// <param name="retryCount">Forwarded to <c>RetryWithBackoff</c>.</param>
    /// <param name="strategy">Forwarded to <c>RetryWithBackoff</c>.</param>
    /// <param name="retryOnError">Forwarded to <c>RetryWithBackoff</c>.</param>
    /// <param name="scheduler">
    /// Scheduler for the predicate evaluation and the retries;
    /// <c>DefaultScheduler.Instance</c> when null.
    /// </param>
    /// <returns>A sequence of the source elements for which the predicate returned <c>false</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="this"/> or <paramref name="predicate"/> is null.
    /// </exception>
    /// <remarks>
    /// NOTE(review): what happens once the retries are exhausted is decided by the
    /// third-party <c>RetryWithBackoff</c> operator — confirm against that package.
    /// </remarks>
    public static IObservable<T> WaitIf<T>(
        this IObservable<T> @this,
        Func<T, bool> predicate,
        int? retryCount = null,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        // Fail fast: a null predicate would otherwise surface as a NullReferenceException
        // inside the pipeline and be retried as if the element were merely not ready.
        if (@this == null) throw new ArgumentNullException(nameof(@this));
        if (predicate == null) throw new ArgumentNullException(nameof(predicate));

        scheduler = scheduler ?? DefaultScheduler.Instance;

        return @this.SelectMany(item =>
            Observable.Defer(() =>
                // Task is fully qualified because System.Threading.Tasks is not among
                // this snippet's using directives.
                Observable.FromAsync<bool>(
                        () => System.Threading.Tasks.Task.Run<bool>(() => predicate(item)),
                        scheduler)
                    .SelectMany(notReady => notReady
                        ? Observable.Throw<T>(new NotReadyException(item + " not ready"))
                        : Observable.Return(item))
                    .RetryWithBackoff(retryCount, strategy, retryOnError, scheduler)));
    }
}
这就是我的做法:
// Poll for the file off the calling thread, pausing between checks so the wait
// does not spin a thread-pool thread at 100% CPU.
await Task.Run(async () =>
{
    while (!File.Exists(@"yourpath.extension"))
    {
        await Task.Delay(100);
    }
});
//do all the processing
你也可以把它打包成一个方法:
/// <summary>
/// Returns a task that completes once a file exists at <paramref name="path"/>.
/// </summary>
/// <param name="path">
/// Path of the file to wait for. A relative path (including a bare file name such as
/// <c>"yourpath.extension"</c>) is resolved against the current directory. The
/// containing directory must already exist.
/// </param>
/// <returns>
/// An already-completed task when the file exists at call time; otherwise a task that
/// completes when a file with that name is created in, or renamed into, the directory.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="path"/> is null.</exception>
/// <exception cref="ArgumentException">
/// Thrown by <see cref="FileSystemWatcher"/> when the containing directory does not exist.
/// </exception>
public static Task WaitForFileAsync(string path)
{
    if (path == null)
    {
        throw new ArgumentNullException(nameof(path));
    }

    // Fast path: nothing to watch when the file is already there.
    if (File.Exists(path))
    {
        return Task.FromResult<object>(null);
    }

    // Resolve to an absolute path first. For a bare file name Path.GetDirectoryName
    // returns an empty string, which FileSystemWatcher rejects, and the watcher's
    // FullPath would never equal a relative path.
    var fullPath = Path.GetFullPath(path);
    var fileName = Path.GetFileName(fullPath);

    // Run continuations asynchronously so the awaiting code does not execute inline
    // on the watcher's event thread.
    var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
    var watcher = new FileSystemWatcher(Path.GetDirectoryName(fullPath));

    void Complete()
    {
        // Dispose stops event delivery; it is safe to reach this more than once.
        watcher.Dispose();
        tcs.TrySetResult(null);
    }

    // Shared by Created and Renamed (RenamedEventArgs derives from FileSystemEventArgs);
    // for a rename, Name is the new name.
    void OnFileAppeared(object sender, FileSystemEventArgs e)
    {
        if (string.Equals(e.Name, fileName, StringComparison.Ordinal))
        {
            Complete();
        }
    }

    watcher.Created += OnFileAppeared;
    watcher.Renamed += OnFileAppeared;

    try
    {
        watcher.EnableRaisingEvents = true;
    }
    catch
    {
        watcher.Dispose();
        throw;
    }

    // Re-check now that the watcher is live: a file created between the first check
    // and EnableRaisingEvents would otherwise be missed and the task would never complete.
    if (File.Exists(fullPath))
    {
        Complete();
    }

    return tcs.Task;
}
然后就这样使用它:
// Suspend here until the task returned by WaitForFileAsync completes.
await WaitForFileAsync("yourpath.extension");
// Do all the processing