10

我有一个程序,我正在接收事件并希望分批处理它们,以便在我处理当前批次时进入的所有项目都将出现在下一个批次中。

Rx 中简单的 TimeSpan 和基于计数的 Buffer 方法将给我多批项目,而不是给我一大批已经进来的所有东西(如果订阅者花费的时间超过指定的 TimeSpan 或超过 N 个项目进来并且N 大于计数)。

我研究了使用更复杂的缓冲区重载,它们采用Func<IObservable<TBufferClosing>>IObservable<TBufferOpening> 和 Func<TBufferOpening, IObservable<TBufferClosing>>,但我找不到如何使用这些的示例,更不用说数字了了解如何将它们应用于我正在尝试做的事情。

4

4 回答 4

2

这是做你想做的吗?

var xs = new Subject<int>();
var ys = new Subject<Unit>();

var zss =
    xs.Buffer(ys);

zss
    .ObserveOn(Scheduler.Default)
    .Subscribe(zs =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(String.Join("-", zs));
        ys.OnNext(Unit.Default);
    });

ys.OnNext(Unit.Default);
xs.OnNext(1);
Thread.Sleep(200);
xs.OnNext(2);
Thread.Sleep(600);
xs.OnNext(3);
Thread.Sleep(400);
xs.OnNext(4);
Thread.Sleep(300);
xs.OnNext(5);
Thread.Sleep(900);
xs.OnNext(6);
Thread.Sleep(100);
xs.OnNext(7);
Thread.Sleep(1000);

我的结果:

1-2-3
4-5
6-7
于 2012-11-27T21:38:50.233 回答
1

我之前这样做的方法是在 DotPeek/Reflector 中提取 ObserveOn 方法,并采用它所具有的排队概念并使其适应我们的要求。例如,在具有快速计时数据的 UI 应用程序(如金融)中,UI 线程可能会被事件淹没,有时它不能足够快地更新。在这些情况下,我们希望删除除最后一个事件之外的所有事件(对于特定工具)。在这种情况下,我们将 ObserveOn 的内部队列更改为单个值 T(查找 ObserveLatestOn(IScheduler))。在您的情况下,您想要队列,但是您想要推送整个队列而不仅仅是第一个值。这应该让你开始。

于 2012-12-10T07:54:04.877 回答
1

您需要的是缓冲值的东西,然后当工作人员准备好时,它会请求当前缓冲区,然后将其重置。这可以通过 RX 和 Task 的组合来完成

class TicTac<Stuff> {

    private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>();

    List<Stuff> in = new List<Stuff>();

    public void push(Stuff stuff){
        lock(this){
            if(in == null){
                in = new List<Stuff>();
                Items.SetResult(in);
            }
            in.Add(stuff);
        }
    }

    private void reset(){
        lock(this){
            Items = new TaskCompletionSource<List<Stuff>>();
            in = null;
        }
    }

    public async Task<List<Stuff>> Items(){
        List<Stuff> list = await Items.Task;
        reset();
        return list;
    }
}

然后

var tictac = new TicTac<double>();

IObservable<double> source = ....

source.Subscribe(x=>tictac.Push(x));

然后在你的工人

while(true){

    var items = await tictac.Items();

    Thread.Sleep(100);

    for each (item in items){
        Console.WriteLine(item);
    }

}
于 2012-11-28T09:04:26.200 回答
1

@Enigmativity 的答案的一种扩展。我用它来解决问题:

public static IObservable<(Action ready, IReadOnlyList<T> values)> BufferUntilReady<T>(this IObservable<T> stream)
{
    var gate = new BehaviorSubject<Guid>(Guid.NewGuid());

    void Ready() => gate.OnNext(Guid.NewGuid());

    return stream.Publish(shared => shared
        .Buffer(gate.CombineLatest(shared, ValueTuple.Create)
            .DistinctUntilChanged(new AnyEqualityComparer<Guid, T>()))
        .Where(x => x.Any())
        .Select(x => ((Action) Ready, (IReadOnlyList<T>) x)));
}

public class AnyEqualityComparer<T1, T2> : IEqualityComparer<(T1 a, T2 b)>
{
    public bool Equals((T1 a, T2 b) x, (T1 a, T2 b) y) => Equals(x.a, y.a) || Equals(x.b, y.b);
    public int GetHashCode((T1 a, T2 b) obj) => throw new NotSupportedException();
}

订阅者接收到准备好接收下一个缓冲区时调用的 Ready() 函数。我没有在同一个线程上观察每个缓冲区以避免循环,但我想你可以在其他地方打破它,如果你需要在同一个线程上处理每个缓冲区。

于 2018-10-11T19:37:01.000 回答