2

我最近重构了一段用于生成唯一负数的代码。
编辑:多个线程获取这些 id 并作为键添加到数据库;数字需要为负数才能轻松识别 - 在测试会话结束时,它们会从数据库中删除。

我的 Java 算法如下所示:

private final Set<Integer> seen = Collections.synchronizedSet(new HashSet<Integer>());
public Integer generateUniqueNegativeIds() {
    int result = 0;
    do {
        result = random.nextInt();
        if (result > 0) {
            result *= -1;
        }
    } while (!seen.add(result));
    return result;
}

上面的代码结构,加上它对集合和“重试”循环的推测性添加,让我觉得有一个等效的非阻塞算法可以用任何原子变量替换同步集合。

我做了几次尝试使用原子变量重写,但都没有通过多线程攻击测试。

有没有优雅的非阻塞等价物?

编辑:出于好奇,这是使用原子整数作为保护的有缺陷的尝试

private final AtomicInteger atomi = new AtomicInteger(0);
public Integer generateUniqueNegativeIdsWithAtomicAlgo() {
    boolean added = false;
    int result = 0;
    do {
        result = random.nextInt();
        if (result > 0) {
            result *= -1;
        }
        if (atomi.compareAndSet(0, result)) {
            added = cache.add(result);
        }   
    } while (!added);
    return atomi.getAndSet(0);
}

编辑:下面的测试工具:

public static void main(String[] args) {
    final int NUMBER_OF_THREADS = 10000;
    final Set<Integer> uniques = Collections.synchronizedSet(new HashSet<Integer>());
    final List<Integer> positives = Collections.synchronizedList(new ArrayList<Integer>());
    final NegativeUniqueIdGenerator nuig = new NegativeUniqueIdGenerator();
    Thread[] workers = new Thread[NUMBER_OF_THREADS];
    long start = System.nanoTime();
    for (int i = 0; i < workers.length; i++) {
        Runnable runnable = new Runnable() {
            public void run() {
                int number = nuig.generateUniqueNegativeIds();
                if (number > 0) {
                    positives.add(number);
                }
                uniques.add(number);
            }
        };
        workers[i] = new Thread(runnable);
        workers[i].start();
    }
    for (int i = 0; i < workers.length; i++) {
        try {
            workers[i].join();
        } catch (InterruptedException ie) {}
    }
    long end = System.nanoTime();
    System.out.println(String.format("duration = %dns", (end - start)));
    System.out.println(String.format("#threads = %d", NUMBER_OF_THREADS));
    System.out.println(String.format("#uniques = %d", uniques.size()));
    System.out.println(String.format("#positives = %d", positives.size()));
    System.out.println(String.format("#duplicates = %d", NUMBER_OF_THREADS - uniques.size()));
    System.out.println(String.format("ratio = %f",
            ((double) NUMBER_OF_THREADS - uniques.size())
                    / NUMBER_OF_THREADS));
    assert uniques.size() == NUMBER_OF_THREADS;
}
4

8 回答 8

9

如果您不关心随机性,您可以减少一个计数器,如下所示:

private final AtomicInteger ai=new AtomicInteger(0);

public int nextID() {
  return ai.addAndGet(-1);
}

编辑:

对于随机数,您可以使用您的解决方案并使用例如。ConcurrentHashMap 或 ConcurrentSkipListSet 而不是 synchronizedSet。您必须确保不同的线程使用随机生成器的不同实例,并且这些生成器不相关。

于 2009-02-24T00:06:40.833 回答
6

建议使用计数器的其他答案非常好,但如果不可预测性(或至少非平凡的可预测性)重要,那么您的原始算法应该没问题。

为什么?

基本上,你会得到一个重复整数的概率是非常非常(非常)(非常)小,大约是 1 除以你还没有看到的整数个数。如果您已经生成N了数字,则算法的预期运行时间近似线性,N系数为 1/2^32,这意味着您必须生成超过十亿个数字才能使预期运行时间超过 2 次迭代循环!在实践中,检查集合中是否存在某个数字将比重复循环的可能性更能延长算法的运行时间(好吧,除非你使用HashSet可能 - 我忘记了它的渐近运行时间是什么)。

对于它的价值,确切的预期循环迭代次数是

2^64/(2^32 - N)^2

在您生成一百万个数字后,结果为 1.00047 - 这意味着,例如,要生成第 1,000,001 到第 1,002,000 个数字,您可能会在所有这些调用中得到一个重复数字total 。

于 2009-02-24T00:38:07.747 回答
3

据我所知,所有列出的需求的优雅解决方案只是从 -1 开始递减一个值。但是,我怀疑您没有列出所有要求。

于 2009-02-24T00:03:30.747 回答
2

根据您给出的要求,我个人只会使用中等质量的随机数生成器,您知道它不会在您需要的唯一数字数量内产生重复。除非您有未提及的额外要求,否则保留所有先前生成的数字的集合似乎有点过分。

例如,使用 32 位 XORShift 生成器将在重复模式之前以“随机”顺序生成所有 2^31 个负 4 字节整数。如果您需要比这更多的数字,您可能不希望将它们放在哈希集中。所以像这样的东西(警告:未经测试的头顶代码......):

