-1

是的,这是关于 Java 生产者/消费者的另一个问题。

我的变体是我有 N 个由该produceDataRows方法启动的生产者和 M 个由该consumeDataRows方法启动的消费者。两种方法都启动自己的ThreadPoolExecutor类实例,提交各自数量的生产者/消费者任务,然后等待它们的执行者完成。

所以,这是我的代码:

final BlockingQueue<Row> allRows = new LinkedBlockingQueue<Row>();
ExecutorService exec = Executors.newFixedThreadPool(2);
FutureTask<Object> producer = new FutureTask<Object>(new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    produceDataRows(allRows);
    return null;
  }
});
FutureTask<Object> consumer = new FutureTask<Object>(new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    consumeDataRows(allRows);
    return null;
  }
});
exec.execute(producer);
exec.execute(consumer);
producer.get();
consumer.get();

问题是consumer.get()返回,但从consumeDataRows未被调用。,produceDataRows另一方面被称为。

我错过了什么?

谢谢。

编辑 1

根据Gray的回复,我将代码改写如下:

ExecutorService exec = Executors.newFixedThreadPool(2);
Callable<Object> producer = new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    produceDataRows(allRows);
    return null;
  }
};
Callable<Object> consumer = new Callable<Object>() {
  @Override
  public Object call() throws Exception {
    consumeDataRows(allRows);
    return null;
  }
};
exec.submit(producer);
exec.submit(consumer);
exec.shutdown();
exec.awaitTermination(10, TimeUnit.DAYS);

同样的效果 - 代码终止,但从consumeDataRows未被调用。我在这里有三个不同的 ThreadPoolExecutor 实例 - 一个 in produceDataRows,一个 inconsumeDataRows和最后一个。

谢谢。

编辑 2

我的produceDataRows方法有问题,因为如果我将其注释掉,那么执行会访问call两个可调用对象的方法。现在想弄清楚。

4

2 回答 2

3

编辑:

我认为您在这里有比赛条件。由于生产者和消费者都在同时运行我敢打赌消费者去看看有什么行并且什么都allRows没有,因为生产者还没有做任何事情。

您将需要提前运行生产者或让消费者使用allRows.poll(long timeout, TimeUnit unit)来等待生产者的结果。也许生产者可以在volatile boolean producedAll = true它产生所有行并完成后设置 a ,并让消费者循环.poll(...)等待 为producedAll真。


你的课程有点混乱。你应该调用exec.submit(...)你的Callable 实例化和执行一个FutureTask. submit(...)然后返回一个Future你可以调用get()的。

你现在正在做的是实例化你自己的FutureTask,这是由内部完成的ExecutorService——从不是由调用者完成的。然后,当您调用get()您的FutureTask时,它只是立即返回,而不做任何加入或任何事情。它(不幸的是)有效,因为FutureTask它是一个Runnable.

您的代码应该执行以下操作:

Future<Object> producer = exec.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
       produceDataRows(allRows);
       return null;
    }
});
Future<Object> consumer = exec.submit(new Callable<Object>() {
    ...
});
producer.get();
consumer.get();

顺便说一句,Callable<Void>如果我不关心 a 的回报,通常我会使用,Callable因为我只想要Exception. 我仍然返回null,但当Void你想要一个无值对象时很有用。

于 2012-08-07T16:14:43.410 回答
-1

看来您是从一个main方法内部运行它,而thread在其他两个线程完成工作之前,运行 main 的方法就死了。

我会建议joining你的线程给其他线程,以便主线程等待其他线程完成。

于 2012-08-07T16:33:18.107 回答