12

我有一个基于数组的对象,它实现了以下接口:

public interface PairSupplier<Q, E> {
     public int size();

     public Pair<Q, E> get(int index);
}

我想在它上面创建一个特定的迭代器:

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     //some magic
}

接下来的方法中,我想从PairSupplier返回一些元素。

这个元素对于线程来说应该是唯一的,其他线程不应该有这个元素。

由于 PairSupplier 有最终大小,这种情况并不总是可能的,但我想接近它。

元素的顺序无关紧要,线程可以在不同的时间获取相同的元素。

示例: 2 Threads, 5 elements-{1,2,3,4,5}

Thread 1  | Thread 2
   1           2
   3           4
   5           1
   3           2
   4           5

我的解决方案

我创建了 AtomicInteger 索引,我在每次下一次调用时递增。

PairSupplier pairs;
AtomicInteger index;

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     int position = index.incrementAndGet() % pairs.size;
     if (position < 0) {
          position *= -1;
          position = pairs.size - position;
     }
     return pairs.get(position);
}

索引在所有线程之间共享。

我发现这个解决方案不可扩展(因为所有线程都在增加),也许有人有更好的想法?

该迭代器将被50-1000 个线程使用。

4

7 回答 7

4

您有一条Pair必须在所有线程之间共享的信息(“有人已经拿过这个了吗?”)。所以对于一般情况,你被卡住了。但是,如果您知道数组的大小和线程数,您可以使用存储桶来减轻痛苦。

假设我们知道将有 1,000,000 个数组元素和 1,000 个线程。为每个线程分配一个范围(线程#1 获取元素 0-999 等)。现在不再是 1,000 个线程争用一个 AtomicInteger,您可以完全没有争用!

如果您可以确定您的所有线程将以大致相同的速度运行,那将是有效的。如果您需要处理有时线程 #1 忙于做其他事情而线程 #2 空闲的情况,您可以稍微修改您的存储桶模式:每个存储桶都有一个 AtomicInteger。现在线程一般只会和自己竞争,但是如果他们的桶是空的,他们可以移动到下一个桶。

于 2013-10-15T13:53:44.013 回答
4

您的问题详细信息不明确-您的示例表明可以处理相同的两个线程,Pair但您在描述中另有说明。

由于更难实现,我将提供一个每个线程Iterable<Pair<Q,E>>交付Pair一个,直到供应商周期 - 然后它会重复。

public interface Supplier<T> {
  public int size();

  public T get(int index);

}

public interface PairSupplier<Q, E> extends Supplier<Pair<Q, E>> {
}

public class IterableSupplier<T> implements Iterable<T> {
  // The common supplier to use across all threads.
  final Supplier<T> supplier;
  // The atomic counter.
  final AtomicInteger i = new AtomicInteger();

  public IterableSupplier(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  @Override
  public Iterator<T> iterator() {
    /**
     * You may create a NEW iterator for each thread while they all share supplier
     * and Will therefore distribute each Pair between different threads.
     *
     * You may also share the same iterator across multiple threads.
     *
     * No two threads will get the same pair twice unless the sequence cycles.
     */
    return new ThreadSafeIterator();
  }

  private class ThreadSafeIterator implements Iterator<T> {
    @Override
    public boolean hasNext() {
      /**
       * Always true.
       */
      return true;
    }

    private int pickNext() {
      // Just grab one atomically.
      int pick = i.incrementAndGet();
      // Reset to zero if it has exceeded - but no spin, let "just someone" manage it.
      int actual = pick % supplier.size();
      if (pick != actual) {
        // So long as someone has a success before we overflow int we're good.
        i.compareAndSet(pick, actual);
      }
      return actual;
    }

    @Override
    public T next() {
      return supplier.get(pickNext());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove not supported.");
    }

  }

}

注意:我已经稍微调整了代码以适应这两种情况。您可以采用每个线程或跨线程Iterator共享一个。Iterator

于 2013-10-28T13:04:31.110 回答
1

我无法理解您要解决的问题是什么?

每个线程是否处理整个集合?

是否担心没有两个线程可以同时在同一 Pair 上工作?但是每个线程都需要处理集合中的每个Pair?

或者您是否希望使用所有线程处理一次集合?

于 2013-10-25T03:20:13.177 回答
0

在您的示例中,有一个关键的事情是模糊的 - 这到底是什么意思?

元素的顺序无关紧要,线程可以在不同的时间获取相同的元素。

“不同时间”是什么意思?在 N 毫秒之内?这是否意味着绝对两个线程永远不会同时接触同一对?我会假设。

如果你想降低线程相互阻塞竞争同一个 Pair 的概率,并且有一个 Pair 的后备数组,试试这个:

  • 将您的数组划分为numPairs / threadCount子数组(您不必实际创建子数组,只需从不同的偏移量开始 - 但将其视为子数组更容易)
  • 将每个线程分配给不同的子数组;当线程耗尽其子数组时,增加其子数组的索引
    • 假设我们有 6 对和 2 个线程 - 您的分配看起来像 Thread-1:[0,1,2] Thread-2:[3,4,5]。当 Thread-1 启动时,它将查看与线程 2 不同的 Pairs 集合,因此它们不太可能争夺同一 Pairs
  • 如果两个线程实际上不同时接触 Pair 很重要,则将所有接触 Pair 对象的代码包装在中(在实例synchronized(pair)上同步,而不是类型!) - 有时可能会阻塞,但你'永远不要在单个事物上阻塞所有线程,就像线程只能相互阻塞,因为它们真的试图接触同一个对象AtomicInteger

请注意,这并不能保证永远不会阻塞 - 为此,所有线程都必须以完全相同的速度运行,并且处理每个 Pair 对象必须花费完全相同的时间,并且操作系统的线程调度程序必须永远不会窃取来自一个线程而不是另一个线程的时间。你不能假设任何这些事情。通过划分工作区域并使共享的最小状态单元成为锁,这为您提供了更高的可能性,您将获得更好的并发性。

但这是在数据结构上获得更多并发性的常用模式 - 在线程之间划分数据,以便它们很少同时接触同一个锁。

于 2013-10-23T08:40:20.957 回答
0

最容易看到的是创建哈希集或映射,并为每个线程提供唯一的哈希。之后,只需通过此哈希码进行简单获取。

于 2013-10-23T13:33:42.687 回答
0

这是标准的 java 信号量使用问题。以下 javadoc 给出了与您的问题几乎相似的示例。http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html

如果您需要更多帮助,请告诉我?

于 2013-10-25T17:21:52.003 回答
0

我更喜欢锁定和释放过程。

如果一个线程正在请求一个 pair 对象,则该 Pair 对象将从供应商处移除。在线程要求新对之前,“旧”对再次添加到供应商。

您可以从前面推动并放在最后。

于 2013-10-28T14:39:08.773 回答