1

考虑以下方法:

public void add(final List<ReportingSTG> message) {
        if(stopRequested.get()) {
            synchronized (this) {
                if(stopRequested.get()) {
                    retryQueue.put(message);
                }
            }
        }
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
            synchronized (this) {
                if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                    final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                    messages.clear();
                    if(processors.size()>=numOfProcessors) {
                        waitingThreads.incrementAndGet();
                        waitForProcessor();
                        waitingThreads.decrementAndGet();
                    }                   
                    startProcessor(clone);
                }
            }

        }
    }

特别是这两条线:

 1:   final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
 2:   messages.clear();

如果线程A进入同步块并获取当前对象的锁,这是否意味着该对象的实例属性状态不能被同步块之外的其他线程更改(而线程A在同步块中)?

例如,线程 A 执行第 1 行 -> 线程 B 进入方法并添加新的列表条目 (messages.add(message)) -> 线程 a 执行的第 2 行 -> 删除线程 B 添加的条目(连同其他条目)。这种情况可能吗?或者线程 B 将在线程 A 释放锁时等待,然后才会删除 List 条目

消息是一个非静态同步列表

UPD:更新的方法,可能的解决方案:

public void add(final List<ReportingSTG> message) {
    if(stopRequested.get()) {
        synchronized (this) {
            if(stopRequested.get()) {
                retryQueue.put(message);
            }
        }
    }
    while (addLock.get()){
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {}
    }

    messages.add(message);

    if(messages.size() >= batchSize && waitingThreads.get() == 0) {
        synchronized (this) {
            if(messages.size() >= batchSize && waitingThreads.get() == 0) {

                addLock.set(true);
                final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                messages.clear();
                addLock.set(false);

                if(processors.size()>=numOfProcessors) {
                    waitingThreads.incrementAndGet();
                    waitForProcessor();
                    waitingThreads.decrementAndGet();
                }                   
                startProcessor(clone);
            }
        }

    }
}

addLock - AtomicBoolean,默认为 false

4

2 回答 2

3

所描述的场景是可能的。即您可能会丢失消息。

synchronized关键字确保您永远不会有 2 个线程同时运行该部分synchronized。它不会阻止另一个线程修改在块内操作的对象synchronized(只要这个另一个线程可以访问它们)。


这是一个可能的解决方案,因为它同步了添加和清除。

private Object lock = new Object();

public void add(final List<ReportingSTG> message) {
    if(stopRequested.get()) {
        synchronized (this) {
            if(stopRequested.get()) {
                retryQueue.put(message);
            }
        }
    }
    synchronized(lock){
        messages.add(message);
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                messages.clear();
                if(processors.size()>=numOfProcessors) {
                    waitingThreads.incrementAndGet();
                    waitForProcessor();
                    waitingThreads.decrementAndGet();
                }                   
                startProcessor(clone);
            }
    }
}
于 2013-02-28T11:58:36.680 回答
1

DoubleBufferedList最近安排了一堂课。也许使用它可以完全避免您的问题。顾名思义,它实现了双缓冲算法,但用于列表。

此类允许您拥有许多生产者线程和许多消费者线程。每个生产者线程都可以添加到当前列表中。每个消费者线程获取整个当前列表进行处理。

这也没有使用锁,只使用原子,因此它应该高效运行。

请注意,其中大部分是测试代码。您可以在评论后删除所有内容,// TESTING但您可能会发现测试的严谨性令人欣慰。

public class DoubleBufferedList<T> {
  // Atomic reference so I can atomically swap it through.
  // Mark = true means I am adding to it so unavailable for iteration.
  private AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

  // Factory method to create a new list - may be best to abstract this.
  protected List<T> newList() {
    return new ArrayList<>();
  }

  // Get and replace the current list.
  public List<T> get() {
    // Atomically grab and replace the list with an empty one.
    List<T> empty = newList();
    List<T> it;
    // Replace an unmarked list with an empty one.
    if (!list.compareAndSet(it = list.getReference(), empty, false, false)) {
      // Failed to replace! 
      // It is probably marked as being appended to but may have been replaced by another thread.
      // Return empty and come back again soon.
      return Collections.EMPTY_LIST;
    }
    // Successfull replaced an unmarked list with an empty list!
    return it;
  }

