1

我发现 CollectSignaturesFlow 会抛出 UnsupportedOperationException,因为在某个时刻 Kryo 无法序列化流程(flow)中使用的某个集合。而它内部使用的点对点 CollectSignatureFlow 工作正常,没有任何问题。我还用更简单的方式实现了签名收集(是的,检查没有那么多,但这只是为了测试),这种方式可以正常完成工作。

代码:

   /**
    * Initiating flow that builds a transaction from the account states gathered for [person],
    * signs it locally, collects the counterparties' signatures and finalises it.
    *
    * @property person owner whose states are gathered via [GatherStateAndRefFlow].
    * @property partiesAndAmounts amount per party, keyed by the party's X.500 name; only the
    *   keys are used directly in this flow to locate the participating parties.
    * @return the transaction returned by [FinalityFlow].
    */
    @InitiatingFlow
    @StartableByRPC
    class CheckoutClientFlow(
            private val person: Person = ClientAccountFlows.defaultPerson,
            private val partiesAndAmounts: Map<CordaX500Name, Long>
    ) : FlowLogic<SignedTransaction>() {
        override val progressTracker: ProgressTracker = tracker()

        // Becomes true when our own identity was among the requested parties.
        private var selfIncluded = false

        @Suspendable
        override fun call(): SignedTransaction {
            progressTracker.currentStep = NotaryInfo
            val notary: Party = getPreferredNotary(serviceHub)

            progressTracker.currentStep = LocateOtherParties
            val parties = locateParties(partiesAndAmounts.keys).toMutableSet()
            // No session is opened with ourselves, so drop our identity from the
            // counterparty set and only remember that it was requested.
            if (parties.remove(ourIdentity)) {
                selfIncluded = true
            }
            val inputs = subFlow(GatherStateAndRefFlow(person, parties))
            val otherPartiesSessions = parties.map { initiateFlow(it) }

            progressTracker.currentStep = CreateTransaction
            val inputsWithParties = inputs.mapKeys { entry -> locateParty(entry.key) }
            val txBuilder = createTxBuilder(notary, inputsWithParties)
            val onceSignedTransaction = serviceHub.signInitialTransaction(txBuilder)

            progressTracker.currentStep = SigningByOtherParty
            // CollectSignaturesFlow returns a new SignedTransaction carrying the
            // counterparties' signatures; the locally signed one is left unchanged.
            // (A per-session CollectSignatureFlow loop was used as a test-only alternative.)
            val signedTx = subFlow(CollectSignaturesFlow(onceSignedTransaction, otherPartiesSessions))

            progressTracker.currentStep = Finalize
            // Finalise the fully signed transaction, not the one holding only our signature.
            return subFlow(FinalityFlow(transaction = signedTx, sessions = otherPartiesSessions))
        }
    }
}

测试:

@Test
    fun `subtract if both parties are present other`() {
        // Nodes A and B both take part, each with the same movable amount.
        val person = defaultPerson
        val partiesAndAmounts = mapOf(
                aNodeName to defaultMovableAmount,
                bNodeName to defaultMovableAmount
        )
        val checkoutFlow = ExchangeValueMultipleParties.CheckoutClientFlow(
                person = defaultPerson,
                partiesAndAmounts = partiesAndAmounts
        )

        // Start the flow on node A and pump the mock network until it settles.
        val pendingResult = a.startFlow(checkoutFlow)
        network.runNetwork()
        val finalTx = pendingResult.get()

        // The transaction must consume inputs and carry one output and one
        // signature per participating party.
        assertNotNull(finalTx)
        assertNotEquals(0, finalTx.inputs.size)
        val expectedCount = partiesAndAmounts.size
        assertEquals(expectedCount, finalTx.tx.outputs.size)
        assertEquals(expectedCount, finalTx.sigs.size)

        // Each node's vault must hold an account state owned by the person (A first, then B).
        for (node in listOf(a, b)) {
            val ownedState = node.services.vaultService
                    .queryBy<ClientAccountState>(accountQueryCriteria)
                    .states
                    .first { candidate -> candidate.state.data.owner == person }
            assertNotNull(ownedState)
            // Balance check currently disabled:
            // assertEquals(defaultAmount - defaultMovableAmount, getTrueAmount(ownedState.state.data.balance))
        }
    }

