5

我正在寻找一个 java 并发习语来匹配具有最高吞吐量的大量元素。

考虑一下我有来自多个线程的“人”。每个“人”都在寻找匹配项。当它找到另一个等待匹配的“人”时,它们都被分配给彼此并被删除以进行处理。

我不想锁定一个大结构来改变状态。考虑 Person 有 getMatch 和 setMatch。在提交之前,每个人的#getMatch 为空。但是,当它们解锁(或被钓)时,它们要么已经过期,因为它们等待匹配的时间太长,要么#getMatch 不为空。

保持高吞吐量的一些问题是,如果 PersonA 与 PersonB 同时提交。它们相互匹配,但 PersonB 也匹配已经等待的 PersonC。PersonB 的状态在提交时更改为“可用”。但是当 PersonB 与 PersonC 匹配时,PersonA 需要不意外地得到 PersonB。说得通?另外,我想以一种异步工作的方式来做到这一点。换句话说,我不希望每个提交者都必须在具有 waitForMatch 类型事物的线程上保留一个 Person。

同样,我不希望请求必须在单独的线程上运行,但如果有一个额外的 match maker 线程也没关系。

似乎应该有一些成语,因为它似乎是很常见的事情。但是我的谷歌搜索已经干涸(我可能使用了错误的术语)。

更新

有几件事让我很难解决这个问题。一个是我不想在内存中有对象,我希望所有等待的候选人都在 redis 或 memcache 或类似的东西中。另一个是任何人都可以有几个可能的匹配项。考虑如下接口:

person.getId();         // lets call this an Integer
person.getFriendIds();  // a collection of other person ids

然后我有一个看起来像这样的服务器:

MatchServer:
   submit( personId, expiration ) -> void // non-blocking returns immediately
   isDone( personId ) -> boolean          // either expired or found a match
   getMatch( personId ) -> matchId        // also non-blocking

这是一个休息界面,它会使用重定向,直到你得到结果。我的第一个想法是在 MatchServer 中有一个缓存,它由诸如 redis 之类的东西支持,并为当前被锁定并被操作的对象有一个并发的弱值哈希映射。每个 personId 都将被一个具有已提交、匹配和过期等状态的持久状态对象包装。

追到现在?很简单,提交代码完成了最初的工作,它是这样的:

public void submit( Person p, long expiration ) {
    MatchStatus incoming = new MatchStatus( p.getId(), expiration );
    if ( !tryMatch( incoming, p.getFriendIds() ) )
        cache.put( p.getId(), incoming ); 
}

public boolean isDone( Integer personId ) {
    MatchStatus status = cache.get( personId );
    status.lock();
    try {
        return status.isMatched() || status.isExpired();

    } finally {
        status.unlock();
    }
}

public boolean tryMatch( MatchStatus incoming, Iterable<Integer> friends ) {
    for ( Integer friend : friends ) {
        if ( match( incoming, friend ) )
            return true;
    }

    return false;
}

private boolean match( MatchStatus incoming, Integer waitingId ) {
    CallStatus waiting = cache.get( waitingId );
    if ( waiting == null )
        return false;

    waiting.lock();
    try {
        if ( waiting.isMatched() )
            return false;

        waiting.setMatch( incoming.getId() );
        incoming.setMatch( waiting.getId() );

        return true
    } finally {
        waiting.unlock();
    }
}

所以这里的问题是,如果两个人同时进来并且他们是他们唯一的匹配项,他们就不会找到对方。比赛条件对吗?我能看到解决它的唯一方法是同步“tryMatch()”。但这会扼杀我的吞吐量。我不能让 tryMatch 无限循环,因为我需要这些调用非常短。

那么有什么更好的方法来解决这个问题呢?我想出的每一个解决方案都一次次迫使人们集中在一起,这对吞吐量来说并不是很好。例如,创建一个后台线程并使用一个阻塞队列来一次放置和接收一个传入的队列。

任何指导将不胜感激。

4

4 回答 4

2

您也许可以使用ConcurrentHashMap. 我假设您的对象具有可以匹配的键,例如 PersonA 和 PersonB 将具有“Person”键。

ConcurrentHashMap<String, Match> map = new ConcurrentHashMap<>();

