1

我有大量的状态机。有时,需要将状态机从一种状态转移到另一种状态,这可能很便宜,也可能很昂贵,并且可能涉及数据库读取和写入等。

这些状态更改是由于来自客户端的传入命令而发生的,并且可以随时发生。

我想并行化工作负载。我想要一个队列说“将这台机器从这个状态移动到这个状态”。显然,任何一台机器的命令都需要按顺序执行,但如果我有很多线程,我可以并行移动多台机器。

每个状态机我可以有一个线程,但状态机的数量取决于数据,可能有数百或数千;我不想要每个状态机都有一个专用线程,我想要某种类型的池。

我怎样才能拥有一个工人池,但要确保每个状态机的命令都严格按顺序处理?

更新:想象一下Machine实例有一个未完成的命令列表。当线程池中的执行器完成了一个命令的消费后,如果它有更多未完成的命令,它将Machine返回到线程池的任务队列中。Machine所以问题是,当你追加第一个命令时,如何原子地放入并确保这都是线程安全的?

4

3 回答 3

2

我建议你这个场景:

  1. 创建线程池,可能是一些固定大小Executors.newFixedThreadPool
  2. 创建一些结构(可能是 a HashMap),Semaphore每个状态机都有一个。该信号量的值为 1 并且是保持序列的公平信号量
  3. 在 Runnable 中,它将完成乞求工作,只需添加semaphore.aquire()其状态机的信号量并semaphore.release()在运行方法结束时添加。

使用线程池的大小,您将控制并行度。

于 2013-01-21T08:50:36.807 回答
1

我建议另一种方法。不要使用线程池来移动状态机中的状态,而是使用线程池来处理所有事情,包括完成工作。在完成一些导致状态更改的工作后,应将状态更改事件添加到队列中。处理完状态更改后,应将另一个 do-work 事件添加到队列中。

假设状态转换是工作驱动的,反之亦然,则不可能进行顺序处理。

将信号量存储在特殊映射中的想法非常危险。地图必须同步(添加/删除 objs 是线程不安全的),并且执行搜索(可能在地图上同步)然后使用信号量的开销相对较大。

此外 - 如果你想在你的应用程序中使用多线程架构,我认为你应该一路走下去。混合不同的架构以后可能会很麻烦。

于 2013-01-21T09:16:48.740 回答
1

每台机器都有一个线程 ID。产生所需数量的线程。让所有线程贪婪地处理来自全局队列的消息。每个线程锁定当前消息的服务器以供其自己独占使用(直到它处理完当前消息及其队列中的所有消息),其他线程将该服务器的消息放入其内部队列中。

编辑:处理消息伪代码:

void handle(message)
  targetMachine = message.targetMachine
  if (targetMachine.thread != null)
    targetMachine.thread.addToQueue(message);
  else
    targetMachine.thread = this;
    process(message);
    processAllQueueMessages();
    targetMachine.thread = null;

处理消息 Java 代码:(我可能有点过于复杂了,但这应该是线程安全的)

/* class ThreadClass */
void handle(Message message)
{
  // get targetMachine from message
  targetMachine.mutexInc.aquire(); // blocking
  targetMachine.messages++;
  boolean acquired = targetMachine.mutex.aquire(); // non-blocking
  if (acquired)
    targetMachine.threadID = this.ID;
  targetMachine.mutexInc.release();
  if (!acquired)
    // can put this before release, it may speed things up
    threads[targetMachine.threadID].addToQueue(message);
  else
  {
    process(message);
    targetMachine.messages--;
    while (true)
    {
      while (!queue.empty())
      {
        process(queue.pop());
        targetMachine.messages--;
      }
      targetMachine.mutexInc.acquire(); // blocking
      if (targetMachine.messages > 0)
      {
        targetMachine.mutexInc.release();
        Thread.sleep(1);
      }
      else
        break;
    }
    targetMachine.mutex.release();
    targetMachine.mutexInc.release();
  }
}
于 2013-01-21T09:38:07.277 回答