I have configured my batch job as follows.
For the batch job: max-failed-records="-1". This means the instance processes all records, regardless of how many records fail.
1. Batch_Step: AcceptPolicy="ALL"
As the first batch step, this accepts all records and processes them.
2. Batch_Step1: AcceptPolicy="ONLY_FAILURES"
So this should accept only the records that failed in the first batch step.
**The first batch step has a DataWeave transformation.
In the second batch step, as of now, I am only logging; no transformation is present.**
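The setup described above would look roughly like the sketch below. This is a reconstruction under assumptions, not the actual project file: the job name is taken from the log output, the DataWeave script is an identity placeholder standing in for the real transformation, the logger message is illustrative, and the `xsi:schemaLocation` entries and the flow that triggers the job are omitted.

```xml
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
      xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw">

    <!-- max-failed-records="-1": keep processing no matter how many records fail -->
    <batch:job name="failedrecordspocBatch" max-failed-records="-1">
        <batch:process-records>

            <!-- First step: accepts every record and runs the transformation -->
            <batch:step name="Batch_Step" accept-policy="ALL">
                <dw:transform-message>
                    <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload]]></dw:set-payload>
                    <!-- placeholder script; the real transformation is not shown in this post -->
                </dw:transform-message>
            </batch:step>

            <!-- Second step: should receive only records that failed earlier -->
            <batch:step name="Batch_Step1" accept-policy="ONLY_FAILURES">
                <logger level="INFO" message="Failed record: #[payload]"/>
            </batch:step>

        </batch:process-records>
    </batch:job>
</mule>
```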
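An exception occurs in DataWeave when some incorrect records (bad data) come in, so these incorrect records should be caught in the second batch step.
The problem I am facing is that the first batch step executes fine and shows the count of failed records.
But control never reaches the second batch step, and the exception below appears in the console. (This happens only when some records fail in the first batch step.) For positive cases, the batch steps work fine. From what I can see, the following line in the console log may be useful: Could not queue 4 records for instance '3b742880-98e1-11e5-9927-f0ea20524153' of job 'failedrecordspocBatch' on buffer 'batch-stepping-queue-buffer'. Records will be marked as failed. Processing block id is '3d066141-98e1-11e5-9927-f0ea20524153'. ...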
The exception stack trace is:
Cannot coerce a :null to a :string (com.mulesoft.weave.mule.exception.WeaveExecutionException)
com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler:201 (null)
--------------------------------------------------------------------------------
Root Exception stack trace:
com.mulesoft.weave.model.values.coercion.UnsupportedTypeCoercionException: Cannot coerce a :null to a :string
at com.mulesoft.weave.model.values.formatting.StringFormatTypeCoercionValue.evaluate(StringFormatTypeCoercionValue.scala:58)
at com.mulesoft.weave.model.values.formatting.StringFormatTypeCoercionValue.evaluate(StringFormatTypeCoercionValue.scala:14)
at com.mulesoft.weave.engine.ast.stringops.JoinOpNode$$anonfun$evaluate$1.apply(JoinOpNode.scala:14)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
ERROR 2015-12-02 16:11:27,635 [batch-job-failedrecordspocBatch-work-manager.02] com.mulesoft.module.batch.engine.buffer.TransactionalQueueBuffer: Could not queue 4 records for instance '3b742880-98e1-11e5-9927-f0ea20524153' of job 'failedrecordspocBatch' on buffer 'batch-stepping-queue-buffer'. Records will be marked as failed. Processing block id is '3d066141-98e1-11e5-9927-f0ea20524153'
org.mule.api.serialization.SerializationException: Could not serialize object
at org.mule.serialization.internal.AbstractObjectSerializer.serialize(AbstractObjectSerializer.java:68) ~[?:?]
at com.mulesoft.module.batch.engine.queue.AbstractBatchQueueDelegate.dispatch(AbstractBatchQueueDelegate.java:103) ~[?:?]
at com.mulesoft.module.batch.engine.queue.SteppingQueueDelegate.dispatch(SteppingQueueDelegate.java:52) ~[?:?]
at com.mulesoft.module.batch.engine.buffer.TransactionalQueueBuffer.writeToQueue(TransactionalQueueBuffer.java:142) ~[?:?]
at com.mulesoft.module.batch.engine.buffer.TransactionalQueueBuffer.doFlush(TransactionalQueueBuffer.java:127) ~[?:?]
at com.mulesoft.module.batch.engine.buffer.TransactionalQueueBuffer.add(TransactionalQueueBuffer.java:87) ~[?:?]
at com.mulesoft.module.batch.engine.DefaultBatchEngine.routeNext(DefaultBatchEngine.java:736) ~[?:?]
at com.mulesoft.module.batch.engine.DefaultBatchEngine.routeError(DefaultBatchEngine.java:593) ~[?:?]
at com.mulesoft.module.batch.engine.DefaultBatchEngine.updateStatisticsAndRoute(DefaultBatchEngine.java:817) ~[?:?]
at com.mulesoft.module.batch.engine.threading.BatchRecordWork.run(BatchRecordWork.java:92) ~[?:?]
at org.mule.work.WorkerContext.run(WorkerContext.java:286) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.7.0_72]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.7.0_72]
at java.lang.Thread.run(Unknown Source) [?:1.7.0_72]
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.mule.module.launcher.MuleApplicationClassLoader)
classLoaders (org.mule.module.launcher.application.CompositeApplicationClassLoader)
executionClassLoader (org.mule.DefaultMuleContext)
context (com.mulesoft.weave.mule.function.FlowRefLookupFunctionValue)
_2 (scala.Tuple2)
kv (scala.collection.immutable.HashMap$HashMap1)
elems (scala.collection.immutable.HashMap$HashTrieMap)
variableContext (com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler)
payload (com.mulesoft.module.batch.record.Record)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366) ~[?:?]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:603) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366) ~[?:?]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.mulesoft.module.serialization.kryo.internal.KryoObjectSerializer.doSerialize(KryoObjectSerializer.java:105) ~[?:?]
at com.mulesoft.module.serialization.kryo.internal.KryoObjectSerializer.doSerialize(KryoObjectSerializer.java:97) ~[?:?]
at org.mule.serialization.internal.AbstractObjectSerializer.serialize(AbstractObjectSerializer.java:64) ~[?:?]
... 13 more
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Unknown Source) ~[?:1.7.0_72]
at java.util.Vector$Itr.next(Unknown Source) ~[?:1.7.0_72]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:92) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366) ~[?:?]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:603) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366) ~[?:?]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:549) ~[?:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[?:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:524) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[?:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:625) ~[?:?]
at com.mulesoft.module.serialization.kryo.internal.KryoObjectSerializer.doSerialize(KryoObjectSerializer.java:105) ~[?:?]
at com.mulesoft.module.serialization.kryo.internal.KryoObjectSerializer.doSerialize(KryoObjectSerializer.java:97) ~[?:?]
at org.mule.serialization.internal.AbstractObjectSerializer.serialize(AbstractObjectSerializer.java:64) ~[?:?]
... 13 more
INFO 2015-12-02 16:11:27,635 [batch-job-failedrecordspocBatch-work-manager.02] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution for instance '3b742880-98e1-11e5-9927-f0ea20524153' of job 'failedrecordspocBatch'. Total Records processed: 4. Successful records: 0. Failed Records: 4
INFO 2015-12-02 16:11:27,745 [batch-job-failedrecordspocBatch-work-manager.02] com.mulesoft.module.batch.engine.DefaultBatchEngine:
****************************************************************************************************************************************************************
* - - + Exception Type + - - * - - + Step + - - * - - + Count + - - *
****************************************************************************************************************************************************************
* com.mulesoft.module.batch.exception.BatchException * Batch_Step * 1 *
* org.mule.api.serialization.SerializationException * Batch_Step1 * 4 *
****************************************************************************************************************************************************************
INFO 2015-12-02 16:11:27,823 [batch-job-failedrecordspocBatch-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step finished processing all records for instance 3b742880-98e1-11e5-9927-f0ea20524153 of job failedrecordspocBatch
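However, from the link below I understood that a few payloads cannot be serialized. Here is the link: http://bushorn.com/deeper-lock-mule-batch/
So I understand that I need to implement the Kryo serializer, which I am hearing about for the first time.
Can someone help me with how to implement the Kryo serializer in my batch step flow?
Please comment if any clarification is needed.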
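For reference, here is a minimal sketch of how the Kryo serializer appears to be declared as the application's default object serializer. It assumes Mule 3.7 or later Enterprise Edition and the `kryo` namespace from the MuleSoft documentation; the name `kryo` is arbitrary, and the `xsi:schemaLocation` entries are omitted. Note that the stack trace above already passes through `KryoObjectSerializer`, so it is not clear that this declaration alone would change the outcome.

```xml
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:kryo="http://www.mulesoft.org/schema/mule/kryo">

    <!-- Declare a Kryo-based serializer (assumed to require Mule EE 3.7+) -->
    <kryo:serializer name="kryo"/>

    <!-- Make it the default object serializer for this application -->
    <configuration defaultObjectSerializer-ref="kryo"/>

</mule>
```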