void addMatch(Match match) {
    boolean success = false;
    while(!success) {
        Match oldMatch = map.remove(match.key);
        if(oldMatch != null) {
            match.setMatch(oldMatch);
            success = true;
       } else if(map.putIfAbsent(match.key, match) == null) {
            success = true;
       }
   }
}

您将继续循环,直到您将匹配项添加到地图,或者直到您删除现有匹配项并将其配对。 remove并且putIfAbsent都是原子的。

编辑:因为你想将数据卸载到磁盘上,你可以使用例如MongoDB来达到这个目的,它的findAndModify方法。如果具有该键的对象已经存在,则该命令将删除并返回它,以便您可以将旧对象与新对象配对,并可能存储与新键关联的配对;如果具有键的对象不存在,则该命令存储具有键的对象。这相当于ConcurrentHashMap除了数据存储在磁盘上而不是内存中的行为;您不必担心同时写入两个对象,因为findAndModify逻辑可以防止它们无意中占用同一个键。

如果您需要将对象序列化为 JSON,请使用Jackson 。

Mongo 有其他替代品,例如DynamoDB,尽管 Dynamo 仅对少量数据免费。

编辑:鉴于好友列表不是自反的,我认为您可以结合使用 MongoDB(或另一个具有原子更新的键值数据库)和ConcurrentHashMap.

  1. MongoDB 中的人员要么是“匹配的”,要么是“不匹配的”。(如果我说“从 MongoDB 中删除一个人”,我的意思是“将这个人的状态设置为‘匹配’。”)
  2. 当你添加一个新人时,首先ConcurrentHashMap<key, boolean>为它创建一个,可能在一个全局的ConcurrentHashMap<key, ConcurrentHashMap<key, boolean>>.
  3. 遍历新人的朋友:
  4. 如果朋友在 MongoDB 中,则使用findAndModify原子地将其设置为“matched”,然后将新人写入 MongoDB,状态为“matched”,最后将该 pair 添加到 MongoDB 中可以查询的“Pairs”集合中由最终用户。ConcurrentHashMap从全局地图中删除此人。
  5. 如果朋友不在 MongoDB 中,则检查该朋友是否已写入当前朋友的关联ConcurrentHashMap. 它有,然后什么也不做;如果没有,则检查该朋友是否有ConcurrentHashMap关联;如果是,则将与当前人员的键关联的值设置为“true”。(请注意,由于当前人无法检查自己的映射并通过一个原子操作修改朋友的映射,因此两个朋友仍然有可能写入彼此的哈希映射,但自我哈希映射检查减少了这种可能性。)
  6. 如果此人尚未匹配,则以“未匹配”状态将其写入 MongoDB,将其ConcurrentHashMap从全局映射中删除,并创建一个延迟任务,该任务将遍历所有写信给此人ConcurrentHashMap(即使用ConcurrentHashMap#keySet())。这个任务的延迟应该是随机的(例如Thread.sleep(500 * rand.nextInt(30))),这样两个朋友就不会总是同时尝试匹配。如果当前人没有需要重新检查的朋友,则不要为其创建延迟任务。
  7. 当延迟到时,为这个人创建一个新的 ConcurrentHashMap,从 MongoDB 中删除不匹配的人,然后循环回到步骤 1。如果这个人已经匹配,那么不要从 MongoDB 中删除它并终止延迟的任务。

在常见的情况下,一个人要么与朋友匹配,要么在迭代朋友列表时在没有将朋友添加到系统的情况下匹配失败(即,此人ConcurrentHashMap将是空的)。如果朋友同时写信:

同时添加 Friend1 和 Friend2。

  1. Friend1 写信给 Friend2 ConcurrentHashMap,表示他们想念彼此。
  2. Friend2 写信给 Friend1ConcurrentHashMap以表示相同(只有当 Friend2 检查 Friend1 在 Friend1 写它的同时写到它的地图时才会发生这种情况 - 通常 Friend2 会检测到 Friend1 已经写到它的地图上,所以它不会写入 Friend1 的地图)。
  3. Friend1 和 Friend2 都写入 MongoDB。Friend1 的后续任务随机延迟 5 秒,Friend2 随机延迟 15 秒。
  4. Friend1 的任务首先触发,并与 Friend2 匹配。
  5. Friend2 的任务第二次触发;Friend2 不再在 MongoDB 中,因此任务立即终止。

几个小插曲:

  1. Friend1 和 Friend2 可能都没有ConcurrentHashMaps与它们关联,例如,如果 Friend2 在 Friend1 检查映射是否在内存中时仍在初始化其哈希映射。这很好,因为 Friend2 将写入 Friend1 的哈希映射,因此我们保证最终会尝试匹配 - 至少其中一个将具有哈希映射,而另一个正在迭代,因为哈希映射创建在迭代之前。
  2. 如果两个朋友的任务同时以某种方式触发,则匹配的第二次迭代可能会失败。在这种情况下,如果一个人在 MongoDB 中处于匹配状态,则应将其从列表中删除;然后他们应该将结果列表与写信给它的朋友列表合并ConcurrentHashMap,然后下一次迭代应该使用它作为新的朋友列表。最终该人将被匹配,否则该人的“重新检查”朋友列表将被清空。
  3. 您应该增加每个后续迭代的任务延迟,以增加两个朋友的任务不会同时运行的概率(例如Thread.sleep(500 * rand.nextInt(30)),在第一次迭代、Thread.sleep(500 * rand.nextInt(60))第二次迭代、Thread.sleep(500 * rand.nextInt(90))第三次迭代等)。
  4. 在随后的迭代中,您必须在从 MongoDB 中删除人员之前创建一个新ConcurrentHashMap人员,否则您将遇到数据竞争。同样,您必须在迭代潜在匹配项时从 MongoDB 中删除一个人,否则您可能会无意中匹配它两次。

编辑:一些代码:

该方法addUnmatchedToMongo(person1)将“不匹配”的 person1 写入 MongoDB

setToMatched(friend1)用于findAndModify原子地设置friend1为“匹配”;friend1如果已经匹配或不存在,该方法将返回 false ,如果更新成功则返回 true

isMatched(friend1)如果friend1存在且匹配则返回 true,如果不存在或存在且“不匹配”则返回 false

private ConcurrentHashMap<String, ConcurrentHashMap<String, Person>> globalMap;
private DelayQueue<DelayedRetry> delayQueue;
private ThreadPoolExecutor executor;

executor.execute(new Runnable() {
    public void run() {
        while(true) {
            Runnable runnable = delayQueue.take();
            executor.execute(runnable);
        }
    }
}

public static void findMatch(Person person, Collection<Person> friends) {
    findMatch(person, friends, 1);
}

public static void findMatch(Person person, Collection<Person> friends, int delayMultiplier) {
    globalMap.put(person.id, new ConcurrentHashMap<String, Person>());
    for(Person friend : friends) {
        if(**setToMatched(friend)**) {
            // write person to MongoDB in "matched" state
            // write "Pair(person, friend)" to MongoDB so it can be queried by the end user
            globalMap.remove(person.id);
            return;
        } else {
            if(**!isMatched(friend)** && globalMap.get(person.id).get(friend.id) == null) {
                // the existence of "friendMap" indicates another thread is currently  trying to match the friend
                ConcurrentHashMap<String, Person> friendMap = globalMap.get(friend.id);
                if(friendMap != null) {
                    friendMap.put(person.id, person);
                }
            }
        }
    }
    **addUnmatchedToMongo(person)**;
    Collection<Person> retryFriends = globalMap.remove(person.id).values();
    if(retryFriends.size() > 0) {
        delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier), person, retryFriends, delayMultiplier));
    }
}

