
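I have a scenario with a list of websites and a block of code that crawls them. Is it possible to do this with multiple threads, so that each thread takes 5 or more websites from the list and crawls them independently, while guaranteeing that no thread picks up a website already taken by another thread?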

List<String> websiteList;

//crawling code block here

5 Answers


For example, you can use a BlockingQueue that is shared by all interested consumers (note that error handling is skipped for clarity):

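import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// The wrapper class name is arbitrary; it only makes the snippet compile on its own.
public class QueueDemo {

    public static void main(String[] args) throws Exception {
        // For test purposes, add 10 integers (stand-ins for the websites).
        final BlockingQueue<Integer> queue = new LinkedBlockingDeque<Integer>();
        for (int i = 0; i < 10; i++) {
            queue.add(i);
        }

        // Three consumers share the same queue.
        new Thread(new MyRunnable(queue)).start();
        new Thread(new MyRunnable(queue)).start();
        new Thread(new MyRunnable(queue)).start();
    }

    static class MyRunnable implements Runnable {
        private final Queue<Integer> queue;

        MyRunnable(Queue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            // poll() removes the head atomically and returns null once the queue is empty,
            // so no two threads ever receive the same element.
            Integer data;
            while ((data = queue.poll()) != null) {
                System.out.println(Thread.currentThread().getName() + ": " + data);
            }
        }
    }
}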

When the queue is empty, the threads exit and the program ends.

answered 2013-04-23T10:02:38.353

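As mentioned in the other answers, with a requirement like this you should first consider keeping your websites in one of the concurrent abstract data types from the java.util.concurrent package rather than in a standard List. The drainTo method of BlockingQueue sounds like exactly what you are looking for, since you want each thread to take a batch of sites at once.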
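
As a rough illustration of the drainTo idea, the sketch below lets each worker pull up to five sites per call until the queue is empty. The class name DrainToCrawler, the BATCH_SIZE constant, the crawl placeholder and the example URLs are assumptions made for this sketch, not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainToCrawler {

    // Assumed batch size, taken from the question ("5 or more websites").
    static final int BATCH_SIZE = 5;

    // Crawls every site with `threadCount` workers and returns the sites that were processed.
    public static List<String> crawlAll(List<String> websiteList, int threadCount)
            throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(websiteList);
        final ConcurrentLinkedQueue<String> processed = new ConcurrentLinkedQueue<>();

        Runnable worker = () -> {
            List<String> batch = new ArrayList<>(BATCH_SIZE);
            // drainTo moves up to BATCH_SIZE elements into `batch` and returns how many
            // it moved; 0 means the queue is empty, so the worker stops.
            while (queue.drainTo(batch, BATCH_SIZE) > 0) {
                for (String site : batch) {
                    crawl(site);
                    processed.add(site);
                }
                batch.clear();
            }
        };

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(worker);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return new ArrayList<>(processed);
    }

    // Placeholder for the question's "crawling code block".
    static void crawl(String site) {
        System.out.println(Thread.currentThread().getName() + " crawls " + site);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> sites = new ArrayList<>();
        for (int i = 1; i <= 12; i++) {
            sites.add("http://example.com/site" + i);
        }
        crawlAll(sites, 3);
    }
}
```

Because drainTo removes the elements it returns, a site handed to one worker cannot be handed to another. This sketch assumes the list is fully loaded before the workers start; if sites are still being added while workers run, a worker that sees an empty queue would exit early.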

answered 2013-04-23T10:07:22.450

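You can use a LinkedBlockingQueue: put everything from websiteList into the queue and share the queue between all threads. Every thread then polls the queue. poll with a timeout blocks for at most that long, and the queue guarantees that each element is handed to only one thread.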

Something like:

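String site;
// poll(timeout, unit) waits up to `timeout` seconds and returns null if nothing arrived.
// It throws InterruptedException, which the caller must handle or declare.
while ((site = queue.poll(timeout, TimeUnit.SECONDS)) != null) {
    // process site
}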
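
To show how that loop might sit inside complete workers, here is a minimal sketch using a fixed thread pool. The class name PollingCrawler, the crawlAll method, the millisecond timeout parameter and the one-minute shutdown wait are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollingCrawler {

    // Starts `threadCount` workers that poll the shared queue until it stays empty
    // for `timeoutMillis`, then returns the sites that were handled.
    public static List<String> crawlAll(List<String> websiteList, int threadCount, long timeoutMillis)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(websiteList);
        List<String> crawled = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++) {
            pool.execute(() -> {
                try {
                    String site;
                    // Each successful poll removes one element, so a site goes to one worker only.
                    while ((site = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                        crawled.add(site); // stand-in for the real crawling code
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and let the worker end.
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();                             // accept no new tasks
        pool.awaitTermination(1, TimeUnit.MINUTES);  // wait for the workers to finish
        return new ArrayList<>(crawled);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> sites = Arrays.asList("a.example", "b.example", "c.example");
        List<String> crawled = crawlAll(sites, 2, 200);
        System.out.println(crawled.size() + " sites crawled");
    }
}
```

The timeout matters mainly when another thread may still be adding sites: it gives late arrivals a chance to be picked up before a worker gives up.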
answered 2013-04-23T09:59:26.743

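You could try a DoubleBufferedList. It lets multiple threads add entries and lists to it, and lets multiple threads take the accumulated list from it, all without using any locks.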

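import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicMarkableReference;

public class DoubleBufferedList<T> {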
  // Atomic reference so I can atomically swap it through.
  // Mark = true means I am adding to it so momentarily unavailable for iteration.
  private AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

  // Factory method to create a new list - may be best to abstract this.
  protected List<T> newList() {
    return new ArrayList<>();
  }

  // Get and replace the current list.
  public List<T> get() {
    // Atomically grab and replace the list with an empty one.
    List<T> empty = newList();
    List<T> it;
    // Replace an unmarked list with an empty one.
    if (!list.compareAndSet(it = list.getReference(), empty, false, false)) {
      // Failed to replace! 
      // It is probably marked as being appended to but may have been replaced by another thread.
      // Return empty and come back again soon.
      return Collections.<T>emptyList();
    }
    // Successfully replaced an unmarked list with an empty list!
    return it;
  }

  // Grab and lock the list in preparation for append.
  private List<T> grab() {
    List<T> it;
    // We cannot fail so spin on get and mark.
    while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
      // Spin on mark - waiting for another grabber to release (which it must).
    }
    return it;
  }

  // Release the list.
  private void release(List<T> it) {
    // Unmark it - should this be a compareAndSet(it, it, true, false)?
    if (!list.attemptMark(it, false)) {
      // Should never fail because once marked it will not be replaced.
      throw new IllegalMonitorStateException("It changed while we were adding to it!");
    }
  }

  // Add an entry to the list.
  public void add(T entry) {
    List<T> it = grab();
    try {
      // Successfully marked! Add my new entry.
      it.add(entry);
    } finally {
      // Always release after a grab.
      release(it);
    }
  }

  // Add many entries to the list.
  public void add(List<T> entries) {
    List<T> it = grab();
    try {
      // Successfully marked! Add my new entries.
      it.addAll(entries);
    } finally {
      // Always release after a grab.
      release(it);
    }
  }

  // Add a number of entries.
  @SafeVarargs
  public final void add(T... entries) {
    // Make a list of them.
    add(Arrays.<T>asList(entries));
  }
}
answered 2013-04-23T10:01:03.177

I suggest one of the following three solutions:

Keep it simple:

synchronized(list) {
    // get and remove 5 websites from the list
}

If you can change the list type, you can use

BlockingQueue

If you cannot change the list type, you can use

Collections.synchronizedList(list)
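
For the "keep it simple" option, the synchronized block could be filled in roughly as follows. The class name SynchronizedBatches and the takeBatch helper are hypothetical, and the sketch assumes every thread accesses the list only through this helper.

```java
import java.util.ArrayList;
import java.util.List;

public class SynchronizedBatches {

    // Removes and returns up to `batchSize` websites from the front of the shared list.
    // All threads must lock on the same list object for this to be safe.
    public static List<String> takeBatch(List<String> list, int batchSize) {
        synchronized (list) {
            int n = Math.min(batchSize, list.size());
            List<String> head = list.subList(0, n);      // view of the first n elements
            List<String> batch = new ArrayList<>(head);  // copy before clearing the view
            head.clear();                                // removes those elements from `list`
            return batch;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> websiteList = new ArrayList<>();
        for (int i = 1; i <= 12; i++) {
            websiteList.add("site" + i);
        }

        Runnable worker = () -> {
            List<String> batch;
            // An empty batch means the list has been fully consumed.
            while (!(batch = takeBatch(websiteList, 5)).isEmpty()) {
                System.out.println(Thread.currentThread().getName() + " took " + batch);
            }
        };

        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        a.join();
        b.join();
    }
}
```

The same helper also works on a list wrapped with Collections.synchronizedList, because that wrapper synchronizes on itself. The explicit synchronized block is still needed, since taking five elements is a compound operation.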
answered 2013-04-23T10:01:51.433