1

我正在尝试使用 java gateway sdk 将异步事务发送到我的 Fabric 网络,但我收到错误消息Channel [CHANNEL NAME] has been shutdown

这里有一些示例代码:

    Gateway.Builder builder = Gateway.createBuilder()
              .discovery(true)
              .identity(wallet, user.getName())
              .networkConfig([PATH TO CONNECTION PROFILE]);

    try(Gateway gateway = builder.connect()) {
        Network channel = gateway.getNetwork(CHANNEL_NAME);
        Contract someChaincode = channel.getContract(CHAINCODE_NAME);

        int coresNumber = (Runtime.getRuntime().availableProcessors());

        ExecutorService executor = Executors.newFixedThreadPool(coresNumber);

        for(String elemt : elements) {                                                                          
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try{
                    //Exception thrown here
                    byte[] res = someChaincode.submitTransaction("someFunction", elemt);
                    return new String(res);     
                } catch (ContractException e) {
                    e.printStackTrace();
                }
            }, executor);
        }
    } catch (Exception e) {
       // Handle Exception
    }

这里有一个例外:

java.util.concurrent.ExecutionException: org.hyperledger.fabric.gateway.GatewayRuntimeException: org.hyperledger.fabric.sdk.exception.InvalidArgumentException: Channel [CHANNEL NAME] has been shutdown.

准确地说,异常是在方法中抛出的checkChannelState()。我有一种感觉,我没有正确处理多线程。

4

1 回答 1

1

您似乎不会等待您在代码片段中创建的期货的完成。因此,您正在安排事务调用以在不同的线程上执行,但是在执行此代码之前,退出 try-with-resources 块,该块会关闭您用于连接的网关实例。关闭网关会导致所有相关的资源和连接都关闭,包括底层的 Channel。因此,当您的事务调用实际运行时,您已经关闭了它们执行所需的连接和资源。

您需要在关闭网关实例之前从您创建的 Future 对象中获取结果;换句话说,在退出创建网关的 try-with-resources 块之前。有点像这样:

Collection<Callable<String>> tasks = elements.stream()
    .map(element -> new Callable<String>() {
        public String call() throws ContractException, TimeoutException, InterruptedException {
            byte[] result = contract.submitTransaction("someFunction", element);
            return new String(result);
        }
    }).collect(Collectors.toList());

try {
    Collection<String> results = new ArrayList<>();

    Collection<Future<String>> futures = executor.invokeAll(tasks, timeout, timeUnit);
    for (Future<String> future : futures) {
        try {
            String result = future.get(timeout, timeUnit);
            results.add(result);
        } catch (CancellationException | InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    System.out.println("Results: " + results);
} catch (InterruptedException e ) {
    e.printStackTrace();
}
于 2019-11-19T14:51:40.237 回答