public class DelayedRetry implements Runnable, Delayed {
    private final long delay;
    private final Person person;
    private final Collection<Person> friends;
    private final int delayMultiplier;

    public DelayedRetry(long delay, Person person, Collection<Person> friends, delayMultiplier) {
        this.delay = delay;
        this.person = person;
        this.friends = friends;
        this.delayMultiplier = delayMultiplier;
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert(delay, TimeUnit.MILLISECONDS);
    }

    public void run {
        findMatch(person, friends, delayMultiplier + 1);
    }
}
于 2013-05-09T18:57:17.750 回答
1

我仍然不清楚你的匹配系统的细节,但我可以给你一些一般性的指导。

从根本上说,如果没有原子读取-修改-写入功能,您将无法同步进程。我不会讨论如何从数据库中获取它,因为它从简单(带有事务隔离的 SQL 数据库)到不可能(一些 NoSQL 数据库)不等。如果您无法从数据库中获取它,那么您别无选择,只能在内存中进行同步。

其次,您需要能够同时从可用性池中自动删除两个匹配的人。但作为同一原子操作的一部分,您还需要在将它们分配给彼此之前验证两者仍然可用。

第三,为了最大限度地提高吞吐量,您将系统设计为检测竞争条件而不是阻止它们,并在检测到竞争时实施恢复过程。

