3

我正在对 Reactive Extensions 进行大量试验,现在我正在尝试创建一个系统,在该系统中,我可以对程序进行排队并以我想要的任何方式执行它们,同时能够向订阅者发送通知。

我目前将我的数据库访问封装在一个 UserAccess 类中,该类公开了添加用户的方法。在该方法中,我想排队一个将用户添加到数据库的操作。所以我创建了一个 T 类的 JobProcessor,它公开了一个方法 QueueJob(Action) 并让我的用户实现这个类。我的问题是我看不到如何从 Observable 的 OnNext 方法中调用 Action,因为该操作需要一个 User 参数。

我的攻角一定是错的,一定是我对设计的把握有问题。例如,我知道我应该以某种方式将我的用户传递给 QueueJob 过程,但我不知道如何以一种干净的方式进行操作。

    public class UserAccess : JobProcessor<User>
    {
        public void AddUser(User user)
        {
            QueueJob(usr =>
                     {
                         using (var db = new CenterPlaceModelContainer())
                         {
                             db.Users.Add(usr);
                         }

                     });
         [...]

    public abstract class JobProcessor<T>
    {
        // Either Subject<T> or Subject<Action<T>>
        private Subject<Action<T>> JobSubject = new Subject<Action<T>>();

        public JobProcessor()
        {
            JobSubject
            /* Insert Rx Operators Here */
            .Subscribe(OnJobNext, OnJobError, OnJobComplete);
        }

        private void OnJobNext(Action<T> action)
        {
            // ???
        }

        private void OnJobError(Exception exception)
        {

        }

        private void OnJobComplete()
        {

        }

        public void QueueJob(Action<T> action)
        {
            JobSubject.OnNext(action);
        }
    }

编辑 1:

我试图将 QueueJob 的签名更改为

QueueJob(T entity, Action<T> action)

现在我可以做

QueueJob(user, usr => { ... } );

但这似乎不是很直观。我还没有看到很多框架可以同时传递实体和动作。有了这个,我可能也不需要 JobProcessor。

编辑 2: 我将 JobProcessor 的主题类型更改为主题,完全删除了 T。因为我可以在外部引用它,所以不需要在过程中包含用户。现在唯一的问题是如果我传递给QueueJob的User的action在Action执行的实际时间之间发生了变化,用户会有修改的信息。不受欢迎,但我想我会继续寻找解决方案。

我的代码现在是(用于示例的缓冲区):

public abstract class JobProcessor
{
   public Subject<Action> JobSubject = new Subject<Action>();

   public JobProcessor()
   {
       JobSubject
           .Buffer(3)
           .Subscribe(OnJobNext, OnJobError, OnJobComplete);
   }

   private void OnJobNext(IList<Action> actionsList)
   {
       foreach (var element in actionsList)
        {
            element();
        }
   }

   private void OnJobError(Exception exception)
   {

   }

   private void OnJobComplete()
   {

   }

   public void QueueJob(Action action)
   {
       JobSubject.OnNext(action);
   }
}
4

4 回答 4

1

我最初的反应是 IObservable 通常最适合创建不可变数据结构的序列,而不是方法指针/委托/动作。

接下来,我建议如果您尝试“调度”以队列方式处理的操作,那么 Rx 中的 IScheduler 实现似乎非常合适!

或者,如果您实际上是在尝试创建一个 ProduceConsumer 队列,那么我认为 Rx 实际上并不是最适合这个的。即,如果您将一堆消息放入队列,然后让一些消费者读取这些消息并处理它们,我会寻找不同的框架。

于 2013-03-26T09:48:21.067 回答
1

首先,我必须同意 Lee 和 NSGaga 的观点,即您可能不想这样做 - 生产者/消费者队列的其他模式与您想要完成的(我认为)更加一致这里。

也就是说,由于我永远无法抗拒挑战……通过一些细微的调整,您可以消除“我将什么传递给行动?”的直接问题。通过捕获传入的用户参数并将其直接设置Action- 这是您的代码并进行了一些修改:

public class UserAccess : JobProcessor
{
    public void AddUser(User user)
    {
        QueueJob(() =>
                 {
                     using (var db = new CenterPlaceModelContainer())
                     {
                         db.Users.Add(user);
                     }

                 });
     [...]

public abstract class JobProcessor
{
    // Subject<Action>
    private Subject<Action> JobSubject = new Subject<Action>();

    public JobProcessor()
    {
        JobSubject
        /* Insert Rx Operators Here */
        .Subscribe(OnJobNext, OnJobError, OnJobComplete);
    }

    private void OnJobNext(Action action)
    {
        // Log something saying "Yo, I'm executing an action" here?
        action();
    }

    private void OnJobError(Exception exception)
    {
        // Log something saying "Yo, something broke" here?
    }

    private void OnJobComplete()
    {
        // Log something saying "Yo, we shut down" here?
    }

    public void QueueJob(Action action)
    {
        JobSubject.OnNext(action);
    }
}
于 2013-03-26T14:20:41.233 回答
1

坦率地说,我不确定您的“目标”是什么-但我认为您有点倒退了...

通常主题是通过诸如
IObservable<Action<T>> NewJob {get{return _subject;}}
...或其他东西的属性暴露出来的。(主题变得可观察 - 主题本质上是双重的 - 以及为什么它是特定的 - 并且有点争议 - 但适合玩耍等)

你只是OnNext从课堂内部打电话——就像你做的那样。

但是您通常不会自己订阅 observable
......您让外部用户通过“挂钩”到您的属性中来做到这一点 - 并定义 subscribe - 这会在他们到达时为他们获取新项目。

这当然是简化的,有很多案例和很多用途,但这可能会有所帮助,我希望

于 2013-03-26T01:02:11.400 回答
0

我完成了我的设计并找到了我喜欢的东西。如果其他人需要,这是代码。

public class JobProcessor<T> : IDisposable where T : new()
{
    private ISubject<Action<T>> jobsProcessor = new Subject<Action<T>>();

    private IDisposable disposer;

    private T _jobProvider = new T();

    public JobProcessor(Func<ISubject<Action<T>>, IObservable<IEnumerable<Action<T>>>> initializer)
    {
        Console.WriteLine("Entering JobProcessor Constructor");

        disposer = initializer(jobsProcessor)
            .Subscribe(OnJobsNext, OnJobsError, OnJobsComplete);

        Console.WriteLine("Leaving JobProcessor Constructor");
    }

    private void OnJobsNext(IEnumerable<Action<T>> actions)
    {
        Debug.WriteLine("Entering OnJobsNext");

        foreach (var action in actions)
        {
            action(_jobProvider);
        }

        Debug.WriteLine("Leaving OnJobsNext");
    }

    private void OnJobsError(Exception ex)
    {
        Debug.WriteLine("Entering OnJobsError");

        Debug.WriteLine(ex.Message);

        Debug.WriteLine("Leaving OnJobsError");
    }

    private void OnJobsComplete()
    {
        Debug.WriteLine("Entering OnJobsComplete");

        Debug.WriteLine("Leaving OnJobsComplete");
    }

    public void QueueJob(Action<T> action)
    {
        Debug.WriteLine("Entering QueueJobs");

        jobsProcessor.OnNext(action);

        Debug.WriteLine("Leaving QueueJobs");
    }

    public void Dispose()
    {
        disposer.Dispose();
    }
}

我选择了一个通用的 make 来支持分层架构,我可以在一个并发层中使用 JobProcessor,在那里我可以选择执行的快慢。JobProcessor 构造函数接受一个 Func,用于在代码中的其他位置声明 Observable 序列,并生成一个处理器,该处理器按照序列描述的顺序执行作业。OnNext 接受一个 IEnumerable> 以便能够支持像 .Buffer(3) 这样同时返回一批动作的序列。不利的一面是,当创建一次返回单个动作的序列时,我需要这样做

var x = new JobProcessor<DatabaseAccess<User>>(subject => subject.Select(action => action.Yield()));

T 的 Yield() 扩展方法返回单个元素的可枚举。我在这里找到了将单个项目作为 IEnumerable<T> 传递

于 2013-03-27T13:31:22.200 回答