堆栈跟踪:

[ERROR] 19:03:34+0000 [FiberDeserializationChecker] interceptors.FiberDeserializationChecker. - Encountered unrestorable checkpoint! [errorCode=aixjez, moreInformationAt=https://errors.corda.net/OS/4.4/aixjez]
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
dataObject (co.paralleluniverse.fibers.Stack)
stack (net.corda.node.services.statemachine.FlowStateMachineImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer.read(CompatibleFieldSerializer.java:145)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:782)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObjectOrNull(ReplaceableObjectKryo.java:107)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2156)
    at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2086)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readClassAndObject(ReplaceableObjectKryo.java:112)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:83)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:33)
    at net.corda.node.serialization.kryo.KryoStreams.kryoInput(KryoStreams.kt:20)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:72)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:33)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$kryo$1.execute(KryoCheckpointSerializer.kt:61)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.run(KryoPoolQueueImpl.java:58)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer.kryo(KryoCheckpointSerializer.kt:57)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer.deserialize(KryoCheckpointSerializer.kt:71)
    at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:101)
    at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:51)
    at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
Caused by: java.lang.UnsupportedOperationException
    at java.util.AbstractCollection.add(AbstractCollection.java:262)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 21 more
[ERROR] 19:03:34+0000 [FiberDeserializationChecker] interceptors.FiberDeserializationChecker. - Encountered unrestorable checkpoint! [errorCode=aixjez, moreInformationAt=https://errors.corda.net/OS/4.4/aixjez]
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
dataObject (co.paralleluniverse.fibers.Stack)
stack (net.corda.node.services.statemachine.FlowStateMachineImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer.read(CompatibleFieldSerializer.java:145)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:782)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObjectOrNull(ReplaceableObjectKryo.java:107)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2156)
    at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2086)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readClassAndObject(ReplaceableObjectKryo.java:112)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:83)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:33)
    at net.corda.node.serialization.kryo.KryoStreams.kryoInput(KryoStreams.kt:20)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:72)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:33)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer$kryo$1.execute(KryoCheckpointSerializer.kt:61)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.run(KryoPoolQueueImpl.java:58)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer.kryo(KryoCheckpointSerializer.kt:57)
    at net.corda.node.serialization.kryo.KryoCheckpointSerializer.deserialize(KryoCheckpointSerializer.kt:71)
    at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:101)
    at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:51)
    at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
Caused by: java.lang.UnsupportedOperationException
    at java.util.AbstractCollection.add(AbstractCollection.java:262)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 21 more
[ERROR] 19:03:34+0000 [FiberDeserializationChecker] interceptors.FiberDeserializationChecker. - Encountered unrestorable checkpoint! [errorCode=aixjez, moreInformationAt=https://errors.corda.net/OS/4.4/aixjez]

看起来 Kryo 在重建流程中使用的某个集合时调用了 add() 方法,并假定该方法已被实现;而该集合沿用的是 AbstractCollection 中不支持此操作的默认实现。

这是 AbstractCollection 中的实现。

    /**
     * Rejects every insertion: the argument is ignored and the call always fails.
     *
     * @param var1 the element offered for insertion (never stored)
     * @return never returns normally
     * @throws UnsupportedOperationException on every call
     */
    public boolean add(E var1) {
        // Unconditional failure; nothing is added.
        throw new UnsupportedOperationException();
    }

此外,在调试模式下运行时,还可以观察到线程饥饿警告:

[WARN] 19:55:47,111 [HikariPool-3 housekeeper] pool.HikariPool. - HikariPool-3 - Thread starvation or clock leap detected (housekeeper delta=1m35s902ms765µs600ns).
[WARN] 19:55:47,111 [HikariPool-1 housekeeper] pool.HikariPool. - HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m49s546ms362µs501ns).
[WARN] 19:55:47,113 [HikariPool-2 housekeeper] pool.HikariPool. - HikariPool-2 - Thread starvation or clock leap detected (housekeeper delta=1m37s789ms188µs200ns).
[WARN] 19:55:47,114 [HikariPool-4 housekeeper] pool.HikariPool. - HikariPool-4 - Thread starvation or clock leap detected (housekeeper delta=1m33s803ms330µs901ns).
4

0 回答 0