上述所有操作在内存中比在数据库中更容易(并且性能更高)。因此,如果可能的话,我会在内存中这样做。

  1. 创建一个按插入排序的内存匹配池,因此每个请求都知道哪个请求在前,哪个请求在后。(这没有必要反映请求的顺序,它只需要是它们被插入到池中的顺序。)
  2. 请求进入。请求进入内存匹配池,数据库状态更改为“正在搜索”。
  3. 请求线程在内存池中搜索较旧的匹配请求。
    1. 如果找到一个,那就是匹配。
    2. 如果没有找到,则请求线程退出。
    3. 如果在搜索时,它与较新的请求匹配,它将停止搜索并让较新的请求将其从池中删除。
  4. 匹配时,较新的请求会通知较旧的请求停止搜索,并将两个请求从池中删除。如果检测到比赛,任何检测到它的人都会停止/撤消他们正在做的事情并根据新信息继续进行。您必须设计竞争检测的顺序,以确保这种行为不会导致孤立匹配(相当于死锁),但这是完全可行的。
  5. 从池中删除匹配项后,它们的数据库状态将更新。
  6. 一个单独的工作线程按从最早到最新的顺序扫描队列,并删除过期的请求,用新的状态更新数据库。

在这个系统中,唯一的阻塞同步操作被插入到匹配池中并从匹配池中删除,这些是单独的锁。(请求线程在从匹配池中删除它的请求之前,必须获得一个锁,看看它是否还在匹配池中,如果不是,则分支到竞态恢复程序。)我相信这是理论上的您可以同步多少的限制。(好吧,我猜你还必须在池已满时阻止插入池中,但你还能做什么?如果你可以创建一个新池,那么你可以扩展现有池。)

请注意,通过对请求队列进行排序并按顺序搜索,您可以保证请求线程可以进行完整的搜索。如果它找不到搜索,那么唯一的希望是后面的请求将匹配,并且该匹配将由后面的请求线程找到。

于 2013-05-15T07:15:16.950 回答
0

在可以提出更好更简单的方法之前,我一直采用超级简单的方法。处理 BlockingQueue 的单个后台线程。它没有很大的吞吐量,但提交者不必阻塞。它还具有不需要对服务员的持久缓存进行同步的好处。我可以很容易地将 BlockingQueue 更改为持久性支持的 BlockingQueue。如果队列已满,提交者只需等待。

唯一的问题是,如果同时有这么多的提交者和轮询者,处理队列将无可救药地落后于提交者。这是泵的简化实现。match 方法只是迭代 #getFriendIds 并进行键控查找以查看该 id 的人是否存在于 redis(或其他)缓存中。如果它们在缓存中,那么它们是可匹配的。我交换彼此的 id 以匹配它们。

class HoldPump extends Thread {

    private final BlockingQueue<Incoming> queue = new ArrayBlockingQueue<>( CAPACITY );

    HoldPump() {
        super( "MatchingPump" );
    }

    public void submit( Person p ) {
        Incoming incoming = new Incoming( p.getId(), p.getFriendIds() ) );
        queueProcessing( incoming );
    }

    public void queueProcessing( Incoming incoming ) ... {
        queue.put( incoming );
    }

    @Override
    public void run() {
        try {
            while ( true ) {
                Incoming incoming = queue.take();
                tryMatch( incoming );
            }
        } catch ( InterruptedException e ) {
            Thread.interrupted();
        }
    }
}