int seed = (int) System.nanoTime();
final int origSeed = seed;

public int nextUniqueNegativeNumber() {
  int n = seed;
  do {
    n ^= (n << 13);
    n ^= (n >>> 17);
    n ^= (n << 5);
    seed = n;
    if (n == origSeed) {
      throw new InternalError("Run out of numbers!");
    }
  } while (n > 0);
  return n;
}

如果需要并发,我会留给读者将“种子”转换为使用 AtomicInteger ......

编辑:实际上,为了优化并发情况,您可能只想在获得下一个负数后写回“种子” 。

好的,根据大众的需求,原子版本将是这样的:

  AtomicInteger seed = new AtomicInteger((int) System.nanoTime());

  public int nextUniqueNegativeNumber() {
    int oldVal, n;
    do {
      do {
        oldVal = seed.get();
        n = oldVal ^ (oldVal << 13); // Added correction
        n ^= (n >>> 17);
        n ^= (n << 5);
      } while (seed.getAndSet(n) != oldVal);
    } while (n > 0);
    return n;
  }
于 2009-02-24T05:24:24.253 回答
2

试试这个:http ://www.javaconcurrencyinpractice.com/listings.html

于 2009-02-25T03:04:16.687 回答
2

我会将 OP 的答案与 jpalecek 的答案结合起来给出:

private final AtomicInteger ai=new AtomicInteger(0);

public int nextID() {
    return ai.addAndGet(-1 - random.nextInt(1000));
}
于 2009-02-25T03:49:41.990 回答
2

高级库有一个可以使用的 NonBlockingHashSet。只需将您的 set 实例替换为 NonBlockingHashSet 的实例即可。

http://sourceforge.net/projects/high-scale-lib

于 2009-02-25T04:26:15.477 回答
1

我认为你的意思是非阻塞和可重入的。

编辑:(替换我原来的,因为这要好得多)

一个基于线程的选项实际上非常高效(至少比原来的性能更高)。如果您创建了一个弱散列映射,其中一个线程对象作为“键”并作为“值”,则放置一个能够从特定范围内制造一系列例如 1000 个数字的对象。

这样你就可以为每个线程分配它自己的 1000 个数字范围来分配。当对象的数字用完时,让它返回一个无效的数字(0?),您就会知道您必须为该对象分配一个新的范围。

任何地方都不会同步任何东西(编辑:哎呀,有点错误。见下文),弱哈希映射会自动释放被破坏的线程(无需特殊维护),最慢的部分将是线程的单个哈希查找这实际上非常快。

获取当前正在运行的线程:

Thread currThread=Thread.getCurrentThread();

我也可能是错的,您可能只需要使方法同步,那么这将起作用:

int n=-1;
synchronized int getNegativeNumber() {
    return n--;
}

我继续写了它(有时这些东西在我写之前一直卡在我的脑海里,只要我写了,我还不如把它贴出来)。未经测试,但我很确定它应该是关闭的,如果不是开箱即用的话。只有一个类和一个静态方法可以调用以获得唯一的负数。(哦,我确实需要一些同步,但它只会被使用 0.001% 的时间)。

希望有一种方法可以创建链接代码块,而不是像这样在不离开站点的情况下内联 - 对不起长度。

package test;

import java.util.WeakHashMap;

public class GenNumber {
    // Static implementation goes first.
    private static int next = -1;
    private static final int range = 1000;

    private static WeakHashMap<Thread, GenNumber> threads = new WeakHashMap<Thread, GenNumber>();

    /**
     * Generate a unique random number quickly without blocking
     * 
     * @return the random number < 0
     */
    public static int getUniqueNumber() {
        Thread current = Thread.currentThread();
        int next = 0;

        // Have to synchronize some, but let's get the very
        // common scenario out of the way first without any
        // synchronization. This will be very fast, and will
        // be the case 99.9% of the time (as long as range=1000)
        GenNumber gn = threads.get(current);
        if (gn != null) {
            next = gn.getNext();
            if (next != 0)
                return next;
        }

        // Either the thread wasn't found, or the range was
        // used up. Do the rest in a synchronized block.
        // The three lines tagged with the comment "*" have
        // the potential to collide if this wasn't synchronized.
        synchronized (threads) {
            if (gn == null) {
                gn = new GenNumber(next -= range); // *
                threads.put(current, gn); // *
                return gn.getNext(); // can't fail this time
            }
            // now we know the range has run out

            gn.setStart(next -= range); // *
            return gn.getNext();
        }
    }

    // Instance implementation (all private, nobody needs to see this)
    private int start;
    private int count;

    private GenNumber(int start) {
        setStart(start);
    }

    private int getNext() {
        if (count < range)
            return start - count;
        return 0;
    }

    private GenNumber setStart(int start) {
        this.start = start;
        return this;
    }
}

让我感到震惊的是,可以用两个在不同对象上同步的非常小的同步块代替一个大的同步块,一个用于“+= count”,一个用于 .put()。如果碰撞仍然让你慢下来,那可能会有所帮助(尽管如果碰撞仍然让你慢下来(真的???)你最好只增加计数。

于 2009-02-24T00:29:28.720 回答