4

我需要为审批流程建模。之前很简单。两个角色必须批准某件事,然后我们可以继续下一步:

public class Approved
{
    public string ApproverRole;
}

var approvals = Subscribe<Approved>();

var vpOfFinance = approvals.Where(e => e.ApproverRole == "Finance VP");
var vpOfSales = approvals.Where(e => e.ApproverRole == "Sales VP");

var approvedByAll = vpOfFinance.Zip(vpOfSales, Tuple.Create);

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

但现在有一个新要求:批准某事所需的角色数量可能会有所不同:

public class ApprovalRequested
{
    public string[] Roles;
}
var approvalRequest = Subscribe<ApprovalRequested>().Take(1);
var approvals = Subscribe<Approved>();

var approvedByAll = ???;

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

我觉得我在这里遗漏了一些非常明显的东西......谁能指出我正确的方向?

编辑

澄清一下:批准过程是基于每个项目的。批准到达的顺序是未定义的。我们不在乎一个角色是否多次批准一个项目。

4

2 回答 2

2

在当前版本的 Rx(我从 NuGet 获得)中,有一个版本Zip()接受一个可观察对象的集合并返回一个可观察的集合。有了它,你可以做这样的事情:

string[] requiredApprovals = …;

var approvedByAll = requiredApprovals
    .Select(required => approvals.Where(a => a.ApproverRole == required))
    .Zip();

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

但正如@Enigmativity 指出的那样,这只有在您可以确定每个人都以相同的顺序批准并且所有项目最终将被所有必需角色批准的情况下才有效。如果没有,您将需要一些比Zip().

于 2012-09-22T08:32:37.787 回答
2

问题本质上可以简化为Set从值流中创建一个值,其中值可能是无序的或本质上很多。

如果 N 是集合的基数,我们可以简单地假设该过程将在至少 N 类型的值(在这种情况下为角色)被推送之前不会继续。

这是 Zip 运算符的示例解决方案;也许这可以让你开始:

    public static IObservable<IList<T>> Zip<T>(this IList<IObservable<T>> observables)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            List<List<T>> store = new List<List<T>>(Enumerable.Range(1, observables.Count).Select(_ => new List<T>()));

            return new CompositeDisposable(observables.Select((o, i) => 
                o.Subscribe(value =>
                {
                    lock (store)
                    {
                        store[i].Add(value);

                        if (store.All(list => list.Count > 0))
                        {
                            observer.OnNext(store.Select(list => list[0]).ToList());
                            store.ForEach(list => list.RemoveAt(0));
                        }
                    }
                }))
            );
        });
    }

测试:

        Observable.Interval(TimeSpan.FromSeconds(0.5))
                  .GroupBy(i => i % 3)
                  .Select(gr => gr.AsObservable())
                  .Buffer(3)                      
                  .SelectMany(set => set.Zip())
                  .Subscribe(v => Console.WriteLine(String.Join(",", v)));

这里的一个问题是,在形成组时您可能会丢失初始值,因此您可能希望通过将方法重写为IObservable<IList<T>> Zip<TKey, T>(this IGroupedObservable<TKey, T> observables).

于 2012-09-22T18:16:37.457 回答