1

假设我有一个来自 Executors 静态工厂方法之一的 ExecutorService 实例。

如果我提交了一个 Callable,其中 RetVal 不是来自某个线程的线程安全、本地实例化的对象,那么当我从同一个线程获取()它时,我是否需要担心 RetVals 的完整性?人们说局部变量是线程安全的,但我不确定当您返回本地实例化的对象并从其他线程接收它时它是否适用。

这是一个类似于我的情况的示例:

ExecutorService executor = Executors.newFixedThreadPool(5);
Future<List<String>> fut = executor.submit(() -> {
    List<String> ret = new ArrayList<>();
    ret.add("aasdf");
    ret.add("dfls");
    return ret;
});

List<String> myList = fut.get();

在上面的示例中,我正在检索一个在不同线程中创建的 ArrayList——一个由执行程序创建的线程。我不认为上面的代码是线程安全的,但我无法找到关于我的具体情况的很多信息。

现在我在我的计算机上尝试了上面的代码,它实际上在我尝试它的时间里 100% 返回了预期的结果,我什至尝试了我自己的 ExecutorService 实现,到目前为止我只得到了预期的结果。因此,除非我非常幸运,否则我很确定它可以工作,但我不确定如何。我在另一个线程中创建了一个非线程安全的对象并在另一个线程中接收它;我不应该有机会收到一个部分构造的对象——在我的例子中是一个不包含 2 个字符串的列表吗?

下面是我为测试而制作的自定义实现。您可以忽略 EType 枚举。

class MyExecutor {

    enum EType {
        NoHolder, Holder1, Holder2
    }

    private ConcurrentLinkedQueue<MyFutureTask<?>> tasksQ;
    private final Thread thread;

    private final EType eType;

    public MyExecutor(EType eType) {
        eType = Objects.requireNonNull(eType);

        tasksQ = new ConcurrentLinkedQueue<>();
        thread = new Thread(new MyRunnable());
        thread.start();
    }

    public <T> Future<T> submit(Callable<T> c) {
        MyFutureTask<T> task = new MyFutureTask<T>(c, eType);
        tasksQ.add(task);
        return task;
    }

    class MyRunnable implements Runnable {
        @Override
        public void run() {
            while (true) {
                if (tasksQ.isEmpty()) {
                    try {
                        Thread.sleep(1);
                        continue;
                    } catch (InterruptedException ite) {
                        Thread.interrupted();
                        break;
                    }
                }

                MyFutureTask<?> task = tasksQ.poll();
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class MyFutureTask<T> implements RunnableFuture<T> {

        final Callable<?> cb;
        volatile Object outcome;

        static final int STATE_PENDING = 1;
        static final int STATE_EXECUTING = 2;
        static final int STATE_DONE = 3;

        final AtomicInteger atomicState = new AtomicInteger(STATE_PENDING);

        final EType eType;

        public MyFutureTask(Callable<?> cb, EType eType) {
            cb = Objects.requireNonNull(cb);
            eType = Objects.requireNonNull(eType);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new NotImplementedException();
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return atomicState.get() == STATE_DONE;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T get() throws InterruptedException, ExecutionException {
            while (true) {
                switch (atomicState.get()) {
                case STATE_PENDING:
                case STATE_EXECUTING:
//                      Thread.sleep(1);
                    break;
                case STATE_DONE:
                    return (T)outcome;
                default:
                    throw new IllegalStateException();
                }
            }
        }

        @Override
        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            throw new NotImplementedException();
        }

        void set(T t) {
            outcome = t;
        }

        @Override
        public void run() {
            if (atomicState.compareAndSet(STATE_PENDING, STATE_EXECUTING)) {
                Object result;
                try {
                    switch (eType) {
                    case NoHolder:
                        result = cb.call();
                        break;
                    case Holder1:
                        throw new NotImplementedException();
                    case Holder2:
                        throw new NotImplementedException();
                    default:
                        throw new IllegalStateException();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    result = null;
                }

                outcome = result;
                atomicState.set(STATE_DONE);
            }
        }
    }
}

class MyTask implements Callable<List<Integer>> {
    @Override
    public List<Integer> call() throws Exception {
        List<Integer> ret = new ArrayList<>(100);
        IntStream.range(0, 100).boxed().forEach(ret::add);
        return ret;
    }
}
4

2 回答 2

4

重要的是发生之前的关系。来自ExecutorServiceAPI 文档:

内存一致性效果:线程中的操作在提交RunnableCallable任务ExecutorService 之前发生在该任务采取的任何操作之前,这反过来又 发生在通过检索结果之前Future.get()

因此,您可以安全地传输这样的可变对象。该ExecutorService实现通过某种形式的安全发布传输对象。

显然,返回后不要更新原始线程中的对象。

如果您要通过存储在共享的非字段中来在线程之间进行通信volatile,那么这将是不安全的。

于 2020-01-08T14:25:39.473 回答
2

当多个线程尝试同时访问和修改同一状态时,线程安全成为一个问题。

请注意,在任务完成之前,您不会从 a 中获取实际结果Future(即Future#get在任务完成之前不会返回)。

在您的第一个示例中,线程安全不是问题,因为新对象(虽然是可变的)是由一个线程(由 Executor 创建的线程)创建的,并在Future该线程完成处理任务后从对象中检索。一旦调用线程获得对象,它就不能被任何其他线程修改,因为创建线程不再有权访问列表。

于 2020-01-08T14:30:36.363 回答