6

我有一个对对象集合进行操作的任务队列(例如,假设对象是地址簿中的条目)。

一个示例任务可能是“将 Joe 的电话号码更新为 888-555-1212”。

队列中可能同时有多个“更新乔的电话号码...”任务,但电话号码不同。在这种情况下,必须应用更新以确保最后状态正确(不,为了争论,不可能将时间戳放在任务上,并将时间戳放在地址簿条目上并丢弃过时的任务)。

使用 Joe 的更新来为 Jane 应用更新是安全的。

我想对队列进行多线程处理,但我需要按人同步访问。

这种东西有一个方便的图书馆吗?或者我是否降级为使用 Executor 并在 Runnable 的 run() 方法中对“名称”进行自己的同步?

4

3 回答 3

3

这个问题的一个简单但不是很完美的解决方案是在一个数组中维护一组子队列,该数组等于您正在运行的处理线程的数量。单个主线程将项目从单个主队列中拉出,并将它们添加到通过对象键的 hashCode 模数索引的子队列中(识别和关联您的任务的 hashCode)。

例如

int queueIndex = myEntity.getKey().hashCode() % queues.length;

只有一个线程处理该队列,并且同一实体的所有任务都将提交到该队列,因此不会出现竞争条件。

这个解决方案是不完美的,因为一些线程最终可能会比其他线程更大。实际上,这不太重要,但需要考虑。

简单解决方案的问题:

将项目从单个队列中拉出然后锁定受影响实体的不同内容的更简单解决方案具有竞争条件(正如 Aurand 指出的那样)。鉴于:

Master Queue [ Task1(entity1), Task2(entity1), ... ]

wheretask1task2都编辑同一个实体entity1,并且队列上存在thread1thread2操作,那么预期/期望的事件序列是:

  • Thread1 接受 task1
  • Thread1 锁定在 entity1 上
  • Thread1 编辑 entity1
  • Thread1 解锁 entity1
  • Thread2 接受 task2
  • Thread2 锁定 entity1
  • Thread2 编辑 entity1
  • Thread2 解锁 entity1

不幸的是,即使锁是线程的 run 方法的第一条语句,也有可能发生以下序列:

  • Thread1 接受 task1
  • Thread2 接受 task2
  • Thread2 锁定 entity1
  • Thread2 编辑 entity1
  • Thread2 解锁 entity1
  • Thread1 锁定 entity1
  • Thread1 编辑 entity1
  • Thread1 解锁 entity1

为了避免这种情况,每个线程在从队列中获取任务之前必须锁定某些东西(比如队列),然后在仍然持有父锁的同时获取实体上的锁。但是,您不想在持有此父锁并等待获取实体锁时阻塞所有内容,因此您只需尝试获取实体锁,然后在获取失败时处理(可能将其放入另一个队列) . 总体而言,情况变得不平凡。

于 2013-08-21T17:07:05.353 回答
0

这种冲突总是通过为每个对象分配一个版本来解决。每次更新版本都会增加。因此,如果一个更新出现在错误的时间,它可能会被取消或延迟。无论如何,您应该有办法决定哪个更新是第一个,哪个是第二个。这种方法称为乐观锁定

于 2013-08-25T11:17:10.730 回答
-1

一种可能的解决方案

假设某个任务由某个类描述

class Task {
  Integer taskGroup;
  // other
}

其中 taskGroup 是一个 ID,它标识必须按到达顺序处理的任务(在您的示例中,每个“名称”可以定义自己的任务组 - 或更一般地 - 具有相同名称的任务属于同一个任务组)。

mainTaskQueue表示一个List任务对象。然后

  • 创建一个Map<Integer,List<Task>>,说taskGroupsQueues
  • 为每个 taskGroup 创建一个taskGroupsQueues.get(taskGroup)按顺序操作的线程。
  • 主线程task从您的主任务列表中删除 amainTaskQueue并将其附加到taskGroups.get(task.taskGroup)
  • 将任务从主队列移动到单个队列并从单个队列中获取必须同步。

换句话说:同名的任务在同一个线程上执行。

请注意,如果主线程执行任务的分发,那么他也可能执行某种负载平衡,即如果一个任务由于顺序一致性没有被强制到特定队列,则该任务应该进入 shortes 队列。但是,它可能成为单线程问题是您的问题所固有的 - 即当您只有属于同一个 taskGroup 的任务(在您的案例名称中)时。

另一种可能的解决方案 (未经测试,只是一个建议)

正如increment1s 帖子和Aurands 评论中所指出的,在tread 内的taskGroup(名称)上的同步存在一些问题。基本上:为时已晚,因为执行程序可能已经启动了两个尝试以相同名称同步的线程。但是,您可以尝试确保执行者级别的执行顺序。例如,请参阅这篇文章:Java Executors:如何设置任务优先级?(它引用了传递给执行程序的 PriorityBlockingQueue)。

于 2013-08-20T19:56:00.590 回答