我们想使用 Spring Integration 2.1.1 以事务方式从 POP3 邮箱中读取邮件。事务性使我们可以读取电子邮件并将其保存到 Oracle 并从邮箱中删除。在事务回滚的情况下,我们希望电子邮件保留在邮箱中(即不被删除)。
我们的问题是,尽管处于事务中(稍后会详细介绍),但邮箱被打开,邮件被检索,标记为删除并且连接关闭,而与事务的其余部分无关。这意味着如果稍后发生故障(因为连接已关闭并且邮箱进入永久删除电子邮件的更新状态),则不删除邮件为时已晚。
我们一直在尝试通过创建事务轮询入站通道适配器来做到这一点:
<!-- This inbound channel adaptor interfaces to the sendmail POP3 queue -->
<int-mail:inbound-channel-adapter id="pop3PollingChannelAdaptor"
store-uri="pop3://myuser:myuser@100.100.100.100/inbox"
channel="receiveInboundEmailChannel"
should-delete-messages="true"
auto-startup="true"
java-mail-properties="javaMailProperties">
<!-- Will poll every 20 seconds -->
<int:poller fixed-rate="10000"
max-messages-per-poll="1">
<int:transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="false"
timeout="100000" />
</int:poller>
</int-mail:inbound-channel-adapter>
它将消息放在入站电子邮件通道上:
依次由出站通道适配器(端点)监听:
<int:outbound-channel-adapter channel="receiveInboundEmailChannel"
ref="inboundEmailMessageEndpoint"
method="processMessage" />
端点的实现目前使事情变得非常简单。目前我们并没有靠近数据库,相反,如果我们想触发回滚,我们只是抛出一个异常。
@MessageEndpoint
public class InboundEmailMessageEndpoint {
@Transactional(propagation = Propagation.MANDATORY)
public void processMessage(Message<MimeMessage> message) {
MimeMessage mailMessage = message.getPayload();
// We throw exceptions here to cause a rollback if we need to during investigation...
}
}
查看所有日志,我们可以在上下文中看到我们的问题(注意 POP3 “sayonara”是如何在端点中的其余工作开始之前,最重要的是,在周围事务提交/回滚之前):
15:32:25.256 INFO [main][org.springframework.transaction.jta.JtaTransactionManager] Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@3228a1
15:32:25.256 INFO [main][org.springframework.transaction.jta.JtaTransactionManager] Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@10980e7
15:32:25.422 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning NULL!
15:32:25.422 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning NULL!
15:32:25.423 DEBUG [task-scheduler-1][org.springframework.transaction.jta.JtaTransactionManager] Creating new transaction with name [org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT,timeout_100000
15:32:25.423 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning NULL!
15:32:25.423 WARN [task-scheduler-1][com.atomikos.icatch.imp.TransactionServiceImp] Attempt to create a transaction with a timeout that exceeds com.atomikos.icatch.max_timeout - truncating to: 300000
15:32:25.423 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 entering state: ACTIVE
15:32:25.425 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.thread.TaskManager] TaskManager: initializing...
15:32:25.426 INFO [task-scheduler-1][com.atomikos.icatch.imp.thread.TaskManager] THREADS: using JDK thread pooling...
15:32:25.428 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.thread.TaskManager] THREADS: using executor class com.atomikos.icatch.imp.thread.Java15ExecutorFactory$Executor
15:32:25.428 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.thread.Java15ExecutorFactory] (1.5) executing task: com.atomikos.timing.PooledAlarmTimer@18c4a7f
15:32:25.429 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.thread.ThreadFactory] ThreadFactory: creating new thread: Atomikos:0
15:32:25.429 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.TransactionServiceImp] Creating composite transaction: 10.9.21.7.tm0000100022
15:32:25.433 INFO [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] createCompositeTransaction ( 100000000 ): created new ROOT transaction with id 10.9.21.7.tm0000100022
DEBUG: JavaMail version 1.4.4
DEBUG: successfully loaded resource: /META-INF/javamail.default.providers
DEBUG: Tables of loaded providers
DEBUG: Providers Listed By Class Name: {com.sun.mail.smtp.SMTPSSLTransport=javax.mail.Provider[TRANSPORT,smtps,com.sun.mail.smtp.SMTPSSLTransport,Sun Microsystems, Inc], com.sun.mail.smtp.SMTPTransport=javax.mail.Provider[TRANSPORT,smtp,com.sun.mail.smtp.SMTPTransport,Sun Microsystems, Inc], com.sun.mail.imap.IMAPSSLStore=javax.mail.Provider[STORE,imaps,com.sun.mail.imap.IMAPSSLStore,Sun Microsystems, Inc], com.sun.mail.pop3.POP3SSLStore=javax.mail.Provider[STORE,pop3s,com.sun.mail.pop3.POP3SSLStore,Sun Microsystems, Inc], com.sun.mail.imap.IMAPStore=javax.mail.Provider[STORE,imap,com.sun.mail.imap.IMAPStore,Sun Microsystems, Inc], com.sun.mail.pop3.POP3Store=javax.mail.Provider[STORE,pop3,com.sun.mail.pop3.POP3Store,Sun Microsystems, Inc]}
DEBUG: Providers Listed By Protocol: {imaps=javax.mail.Provider[STORE,imaps,com.sun.mail.imap.IMAPSSLStore,Sun Microsystems, Inc], imap=javax.mail.Provider[STORE,imap,com.sun.mail.imap.IMAPStore,Sun Microsystems, Inc], smtps=javax.mail.Provider[TRANSPORT,smtps,com.sun.mail.smtp.SMTPSSLTransport,Sun Microsystems, Inc], pop3=javax.mail.Provider[STORE,pop3,com.sun.mail.pop3.POP3Store,Sun Microsystems, Inc], pop3s=javax.mail.Provider[STORE,pop3s,com.sun.mail.pop3.POP3SSLStore,Sun Microsystems, Inc], smtp=javax.mail.Provider[TRANSPORT,smtp,com.sun.mail.smtp.SMTPTransport,Sun Microsystems, Inc]}
DEBUG: successfully loaded resource: /META-INF/javamail.default.address.map
DEBUG: getProvider() returning javax.mail.Provider[STORE,pop3,com.sun.mail.pop3.POP3Store,Sun Microsystems, Inc]
DEBUG POP3: mail.pop3.rsetbeforequit: false
DEBUG POP3: mail.pop3.disabletop: false
DEBUG POP3: mail.pop3.forgettopheaders: false
DEBUG POP3: mail.pop3.cachewriteto: false
DEBUG POP3: mail.pop3.filecache.enable: false
DEBUG POP3: mail.pop3.keepmessagecontent: false
DEBUG POP3: mail.pop3.starttls.enable: false
DEBUG POP3: mail.pop3.starttls.required: false
15:32:25.446 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] connecting to store [pop3://devmail9:*****@100.100.100.100/inbox]
DEBUG POP3: mail.pop3.apop.enable: false
DEBUG POP3: mail.pop3.disablecapa: false
DEBUG POP3: connecting to host "100.100.100.100", port 110, isSSL false
S: +OK POP3 xxxxx.xxxx.xxxx.xxx.xxx.uk v4.39 server ready
C: CAPA
S: -ERR Unknown command in AUTHORIZATION state
C: USER myuser
S: +OK User name accepted, password please
C: PASS myuser
S: +OK Mailbox open, 3 messages
15:32:25.564 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] opening folder [pop3://myuser:*****@100.100.100.100/inbox]
C: STAT
S: +OK 3 5956
15:32:25.569 INFO [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] attempting to receive mail from folder [inbox]
C: NOOP
S: +OK No-op to you too!
15:32:25.591 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] found 1 new messages
C: TOP 1 0
S: +OK Top of message follows
Received: from xxxxx.xxxx.xxxx.xxx.xxx.uk ([100.100.100.100]) by xxxxx.xxxx.xxxx.xxx.xxx.uk (AIX5.2/8.11.6p2/8.11.0) with ESMTP id q57EUrG1544330 for C: LIST 1
....
S: +OK 1 1996
15:32:25.604 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] Recieved 1 messages
15:32:26.097 DEBUG [task-scheduler-1][org.springframework.integration.mail.Pop3MailReceiver] USER flags are not supported by this mail server. Flagging message with system flag
DEBUG POP3: streaming msg 1
C: RETR 1
S: +OK 1996 octets
Received: from xxxxx.xxxx.xxxx.xxx.xxx.uk ([100.100.100.100]) by GLA610.crown.copfs.gsi.gov.uk (AIX5.2/8.11.6p2/8.11.0) with ESMTP id q57EUrG1544330 for <myuser@xxxxx.xxxx.xxxx.xxx.xxx.uk>; Thu, 7 Jun 2012 15:30:53 +0100
....
DEBUG POP3: streaming msg 1
C: RETR 1
S: +OK 1996 octets
Received: from xxxxx.xxxx.xxxx.xxx.xxx.uk ([100.100.100.100]) by xxxxx.xxxx.xxxx.xxx.xxx.uk (AIX5.2/8.11.6p2/8.11.0) with ESMTP id q57EUrG1544330 for <myuser@xxxxx.xxxx.xxxx.xxx.xxx.uk>; Thu, 7 Jun 2012 15:30:53 +0100
....
.
C: NOOP
S: +OK No-op to you too!
C: DELE 1
S: +OK Message deleted
C: QUIT
S: +OK Sayonara
15:32:26.672 DEBUG [task-scheduler-1][org.springframework.integration.mail.MailReceivingMessageSource] received mail message [javax.mail.internet.MimeMessage@ef4504]
15:32:26.696 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning instance with id 10.9.21.7.tm0000100022
15:32:26.697 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning instance with id 10.9.21.7.tm0000100022
15:32:26.697 DEBUG [task-scheduler-1][org.springframework.transaction.jta.JtaTransactionManager] Participating in existing transaction
15:32:26.716 INFO [task-scheduler-1][xx.xxx.xxx.xxx.channel.email.InboundEmailMessageEndpoint] In InboundEmailMessageEndpoint. processing MailMessage (Subject): 7777777777777777777
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][org.springframework.transaction.jta.JtaTransactionManager] Initiating transaction commit
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning instance with id 10.9.21.7.tm0000100022
15:32:26.735 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.BaseTransactionManager] getCompositeTransaction() returning instance with id 10.9.21.7.tm0000100022
15:32:26.736 INFO [task-scheduler-1][com.atomikos.icatch.imp.CompositeTransactionImp] commit() done (by application) of transaction 10.9.21.7.tm0000100022
15:32:26.736 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 entering state: COMMITTING
15:32:26.740 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 entering state: TERMINATED
15:32:26.741 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 : stopping timer...
15:32:26.741 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 : disposing statehandler TERMINATED...
15:32:26.741 DEBUG [task-scheduler-1][com.atomikos.icatch.imp.CoordinatorImp] Coordinator 10.9.21.7.tm0000100022 : disposed.