7

当我向外部系统发送请求时,我想实施严格的循环调度。有两个外部系统服务器。第一个请求应该发送到“System1”,第二个请求必须发送到“System2”,下一个请求必须发送到“System1”,依此类推。

由于我只有两台服务器可以向其发送请求,并且由于我希望在没有任何阻塞和上下文切换的情况下获得最大性能,因此我选择了 AtomicBoolean,因为它使用了 CAS 操作。

我的实现类

1.RoundRobinTest.java

package com.concurrency;

import java.util.Iterator;

public class RoundRobinTest 
{
    public static void main(String[] args) 
    {
        for (int i = 0; i < 500; i++) 
        {
            new Thread(new RoundRobinLogic()).start();
        }
        try 
        {
            // Giving a few seconds for the threads to complete
            Thread.currentThread().sleep(2000);
            Iterator<String> output = RoundRobinLogic.output.iterator();
            int i=0;
            while (output.hasNext()) 
            {
                System.out.println(i+++":"+output.next());
                // Sleeping after each out.print 
                Thread.currentThread().sleep(20);
            }
        } 
        catch (Exception ex) 
        {
            // do nothing
        }
    }

}

2.RoundRobinLogic.java(带有静态AtomicBoolean对象的类)

package com.concurrency;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;

public class RoundRobinLogic implements Runnable 
{
    private static AtomicBoolean bool = new AtomicBoolean(true);

    public static Queue<String> output = new ConcurrentLinkedDeque<>();

    @Override
    public void run() 
    {
        if(bool.getAndSet(false))
        {
            // Sending the request to first system
            output.add("Request to System1");
        }
        else if(!bool.getAndSet(true))
        {
            // Sending the request to first system
            output.add("Request to System2");
        }       
    }

}

输出:


......................
314:Request to System1
315:Request to System2
316:Request to System1
317:Request to System2
318:Request to System1
319:Request to System1
320:Request to System2
321:Request to System2
322:Request to System1
323:Request to System2
324:Request to System1
325:Request to System2
......................

请求 318 和 319 已发送到同一台服务器,在这种情况下 AtomicBoolean 失败。对于我的应用程序,一次可能有 1000-2000 个线程访问共享对象。从实践中的 Java 并发来看,我已经看到了以下内容。

在高争用级别上,锁定往往优于原子变量,但在更现实的争用级别上,原子变量优于锁。这是因为锁通过挂起线程来对争用做出反应,从而减少 CPU 使用率和共享内存总线上的同步流量。 低到中等的争用,原子提供更好的可扩展性;对于高争用,锁提供更好的争用避免。(基于 CAS 的算法在单 CPU 系统上也优于基于锁的算法,因为 CAS 总是在单 CPU 系统上成功,除非在读取修改写入操作中间线程被抢占的不太可能的情况。)

现在我有以下问题。

  1. 有没有其他高效的非阻塞方式,来实现轮询请求发送。
  2. 在激烈的争论下,AtomicBoolean 是否有可能失败?我的理解是,性能/吞吐量可能会由于激烈的争用而下降。但是在上面的例子中 AtomicBoolean 失败了。为什么 ?
4

3 回答 3

11

作为约翰的回答的旁白,一个更清洁、也许更有效的实现RoundRobinLogic将使用AtomicIntegeror AtomicLong。这消除了将 的当前值AtomicBoolean与新值进行比较的需要:

class RoundRobinLogic implements Runnable
{
    private static final AtomicInteger systemIndex = new AtomicInteger(1);

    public static final Queue<String> output = new ConcurrentLinkedDeque<>();

    @Override
    public void run()
    {
        if (systemIndex.incrementAndGet() % 2 == 0) {
            // Sending the request to first system
            output.add("Request to System1");
        } else {
            // Sending the request to second system
            output.add("Request to System2");
        }
    }
}

这将允许您相当轻松地将其扩展到其他系统:

class RemoteSystem
{
    private final String name;

    RemoteSystem(String name)
    {
        this.name = name;
    }

    public String name()
    {
        return name;
    }
}

class RoundRobinLogic implements Runnable
{
    private static final AtomicInteger systemIndex = new AtomicInteger(1);

    private static final RemoteSystem[] systems = new RemoteSystem[] {
        new RemoteSystem("System1"),
        new RemoteSystem("System2"),
        new RemoteSystem("System3"),
        new RemoteSystem("System4"),
    };

    public static final Queue<String> output = new ConcurrentLinkedDeque<>();

    @Override
    public void run()
    {
        RemoteSystem system = systems[systemIndex.incrementAndGet() % systems.length];

        // Sending the request to right system
        output.add("Request to " + system.name());
    }
}
于 2015-04-13T19:41:05.263 回答
4

假设您没有使用Queue实际系统的 API,而是使用 API。我看到的问题与以下有关:

    if(bool.getAndSet(false))
    {
        // Sending the request to first system
        output.add("Request to System1");
    }
    else if(!bool.getAndSet(true))
    {
        // Sending the request to second system
        output.add("Request to System2");
    }     

如果两个条件都失败了怎么办?这怎么可能?想象一下,在输入第一个ifbool时true。然后您尝试将其设置为 false,但另一个线程击败了您,因此您看到了false. 然后你试试else if. 现在,如果else if你到达那里的时间是假的,但设置为真,买另一个线程怎么办?在这种情况下,两次尝试都将失败。

我会将其重构为:

while(true){
  boolean current = bool.get();
  if(bool.compareAndSet(current, !current){
     if(current){ 
        //send request to first system
     } else {
        //send request to second system
     }
     return;
  }
}

正如 Sean Bright 所提到的,因为您要添加到队列中,即使您像我上面那样实现它,您仍然可能会看到一些乱序的值,因为队列本身不是与 AtomicBoolean 同步的一部分。

于 2015-04-13T19:25:48.313 回答
1

由于您的要求基本上是:实现一个原子操作

  1. n评估和翻转布尔值(或在通用服务器情况下评估模数并增加计数器)
  2. 根据步骤 1 的结果将条目插入队列,

您无法通过使第 1 步和第 2 步单独线程安全来真正实现这一点;您必须将步骤 1 和 2同步在一起。

这是一个应该有效的简单实现:

import java.util.LinkedList;
import java.util.Queue;

public class RoundRobinLogic implements Runnable 
{
    private static boolean bool = true;
    public static final Queue<String> OUTPUT = new LinkedList<String>();
    private static final Object LOCK = new Object();

    @Override
    public void run() {
        synchronized (LOCK) {
            OUTPUT.add(bool ? "Request to System1" : "Request to System2");
            bool = !bool;
        }
    }
}

关于你的问题:

  1. 如果您需要将两个高于处理器级别的操作同步在一起,则无法避免阻塞。中的类java.util.concurrent.atomic使用机器级原子指令,这就是使用这些类的代码(通常取决于平台)不需要阻塞的原因。
  2. 在您的实施中,AtomicBoolean没有失败。相反,在读取布尔值和将元素添加到队列之间存在竞争条件。
于 2015-04-13T21:15:25.960 回答