我正在对 Reactive Extensions 进行大量试验,现在我正在尝试创建一个系统,在该系统中,我可以对程序进行排队并以我想要的任何方式执行它们,同时能够向订阅者发送通知。
我目前将我的数据库访问封装在一个 UserAccess 类中,该类公开了添加用户的方法。在该方法中,我想排队一个将用户添加到数据库的操作。所以我创建了一个 T 类的 JobProcessor,它公开了一个方法 QueueJob(Action) 并让我的用户实现这个类。我的问题是我看不到如何从 Observable 的 OnNext 方法中调用 Action,因为该操作需要一个 User 参数。
我的攻角一定是错的,一定是我对设计的把握有问题。例如,我知道我应该以某种方式将我的用户传递给 QueueJob 过程,但我不知道如何以一种干净的方式进行操作。
public class UserAccess : JobProcessor<User>
{
public void AddUser(User user)
{
QueueJob(usr =>
{
using (var db = new CenterPlaceModelContainer())
{
db.Users.Add(usr);
}
});
[...]
public abstract class JobProcessor<T>
{
// Either Subject<T> or Subject<Action<T>>
private Subject<Action<T>> JobSubject = new Subject<Action<T>>();
public JobProcessor()
{
JobSubject
/* Insert Rx Operators Here */
.Subscribe(OnJobNext, OnJobError, OnJobComplete);
}
private void OnJobNext(Action<T> action)
{
// ???
}
private void OnJobError(Exception exception)
{
}
private void OnJobComplete()
{
}
public void QueueJob(Action<T> action)
{
JobSubject.OnNext(action);
}
}
编辑 1:
我试图将 QueueJob 的签名更改为
QueueJob(T entity, Action<T> action)
现在我可以做
QueueJob(user, usr => { ... } );
但这似乎不是很直观。我还没有看到很多框架可以同时传递实体和动作。有了这个,我可能也不需要 JobProcessor。
编辑 2: 我将 JobProcessor 的主题类型更改为主题,完全删除了 T。因为我可以在外部引用它,所以不需要在过程中包含用户。现在唯一的问题是如果我传递给QueueJob的User的action在Action执行的实际时间之间发生了变化,用户会有修改的信息。不受欢迎,但我想我会继续寻找解决方案。
我的代码现在是(用于示例的缓冲区):
public abstract class JobProcessor
{
public Subject<Action> JobSubject = new Subject<Action>();
public JobProcessor()
{
JobSubject
.Buffer(3)
.Subscribe(OnJobNext, OnJobError, OnJobComplete);
}
private void OnJobNext(IList<Action> actionsList)
{
foreach (var element in actionsList)
{
element();
}
}
private void OnJobError(Exception exception)
{
}
private void OnJobComplete()
{
}
public void QueueJob(Action action)
{
JobSubject.OnNext(action);
}
}