protected void trytMatch( Incoming incoming ) {
    MatchStatus status = incoming.status;

    status.answer( incoming.holdDuration );

    for ( Integer candidate : incoming.candidates ) {
        MatchStatus waiting = waitingForMatchByPersonId.get( candidate );
        if ( waiting != null ) {
            waiting.setMatch( incoming.status.getPersonId() );
            status.setMatch( waiting.getPersonId() )
        }
    }
}

#setMatch 方法本质上表示完成条件,该条件是 MatchStatus 中可重入锁的一部分。

于 2013-05-16T18:42:41.413 回答
0

我想以异步工作的方式做到这一点

异步处理 = "person submitter" 和 "person matcher" 之间的一对逻辑队列:

  • 一个匹配请求队列
  • 另一个匹配响应队列
  • 无论是同步还是异步,核心人物匹配器算法的工作原理都是一样的
  • 换句话说,异步行为被添加为顶部的外观/装饰器,并且不会影响匹配设计。

有几件事让我很难解决这个问题。一个是我不想在内存中有对象,我希望所有等待的候选人都在 redis 或 memcache 或类似的东西中。

  • 在跳到解决方案之前,首先描述您的要求/约束​​可能会有所帮助。我会改用某种语言:“我只有 200MB 内存可用于匹配处理,并希望最大限度地提高性能。在任何时间点最多可能有 10000 个匹配项等待处理,并且匹配项在 30 分钟后超时。”
  • 您不希望存储在“内存中”,而是“在缓存中”。但是您提到的两个缓存不是100%“在内存中”吗?他们添加了什么对我来说并不明显。您认为缓存可以解决任何特定要求吗?也许,例如,您可以在 200MB 的内存中存储大量数据,并拥有一个干净、流线型的高性能算法。

我的第一个想法是在 MatchServer 中有一个缓存,它由 redis 之类的东西支持,并为当前被锁定并被操作的对象具有并发弱值哈希映射。

  • 我建议您使用并发队列和普通哈希映射:请求者插入队列,匹配器线程从队列中拉出并插入普通哈希映射。
  • 并发哈希映射仅在您同时有多个线程对其进行操作时才有用。如果只有一个线程在上面运行,那么 vanilla hash map 就很好,而且性能优越。虽然您可以让请求者直接插入并发哈希映射,但我认为这会产生过多的并发争用/锁定。
  • 在这里,您谈论的是在进行匹配之前锁定,但这不是必需的。

1. 方案一:单线程内的序列化匹配处理

  • 在一个“匹配线程”中完成所有匹配
  • 不需要任何锁定或事务 - 没有多线程争用
  • 并发队列序列化所有传入请求
  • 匹配线程一次处理一个请求,将新人插入哈希图中
  • 为新人运行匹配 - 如果找到匹配,则链接两个人,从 hashmap 中删除每个人,并通过响应队列将结果返回给两个请求者

2.解决方案2:匹配处理的多线程,带有最小临界区的悲观锁定:

  • 获取对下一个人的引用(从队列中)。
  • 不加锁地为那个人运行匹配算法(即乐观匹配处理)
  • 确定最佳候选匹配(如果有)
  • 然后进行悲观的记录写入和存储:
    • 然后锁定对
    • 确认它们在锁定状态后仍然是不匹配的
    • 将它们链接为匹配项,如果需要,更新到持久存储
    • 开锁

3. 解决方案3:使用事务持久存储和*乐观锁定的多线程匹配处理 *

  • 修改人员类,添加匹配时间戳或版本号。这将充当乐观锁定控制标志。
  • 获取对下一个人的引用(从队列中,从持久存储中获得可选的附加“读取”)。
  • 不加锁地为那个人运行匹配算法(即乐观匹配处理)
  • 确定最佳候选匹配(如果有)
  • 然后进行乐观记录写入和存储:
    • 启动一个事务(可能是一个 JTA/DB 事务),
    • 将两个人对象链接为匹配项
    • 更新两个人对象上的匹配时间戳/版本号(JPA 通过 @Version 注释自动执行此操作)
    • 更新时间戳/版本号与旧值匹配的持久存储(例如 DB)(JPA 通过 @Version 注释自动执行此操作)
    • 提交交易
于 2013-05-20T08:59:07.233 回答