0

我想创建一个CompletableFuture链,将其作为字节码/参数序列化到磁盘,并在将来的某个时候再次运行它。例如:

CompletableFuture<String> chain = new CompletableFuture<String>();

CompletableFuture<String> pos = chain;
for (int ii = 0; ii < 4; ++ii) {
    final int val = ii;
    pos = pos.thenApply( (s)->{ return s+" plus"+val;} );
}
pos = pos.thenApply( (s)->{ return "The final answer is "+s;} );

saveDisk( "chaindata.dat", chain, pos );
/////
// at some time in the future, perhaps in another program
Pair<CompletableFuture<String>,CompletableFuture<String>> newChain = loadFrom( "chaindata.dat" );
newChain.getA().complete("Hello");
String result = newChain.getB().get(); // should get hello with all post processing
// result==The final answer is Hello plus0 plus1 plus2 plus3

当然,这个棘手的事情是,即使字节码存储在当前的类文件中(可以用 检索Class.getResource()),我也需要在链中设置的最终变量也被持久化。
(参见上面的 for 循环final int var = ii;

我未来的目标实际上是在具有大多数相同类文件(但不是创建链的类文件)的从属进程上运行链我会编写字节码/数据,将其重新加载到从属进程上,然后运行。

如前所述,我相信在实例化时在循环中创建的最终变量是其中的棘手部分。IE。 pos = pos.thenApply( (s)->{ return s+" plus"+val;} );

有没有人建议如何做到这一点?

4

1 回答 1

2

首先是来自Lambda 表达式教程的警告:

如果lambda 表达式的目标类型及其捕获的参数是可序列化的,则可以序列化它。但是,与内部类一样,强烈建议不要对 lambda 表达式进行序列化

(重点是我的)

这个要求的问题CompletableFuture是不是Serializable,并且扩展它需要你复制很多代码CompletableFuture——因为你需要重写很多方法(比如thenApply())来让它们返回你的新的可序列化类型。

但是,可以做的是序列化您想要执行的操作链。由于它基本上是将 aCompletableFuture转换为另一个的函数,因此最简单的方法是定义一个SerializableFunction类型:

private interface SerializableFunction<T, R> extends Serializable {
    R apply(T t);

    default <V> SerializableFunction<T, V> andThen(SerializableFunction<? super R, ? extends V> after) {
        Objects.requireNonNull(after);
        return (T t) -> after.apply(apply(t));
    }
}

注意:我明确选择不扩展Function,因为andThen()参数不同(因此它不会被覆盖),并且您需要andThen()参数是可序列化的,以便返回的 lambda 也是可序列化的(根据上面的警告) .

CompletableFuture然后,您可以通过将所有调用包装在 lambda 表达式中来创建您的链:

private static SerializableFunction<CompletableFuture<String>, CompletableFuture<String>> createChain() {
    SerializableFunction<CompletableFuture<String>, CompletableFuture<String>> result = c -> c;
    for (int ii = 0; ii < 4; ++ii) {
        final int val = ii;
        result = result.andThen(pos -> pos.thenApply(s -> s + " plus" + val));
    }
    return result.andThen(pos -> pos.thenApply(s -> "The final answer is " + s));
}

并根据需要对其进行序列化:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(createChain());
oos.close();

ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
ObjectInputStream ois = new ObjectInputStream(bais);
SerializableFunction<CompletableFuture<String>, CompletableFuture<String>> chain =
        (SerializableFunction<CompletableFuture<String>, CompletableFuture<String>>) ois.readObject();

CompletableFuture<String> future = new CompletableFuture<>();
CompletableFuture<String> newChain = chain.apply(future);
newChain.thenAccept(System.out::println);
future.complete("Hello");

输出:

最终答案是你好加0加1加2加3

请注意,为了使其正常工作,捕获的变量确实已序列化(由 处理SerilizedLambda),但反序列化的 JVM 仍需要具有兼容版本的 lambda 代码本身。

考虑到这一点,最好以专用结构表示您的操作链,并处理该结构以远程重建链。

于 2018-08-20T12:39:26.710 回答