2

我正在尝试使用 Spring Integration Java DSL 来接收电子邮件(gmail)。我也在使用 Redis。当我收到邮件时,我得到 ConcurrentModificationException 。在搜索时,我还发现人们在 Kafka 中也遇到了同样的错误。下面是我的代码和异常。

感谢您的帮助。

    Properties javaMailProperties = new Properties();
    javaMailProperties.setProperty("mail.imap.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
    javaMailProperties.setProperty("mail.imap.socketFactory.fallback", "false");
    javaMailProperties.setProperty("mail.store.protocol", "imaps");
    javaMailProperties.setProperty("mail.debug", "false");
    String imapUrl = "imaps://email%40gmail.com:password@imap.gmail.com:993/INBOX";



    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(Mail.imapInboundAdapter(imapUrl).javaMailProperties(javaMailProperties).shouldMarkMessagesAsRead(true)
            , new Consumer<SourcePollingChannelAdapterSpec>() {


        @Override
        public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
            sourcePollingChannelAdapterSpec
                    .poller(defaultPoller);
        }
    });
    return flowBuilder.channel(source.output()).get();

例外 :

2016-04-16 09:19:51.449  INFO 11640 --- [ask-scheduler-7] o.s.integration.mail.ImapMailReceiver    : attempting to receive mail from folder [INBOX]
2016-04-16 09:21:31.463 ERROR 11640 --- [ask-scheduler-7] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder$SendingHandler@2f104438]; nested exception is com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
c (sun.security.ssl.AppInputStream)
in (com.sun.mail.util.TraceInputStream)
in (java.io.BufferedInputStream)
bin (com.sun.mail.iap.ResponseInputStream)
input (com.sun.mail.imap.protocol.IMAPProtocol)
authenticatedConnections (com.sun.mail.imap.IMAPStore$ConnectionPool)
pool (com.sun.mail.imap.IMAPSSLStore)
store (com.sun.mail.imap.IMAPFolder)
folder (com.sun.mail.imap.IMAPMessage)
source (org.springframework.integration.mail.AbstractMailReceiver$IntegrationMimeMessage)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:161)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:251)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:57)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:176)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:173)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:330)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
c (sun.security.ssl.AppInputStream)
in (com.sun.mail.util.TraceInputStream)
in (java.io.BufferedInputStream)
bin (com.sun.mail.iap.ResponseInputStream)
input (com.sun.mail.imap.protocol.IMAPProtocol)
authenticatedConnections (com.sun.mail.imap.IMAPStore$ConnectionPool)
pool (com.sun.mail.imap.IMAPSSLStore)
store (com.sun.mail.imap.IMAPFolder)
folder (com.sun.mail.imap.IMAPMessage)
source (org.springframework.integration.mail.AbstractMailReceiver$IntegrationMimeMessage)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534)
    at org.springframework.integration.codec.kryo.PojoCodec.doEncode(PojoCodec.java:92)
    at org.springframework.integration.codec.kryo.AbstractKryoCodec$2.execute(AbstractKryoCodec.java:66)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.run(KryoPoolQueueImpl.java:61)
    at org.springframework.integration.codec.kryo.AbstractKryoCodec.encode(AbstractKryoCodec.java:63)
    at org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary(AbstractBinder.java:230)
    at org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary(AbstractBinder.java:210)
    at org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder.access$800(RedisMessageChannelBinder.java:71)
    at org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder$SendingHandler.handleMessageInternal(RedisMessageChannelBinder.java:295)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    ... 28 more
Caused by: java.util.ConcurrentModificationException
    at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
    at java.util.Vector$Itr.next(Vector.java:1133)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:92)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    ... 84 more

2016-04-16 09:21:36.188  INFO 11640 --- [ask-scheduler-7] o.s.integration.mail.ImapMailReceiver    : attempting to receive mail from folder [INBOX]
2016-04-16 09:21:39.104  INFO 11640 --- [ask-scheduler-7] o.s.integration.mail.ImapMailReceiver    : attempting to receive mail from folder [INBOX]
4

1 回答 1

0

看起来我们需要一个自定义的 kryo 序列化器(由 Spring Cloud Stream 使用)用于MimeMessage. 我打开了一个针对 Spring Integration的JIRA 问题。

然而,当我测试它时,我得到了一个堆栈溢出错误,而不是你看到的异常。

mail.debug如果您可以将javamail 属性设置为的控制台输出附加到 JIRA 问题上,并附上一条显示您的问题的消息,这将很有帮助true,这样我们就可以确定我们也解决了这个问题,因为这样我们就可以重建一个类似的MimeMessage.

编辑

初步分析表明,问题是在 mime 消息java.util.Logger的字段内序列化 a。session

此外,事实证明这个对象 ( AbstractMailReceiver.IntegrationMimeMessage) 根本不适合序列化,因为它在其范围内具有整个应用程序上下文,因为它是邮件接收器中的一个嵌套类。

因此,我建议在邮件模块中添加一个转换器,以将此对象转换为适合任何类型的序列化的对象。

编辑2

一种解决方法是添加.transform(mailToStringTransformer())到流中。邮件内容将是邮件的正文,发件人、密件抄送、抄送、收件人、回复和主题将添加到标题中。

于 2016-04-16T13:45:53.100 回答