3

我正在使用 SpringBoot 和 Web3J 开发一个应用程序,其中我使用合约包装器与智能合约交互。这是用于监听事件的方法的自动生成代码,称为 NewId:

public Observable<NewIdEventResponse> newIdEventObservable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
    final Event event = new Event("NewId", 
            Arrays.<TypeReference<?>>asList(),
            Arrays.<TypeReference<?>>asList(new TypeReference<Bytes32>() {}, new TypeReference<Address>() {}));
    EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
    filter.addSingleTopic(EventEncoder.encode(event));
    return web3j.ethLogObservable(filter).map(new Func1<Log, NewIdEventResponse>() {
        @Override
        public NewIdEventResponse call(Log log) {
            EventValues eventValues = extractEventParameters(event, log);
            NewIdEventResponse typedResponse = new NewIdEventResponse();
            typedResponse.key = (Bytes32) eventValues.getNonIndexedValues().get(0);
            typedResponse.contractId = (Address) eventValues.getNonIndexedValues().get(1);
            return typedResponse;
        }
    });
}

我已经为 observable 创建了一个订阅 s,我在其中打印正在执行的线程名称并增加一个计数器,初始化为 0。然后 "mian thread" (spring boot thread) 在订阅后休眠 5 秒,然后打印计数器值并调用 s.unsubscribe 。这是代码:

//Invoke transactional contract method...
                this.counter = 0;
                CountDownLatch latch = new CountDownLatch(1);
                log.warn("Counter value before subscription: "+counter);
                Subscription s = contractWrapper.newIdEventObservable(DefaultBlockParameterName.LATEST, DefaultBlockParameterName.LATEST)
                        .subscribe(evento ->{
                            log.warn("Event Received");
                            log.warn("Thread Name "+Thread.currentThread().getName());
                            latch.countDown();
                            this.testIncCounter();
                            },                              
                        Throwable::printStackTrace);

                latch.await();
                log.warn("Main Thread going to sleep for 5 seconds");
                Thread.sleep(5000);
                log.warn("Unsubscribing...");
                s.unsubscribe();
                log.warn("Counter value "+counter);

testIncCounter 方法是同步的:

private synchronized void testIncCounter(){
        counter++;
    }

问题是订阅代码被不同的线程执行了两次,从日志中可以看出,计数器的最终值为 2,应该是 1。这是输出:

2017-11-07 20:56:17.345  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Counter value before subscription: 0
2017-11-07 20:56:17.433  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Event Received
2017-11-07 20:56:17.433  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Thread Name http-nio-8090-exec-4
2017-11-07 20:56:17.449  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Main Thread going to sleep for 5 seconds
2017-11-07 20:56:17.491  WARN 23533 --- [ool-17-thread-1] com.ckgt.service.impl.CecaServiceImpl    : Event Received
2017-11-07 20:56:17.491  WARN 23533 --- [ool-17-thread-1] com.ckgt.service.impl.CecaServiceImpl    : Thread Name pool-17-thread-1
2017-11-07 20:56:22.450  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Unsubscribing...
2017-11-07 20:56:22.459  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Counter value 2

也许这是一个错误,还是我做错了什么?非常感谢您提前。

4

0 回答 0