我有一个“XAManager”类,用于 WildFly 13 中的 JCA 组件。
在standalone-full.xml 中,有问题的资源适配器定义如下:
...
<resource-adapter id="myTest.ear#LuceneConnector-ra-impl-0.1.0.rar">
<archive>
myTest.ear#LuceneConnector-ra-impl-0.1.0.rar
</archive>
<transaction-support>XATransaction</transaction-support>
<connection-definitions>
<connection-definition class-name="com.impl.LuceneManagedConnectionFactory" jndi-name="java:jboss/eis/LuceneConnector" enabled="true" use-java-context="true" pool-name="LuceneConnection" use-ccm="true">
<xa-pool>
<min-pool-size>0</min-pool-size>
<max-pool-size>10</max-pool-size>
<prefill>false</prefill>
<use-strict-min>false</use-strict-min>
<flush-strategy>FailingConnectionOnly</flush-strategy>
<pad-xid>false</pad-xid>
<wrap-xa-resource>true</wrap-xa-resource>
</xa-pool>
<security>
<application/>
</security>
</connection-definition>
</connection-definitions>
</resource-adapter>
……
此类在 JCA 的 ManagedConnection 类中使用,在 getXAResource() 方法中返回:
公共 XAResource getXAResource() 抛出 ResourceException {
if (this.xaResource == null) {
this.xaResource = new LuceneXAManager(this);
}
if (LuceneManagedConnection.LOGGER.isDebugEnabled()) {
LuceneManagedConnection.LOGGER.debug("XAResource=" + this.xaResource);
}
return this.xaResource;
}
类定义如下:
公共类 LuceneXAManager 实现 XAResource {
static final String _CVS = "$Revision: 1.12 $";
private static final Logger LOGGER = Logger.getLogger(IndexWriter.class);
private LuceneManagedConnection lmc = null;
// private final CACClientLocal cacClientLocal = null;
// private final CACCommunityLocal cacCommunityLocal = null;
public LuceneXAManager(final LuceneManagedConnection lmc) {
this.lmc = lmc;
}
/**
* {@inheritDoc}
*/
public void commit(final Xid xid, final boolean arg1)
throws XAException {
if (LuceneXAManager.LOGGER.isDebugEnabled()) {
LuceneXAManager.LOGGER.debug("commit transaction : " + xid + " int " + arg1);
}
List<LuceneDocumentTransferWrapper> tasks = this.lmc.getLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());
if (tasks != null) {
TransactionManager tm = null;
Transaction tx = null;
try {
InitialContext ctx = new InitialContext();
tm = (TransactionManager)ctx.lookup("java:jboss/TransactionManager");
tx = tm.suspend();
UserTransaction ut = (UserTransaction)ctx.lookup("java:jboss/UserTransaction");
ut.begin();
for (LuceneDocumentTransferWrapper wrapper : tasks) {
String operation = wrapper.getCommand();
Object dataObject = wrapper.getDataObject();
IndexWriter writer = null;
if (wrapper.getCommunityId() != null) {
if (LuceneManagedConnection.DO_RECREATE_INDEX.equals(operation)) {
IndexWriterManager.createNewVNIndexFor(this.loadCommunity(wrapper.getCommunityId()));
continue;
} else {
writer = IndexWriterManager.getVNWriterFor(wrapper.getCommunityId());
if (writer == null) {
writer = IndexWriterManager.getVNWriterFor(this.loadCommunity(wrapper.getCommunityId()));
}
}
if (writer == null) {
throw new IllegalArgumentException("Failed to get index writer for community " + wrapper.getCommunityId());
}
} else if (wrapper.getClientId() != null) {
if (LuceneManagedConnection.DO_RECREATE_INDEX.equals(operation)) {
IndexWriterManager.createNewIndexFor(this.loadClient(wrapper.getClientId()));
continue;
} else {
writer = IndexWriterManager.getWriterFor(wrapper.getClientId());
if (writer == null) {
writer = IndexWriterManager.getWriterFor(this.loadClient(wrapper.getClientId()));
}
}
if (writer == null) {
throw new IllegalArgumentException("Failed to get index writer for client " + wrapper.getClientId());
}
} else {
throw new IllegalArgumentException("Have no clientId or communityId");
}
if (LuceneManagedConnection.DO_DELETE_DOCUMENTS.equals(operation)) {
List<Long> documentIds = (List<Long>)dataObject;
writer.deleteDocuments(documentIds);
} else if (LuceneManagedConnection.DO_INDEX_DOCUMENT.equals(operation)) {
Document document = (Document)dataObject;
writer.addDocument(document);
} else if (LuceneManagedConnection.DO_UPDATE_INDEX.equals(operation)) {
Document document = (Document)dataObject;
if (Document.DOCUMENT_STATUS_ACTIVE.equals(document.getStatus())
|| Document.DOCUMENT_STATUS_WAITING.equals(document.getStatus())
|| Document.DOCUMENT_STATUS_OUTDATED.equals(document.getStatus())) {
writer.updateDocument(document, null);
} else {
writer.deleteDocument(document);
}
} else if (LuceneManagedConnection.DO_INDEX_CONTAINER.equals(operation)) {
Container container = (Container)dataObject;
writer.deleteContainer(container.getRfid());
writer.addContainer(container);
} else {
throw new IllegalArgumentException(" Wrong command " + operation);
}
}
ut.commit();
} catch (Exception e) {
LuceneXAManager.LOGGER.error("Failed to handle commit for transaction " + xid, e);
HashSet<Long> failedDocuments = new HashSet<Long>();
for (LuceneDocumentTransferWrapper wrapper : tasks) {
failedDocuments.clear();
if (LuceneManagedConnection.DO_DELETE_DOCUMENTS.equals(wrapper.getCommand())) {
failedDocuments.addAll((List)wrapper.getDataObject());
} else if (LuceneManagedConnection.DO_INDEX_DOCUMENT.equals(wrapper.getCommand())) {
failedDocuments.add(((Document)wrapper.getDataObject()).getId());
} else if (LuceneManagedConnection.DO_UPDATE_INDEX.equals(wrapper.getCommand())) {
failedDocuments.add(((Document)wrapper.getDataObject()).getId());
}
for (Long id : failedDocuments) {
LuceneXAManager.LOGGER.error("Possible Lucene index corruption - failed to '" + wrapper.getCommand() + "' document (id=" + id
+ ")");
}
}
throw new XAException(XAException.XAER_RMFAIL);
} finally {
try {
if (tm != null && tx != null) {
tm.resume(tx);
}
} catch (Exception e) {
LuceneXAManager.LOGGER.error("Error while close resume session", e);
}
this.lmc.removeLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());
}
}
}
/**
* {@inheritDoc}
*/
public void end(final Xid xid, final int arg1)
throws XAException {
if (LuceneXAManager.LOGGER.isDebugEnabled()) {
LuceneXAManager.LOGGER.debug("end transaction : " + xid);
}
}
/**
* {@inheritDoc}
*/
public void forget(final Xid xid)
throws XAException {
if (LuceneXAManager.LOGGER.isDebugEnabled()) {
LuceneXAManager.LOGGER.debug("forget transaction : " + xid);
}
this.lmc.removeLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());
}
/**
* {@inheritDoc}
*/
public int getTransactionTimeout()
throws XAException {
return 0;
}
/**
* {@inheritDoc}
*/
public boolean isSameRM(final XAResource arg0)
throws XAException {
return false;
}
/**
* {@inheritDoc}
*/
public int prepare(final Xid xid)
throws XAException {
if (LuceneXAManager.LOGGER.isDebugEnabled()) {
LuceneXAManager.LOGGER.debug("prepare transaction : " + xid);
}
List<LuceneDocumentTransferWrapper> tasks = this.lmc.getLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());
List<Long> clientWriter = new ArrayList<Long>();
List<Long> communityWriter = new ArrayList<Long>();
if (tasks != null) {
for (LuceneDocumentTransferWrapper wrapper : tasks) {
if (wrapper.getCommunityId() != null) {
if (!communityWriter.contains(wrapper.getCommunityId())) {
communityWriter.add(wrapper.getCommunityId());
}
} else {
if (!clientWriter.contains(wrapper.getClientId())) {
clientWriter.add(wrapper.getClientId());
}
}
}
}
TransactionManager tm = null;
try {
InitialContext ctx = new InitialContext();
tm = (TransactionManager)ctx.lookup(CACBean.JDNI_TX_MANAGER);
tm.suspend();
UserTransaction ut = (UserTransaction)ctx.lookup(CACBean.JDNI_USERTRANACTION);
ut.begin();
for (Long clientId : clientWriter) {
IndexWriter writer = IndexWriterManager.getWriterFor(clientId);
if (writer == null) {
if (IndexWriterManager.getWriterFor(this.loadClient(clientId)) == null) {
throw new IllegalArgumentException("Failed to get index writer for client " + clientId);
}
}
}
for (Long communityId : communityWriter) {
IndexWriter writer = IndexWriterManager.getVNWriterFor(communityId);
if (writer == null) {
if (IndexWriterManager.getVNWriterFor(this.loadCommunity(communityId)) == null) {
throw new IllegalArgumentException("Failed to get index writer for community " + communityId);
}
}
}
ut.commit();
} catch (Exception e) {
LuceneXAManager.LOGGER.error("Failed to handle prepare for transaction " + xid, e);
throw new XAException(XAException.XAER_RMFAIL);
} finally {
// try {
// if (tm != null && tx != null) {
// tm.resume(tx);
// }
// } catch (Exception e) {
// LuceneXAManager.LOGGER.warn("Error while close resume session", e);
// }
}
return XAResource.XA_OK;
}
/**
* {@inheritDoc}
*/
public Xid[] recover(final int arg0)
throws XAException {
return null;
}
/**
* {@inheritDoc}
*/
public void rollback(final Xid xid)
throws XAException {
if (LuceneXAManager.LOGGER.isDebugEnabled()) {
LuceneXAManager.LOGGER.debug("rollback transaction : " + xid);
}
this.lmc.removeLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());
}
/**
* {@inheritDoc}
*/
public boolean setTransactionTimeout(final int arg0)
throws XAException {
return false;
}
/**
* {@inheritDoc}
*/
public void start(final Xid arg0, final int arg1)
throws XAException {
if (LuceneXAManager.LOGGER.isDebugEnabled()) {
LuceneXAManager.LOGGER.debug("start transaction : " + arg0 + " int " + arg1);
}
}
private Client loadClient(final Long clientId)
throws RemoteException, CreateException, NamingException, CACException {
....
return client;
}
private Community loadCommunity(final Long communityId)
throws RemoteException, CreateException, NamingException, CACException {
....
return community;
}
}
我遇到的问题是,当尝试通过 JNDI 获取 UserTransaction 时,我得到了Caused by: javax.naming.NameNotFoundException: UserTransaction [Root exception is java.lang.IllegalStateException: WFLYEJB0137: Only session and message-driven beans with bean-managed transaction demarcation are allowed to access UserTransaction]
我尝试用注释我的班级@Stateless
并@TransactionManagement(TransactionManagementType.BEAN)
解决这个问题,但没有用。我也尝试通过 @Inject UserTransaction 注入 UserTransaction 但在这种情况下 UserTransaction 为空。
有任何想法吗 ?