  // Grab and lock the list in preparation for append.
  private List<T> grab() {
    List<T> it;
    // We cannot fail so spin on get and mark.
    while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
      // Spin on mark.
    }
    return it;
  }

  // Release the list.
  private void release(List<T> it) {
    // Unmark it. Should never fail because once marked it will not be replaced.
    if (!list.attemptMark(it, false)) {
      throw new IllegalMonitorStateException("it changed while we were adding to it!");
    }
  }

  // Add an entry to the list.
  public void add(T entry) {
    List<T> it = grab();
    try {
      // Successfully marked! Add my new entry.
      it.add(entry);
    } finally {
      // Always release after a grab.
      release(it);
    }
  }

  // Add many entries to the list.
  public void add(List<T> entries) {
    List<T> it = grab();
    try {
      // Successfully marked! Add my new entries.
      it.addAll(entries);
    } finally {
      // Always release after a grab.
      release(it);
    }
  }

  // Add a number of entries.
  public void add(T... entries) {
    // Make a list of them.
    add(Arrays.asList(entries));
  }
  // TESTING.
  // How many testers to run.
  static final int N = 10;
  // The next one we're waiting for.
  static final AtomicInteger[] seen = new AtomicInteger[N];
  // The ones that arrived out of order.
  static final Set<Widget>[] queued = new ConcurrentSkipListSet[N];

  static {
    // Populate the arrays.
    for (int i = 0; i < N; i++) {
      seen[i] = new AtomicInteger();
      queued[i] = new ConcurrentSkipListSet();
    }
  }

  // Thing that is produced and consumed.
  private static class Widget implements Comparable<Widget> {
    // Who produced it.
    public final int producer;
    // Its sequence number.
    public final int sequence;

    public Widget(int producer, int sequence) {
      this.producer = producer;
      this.sequence = sequence;
    }

    @Override
    public String toString() {
      return producer + "\t" + sequence;
    }

    @Override
    public int compareTo(Widget o) {
      // Sort on producer
      int diff = Integer.compare(producer, o.producer);
      if (diff == 0) {
        // And then sequence
        diff = Integer.compare(sequence, o.sequence);
      }
      return diff;
    }
  }

  // Produces Widgets and feeds them to the supplied DoubleBufferedList.
  private static class TestProducer implements Runnable {
    // The list to feed.
    final DoubleBufferedList<Widget> list;
    // My ID
    final int id;
    // The sequence we're at
    int sequence = 0;
    // Set this at true to stop me.
    public volatile boolean stop = false;

    public TestProducer(DoubleBufferedList<Widget> list, int id) {
      this.list = list;
      this.id = id;
    }

    @Override
    public void run() {
      // Just pump the list.
      while (!stop) {
        list.add(new Widget(id, sequence++));
      }
    }
  }

  // Consumes Widgets from the suplied DoubleBufferedList
  private static class TestConsumer implements Runnable {
    // The list to bleed.
    final DoubleBufferedList<Widget> list;
    // My ID
    final int id;
    // Set this at true to stop me.
    public volatile boolean stop = false;

    public TestConsumer(DoubleBufferedList<Widget> list, int id) {
      this.list = list;
      this.id = id;
    }

    @Override
    public void run() {
      // The list I am working on.
      List<Widget> l = list.get();
      // Stop when stop == true && list is empty
      while (!(stop && l.isEmpty())) {
        // Record all items in list as arrived.
        arrived(l);
        // Grab another list.
        l = list.get();
      }
    }

    private void arrived(List<Widget> l) {
      for (Widget w : l) {
        // Mark each one as arrived.
        arrived(w);
      }
    }

    // A Widget has arrived.
    private static void arrived(Widget w) {
      // Which one is it?
      AtomicInteger n = seen[w.producer];
      // Don't allow multi-access to the same producer data or we'll end up confused.
      synchronized (n) {
        // Is it the next to be seen?
        if (n.compareAndSet(w.sequence, w.sequence + 1)) {
          // It was the one we were waiting for! See if any of the ones in the queue can now be consumed.
          for (Iterator<Widget> i = queued[w.producer].iterator(); i.hasNext();) {
            Widget it = i.next();
            // Is it in sequence?
            if (n.compareAndSet(it.sequence, it.sequence + 1)) {
              // Done with that one too now!
              i.remove();
            } else {
              // Found a gap! Stop now.
              break;
            }
          }
        } else {
          // Out of sequence - Queue it.
          queued[w.producer].add(w);
        }
      }
    }
  }

  // Main tester
  public static void main(String args[]) {
    try {
      System.out.println("DoubleBufferedList:Test");
      // Create my test buffer.
      DoubleBufferedList<Widget> list = new DoubleBufferedList<>();
      // All threads running - Producers then Consumers.
      List<Thread> running = new LinkedList<>();
      // Start some producer tests.
      List<TestProducer> producers = new ArrayList<>();
      for (int i = 0; i < N; i++) {
        TestProducer producer = new TestProducer(list, i);
        Thread t = new Thread(producer);
        t.setName("Producer " + i);
        t.start();
        producers.add(producer);
        running.add(t);
      }

      // Start the same number of consumers.
      List<TestConsumer> consumers = new ArrayList<>();
      for (int i = 0; i < N; i++) {
        TestConsumer consumer = new TestConsumer(list, i);
        Thread t = new Thread(consumer);
        t.setName("Consumer " + i);
        t.start();
        consumers.add(consumer);
        running.add(t);
      }
      // Wait for a while.
      Thread.sleep(5000);
      // Close down all.
      for (TestProducer p : producers) {
        p.stop = true;
      }
      for (TestConsumer c : consumers) {
        c.stop = true;
      }
      // Wait for all to stop.
      for (Thread t : running) {
        System.out.println("Joining " + t.getName());
        t.join();
      }
      // What results did we get?
      for (int i = 0; i < N; i++) {
        // How far did the producer get?
        int gotTo = producers.get(i).sequence;
        // The consumer's state
        int seenTo = seen[i].get();
        Set<Widget> queue = queued[i];
        if (seenTo == gotTo && queue.isEmpty()) {
          System.out.println("Producer " + i + " ok.");
        } else {
          // Different set consumed as produced!
          System.out.println("Producer " + i + " Failed: gotTo=" + gotTo + " seenTo=" + seenTo + " queued=" + queue);
        }
      }
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    }
  }
}
于 2013-02-28T12:14:22.317 回答