创建可观察数据流块时有两个选项需要考虑。您可以:
- 每次处理消息时发出通知,或
- 当存储在块的输出缓冲区中的先前处理的消息被链接块接受时发出通知。
两种选择都有优点和缺点。第一个选项提供及时但无序的通知。第二个选项提供有序但延迟的通知,并且还必须处理块到块链接的可处置性。当两个块之间的链接在块完成之前手动设置时,observable 会发生什么?
下面是第一个选项的实现,它创建了TransformBlock
一个不消耗IObservable
这个块的块。还有一个ActionBlock
等效的实现,基于第一个实现(尽管它也可以通过复制粘贴和调整实现来独立TransformBlock
实现,因为代码不多)。
public static TransformBlock<TInput, TOutput>
CreateObservableTransformBlock<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform,
out IObservable<(TInput Input, TOutput Output,
int StartedIndex, int CompletedIndex)> observable,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (transform == null) throw new ArgumentNullException(nameof(transform));
dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
var semaphore = new SemaphoreSlim(1);
int startedIndexSeed = 0;
int completedIndexSeed = 0;
var notificationsBlock = new BufferBlock<(TInput, TOutput, int, int)>(
new DataflowBlockOptions() { BoundedCapacity = 100 });
var transformBlock = new TransformBlock<TInput, TOutput>(async item =>
{
var startedIndex = Interlocked.Increment(ref startedIndexSeed);
var result = await transform(item).ConfigureAwait(false);
await semaphore.WaitAsync().ConfigureAwait(false);
try
{
// Send the notifications in synchronized fashion
var completedIndex = Interlocked.Increment(ref completedIndexSeed);
await notificationsBlock.SendAsync(
(item, result, startedIndex, completedIndex)).ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
return result;
}, dataflowBlockOptions);
_ = transformBlock.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)notificationsBlock).Fault(t.Exception);
else notificationsBlock.Complete();
}, TaskScheduler.Default);
observable = notificationsBlock.AsObservable();
// A dummy subscription to prevent buffering in case of no external subscription.
observable.Subscribe(
DataflowBlock.NullTarget<(TInput, TOutput, int, int)>().AsObserver());
return transformBlock;
}
// Overload with synchronous lambda
public static TransformBlock<TInput, TOutput>
CreateObservableTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform,
out IObservable<(TInput Input, TOutput Output,
int StartedIndex, int CompletedIndex)> observable,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
return CreateObservableTransformBlock(item => Task.FromResult(transform(item)),
out observable, dataflowBlockOptions);
}
// ActionBlock equivalent (requires the System.Reactive package)
public static ITargetBlock<TInput>
CreateObservableActionBlock<TInput>(
Func<TInput, Task> action,
out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (action == null) throw new ArgumentNullException(nameof(action));
var block = CreateObservableTransformBlock<TInput, object>(
async item => { await action(item).ConfigureAwait(false); return null; },
out var sourceObservable, dataflowBlockOptions);
block.LinkTo(DataflowBlock.NullTarget<object>());
observable = sourceObservable
.Select(entry => (entry.Input, entry.StartedIndex, entry.CompletedIndex));
return block;
}
// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
CreateObservableActionBlock<TInput>(
Action<TInput> action,
out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
return CreateObservableActionBlock(
item => { action(item); return Task.CompletedTask; },
out observable, dataflowBlockOptions);
}
Windows 窗体中的使用示例:
private async void Button1_Click(object sender, EventArgs e)
{
var block = CreateObservableTransformBlock((int i) => i + 20,
out var observable,
new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var vals = Enumerable.Range(1, 20).ToList();
TextBox1.Clear();
ProgressBar1.Value = 0;
observable.ObserveOn(SynchronizationContext.Current).Subscribe(onNext: x =>
{
TextBox1.AppendText($"Value {x.Input} transformed to {x.Output}\r\n");
ProgressBar1.Value = (x.CompletedIndex * 100) / vals.Count;
}, onError: ex =>
{
TextBox1.AppendText($"An exception occured: {ex.Message}\r\n");
},
onCompleted: () =>
{
TextBox1.AppendText("The job completed successfully\r\n");
});
block.LinkTo(DataflowBlock.NullTarget<int>());
foreach (var i in vals) await block.SendAsync(i);
block.Complete();
}
在上面的例子中,observable
变量的类型是:
IObservable<(int Input, int Output, int StartedIndex, int CompletedIndex)>
这两个指数是从 1 开始的。