1

我们正在使用 ThreadPoolTask​​Executor 来并行化消息处理程序的执行,但我们遇到了并发修改异常。

下面是抛出的异常:

2018-07-26 21:18:15.563 ERROR [dia-feed-sftp,a889a5984eb10d70,3a486897c53ed430,false] 7643 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@2f91c88]; nested exception is org.springframework.amqp.UncategorizedAmqpException: java.util.ConcurrentModificationException, failedMessage=GenericMessage [payload=byte[3445], headers={auditId=e6af7733-455b-490b-8acd-319c57f3ef5e, file_remoteDirectory=/usr/appl/dia/data/wrk/pcf_test_dev/e2e_debug_1/, X-B3-ParentSpanId=b31063e9685ce603, scst_partition=0, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@2f64d8bc, file_remoteFile=BROBRG.D203, spanTraceId=a889a5984eb10d70, spanId=928797df743c4c03, spanParentSpanId=b31063e9685ce603, file_remoteFileInfo={"directory":false,"filename":"BROBRG.D203","link":false,"modified":1532374864000,"permissions":"-rwxr-x---","remoteDirectory":"/usr/appl/dia/data/wrk/pcf_test_dev/e2e_debug_1/","size":20136538}, nativeHeaders={X-B3-ParentSpanId=[a6012ccb232fd522], X-B3-Sampled=[0], X-B3-TraceId=[a889a5984eb10d70], X-B3-TraceId=[a889a5984eb10d70]}, partitionKey=M65Y26Z, X-B3-SpanId=928797df743c4c03, X-B3-Sampled=0, X-B3-TraceId=a889a5984eb10d70, id=75e29c84-f80f-fbf7-1fb8-82dbdb836c1e, spanSampled=0, contentType=text/plain, timestamp=1532639895536}]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:725)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:220)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:278)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:379)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:53)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:373)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.UncategorizedAmqpException: java.util.ConcurrentModificationException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:83)
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1847)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1784)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:864)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:107)
    at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:95)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164)
    ... 116 more
Caused by: java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
    at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:752)
    at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:750)
    at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.convertHeaderValueIfNecessary(DefaultMessagePropertiesConverter.java:211)
    at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.convertHeadersIfNecessary(DefaultMessagePropertiesConverter.java:171)
    at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:139)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.sendToRabbit(RabbitTemplate.java:2010)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:1999)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$send$3(RabbitTemplate.java:865)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1841)
    ... 122 more

下面是我们的配置:

@Qualifier("auditChannel")
@Bean
public MessageChannel auditChannel() {
    return MessageChannels.publishSubscribe(threadPoolTaskExecutor()).get();
}



@Bean
public TaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);
    executor.setMaxPoolSize(20);
    executor.setThreadNamePrefix("default_task_executor_thread");
    executor.initialize();
    return executor;
}

@Bean
public IntegrationFlow auditIntegrationFlow(
    OperationalDataStoreClient operationalDataStoreClient,
    @Qualifier("auditChannel") MessageChannel auditChannel,
    Clock clock
) {
    return from(auditChannel)
        .filter(message -> !(message instanceof FileSplitter.FileMarker))
        .handle(new FileAuditHandler(operationalDataStoreClient, clock))
        .get();
}


public class FileAuditHandler {

public void transform(Message<?> message) throws MessagingException {
    ObjectMapper objectMapper = new ObjectMapper();
    MessageHeaders messageHeaders = message.getHeaders();
    String fileName = (String) messageHeaders.get("file_remoteFile");

    FileAuditRecord fileAuditRecord = new FileAuditRecord(
            fileName,
        LocalDateTime.ofInstant(clock.instant(), ZoneId.of("UTC")));

    try {
        operationalDataStoreClient.write("/ods/fms/file_audit/" + messageHeaders.get("auditId") + ".json",
            objectMapper.writeValueAsString(fileAuditRecord),
            "ods_fms_file_audit");

    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

}

可能是因为 ThreadPoolTask​​Executor 的使用方式或 fileAuditHandler 连接不正确?

4

1 回答 1

0

这意味着您有一个类型的消息标头,Map<?, ?>并且您在多条消息中具有相同的标头值,或者您正在以某种方式修改该映射,同时检查它是否包含在传出消息中......

else if (value instanceof Map<?, ?>) {
    @SuppressWarnings("unchecked")
    Map<String, Object> originalMap = (Map<String, Object>) value;
    Map<String, Object> writableMap = new HashMap<String, Object>(originalMap.size());
    for (Map.Entry<String, Object> entry : originalMap.entrySet()) {
        writableMap.put(entry.getKey(), this.convertHeaderValueIfNecessary(entry.getValue()));
    }
    value = writableMap;
}

转换器在地图上进行迭代,而其他一些线程正在修改该地图。

如果Map<?, ?>标题中有一个值,它需要是唯一的并且不能被修改。

你可以使用类似...

setHeader("myMapHeader", new HashMap<>(sourceMap))

...或使地图 aConcurrentMap允许一个线程在另一个线程对其进行修改时进行迭代。

但是,这是可疑的,因为您将有一个关于迭代器是否会看到更改的竞争条件。

虽然框架确保消息和消息标头是不可变的,但如果您将可变对象添加到有效负载和/或标头,则框架无法保护此类问题。

于 2018-07-27T16:45:20.950 回答