1

我有一个“XAManager”类,用于 WildFly 13 中的 JCA 组件。

在standalone-full.xml 中,有问题的资源适配器定义如下:

...

<resource-adapter id="myTest.ear#LuceneConnector-ra-impl-0.1.0.rar">
    <archive>
        myTest.ear#LuceneConnector-ra-impl-0.1.0.rar
    </archive>
    <transaction-support>XATransaction</transaction-support>
    <connection-definitions>
        <connection-definition class-name="com.impl.LuceneManagedConnectionFactory" jndi-name="java:jboss/eis/LuceneConnector" enabled="true" use-java-context="true" pool-name="LuceneConnection" use-ccm="true">
            <xa-pool>
                <min-pool-size>0</min-pool-size>
                <max-pool-size>10</max-pool-size>
                <prefill>false</prefill>
                <use-strict-min>false</use-strict-min>
                <flush-strategy>FailingConnectionOnly</flush-strategy>
                <pad-xid>false</pad-xid>
                <wrap-xa-resource>true</wrap-xa-resource>
            </xa-pool>
            <security>
                <application/>
            </security>
        </connection-definition>
    </connection-definitions>
</resource-adapter>

……

此类在 JCA 的 ManagedConnection 类中使用,在 getXAResource() 方法中返回:

公共 XAResource getXAResource() 抛出 ResourceException {

if (this.xaResource == null) {
    this.xaResource = new LuceneXAManager(this);
}

if (LuceneManagedConnection.LOGGER.isDebugEnabled()) {
    LuceneManagedConnection.LOGGER.debug("XAResource=" + this.xaResource);
}

return this.xaResource;

}

类定义如下:

公共类 LuceneXAManager 实现 XAResource {

static final String _CVS = "$Revision: 1.12 $";

private static final Logger LOGGER = Logger.getLogger(IndexWriter.class);

private LuceneManagedConnection lmc = null;


// private final CACClientLocal cacClientLocal = null;
// private final CACCommunityLocal cacCommunityLocal = null;

public LuceneXAManager(final LuceneManagedConnection lmc) {
    this.lmc = lmc;
}


/**
 * {@inheritDoc}
 */
public void commit(final Xid xid, final boolean arg1)
    throws XAException {

    if (LuceneXAManager.LOGGER.isDebugEnabled()) {
        LuceneXAManager.LOGGER.debug("commit transaction : " + xid + "  int " + arg1);
    }

    List<LuceneDocumentTransferWrapper> tasks = this.lmc.getLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());

    if (tasks != null) {
        TransactionManager tm = null;
        Transaction tx = null;
        try {
            InitialContext ctx = new InitialContext();
            tm = (TransactionManager)ctx.lookup("java:jboss/TransactionManager");
            tx = tm.suspend();
            UserTransaction ut = (UserTransaction)ctx.lookup("java:jboss/UserTransaction");
            ut.begin();

            for (LuceneDocumentTransferWrapper wrapper : tasks) {
                String operation = wrapper.getCommand();
                Object dataObject = wrapper.getDataObject();
                IndexWriter writer = null;

                if (wrapper.getCommunityId() != null) {

                    if (LuceneManagedConnection.DO_RECREATE_INDEX.equals(operation)) {
                        IndexWriterManager.createNewVNIndexFor(this.loadCommunity(wrapper.getCommunityId()));
                        continue;
                    } else {
                        writer = IndexWriterManager.getVNWriterFor(wrapper.getCommunityId());
                        if (writer == null) {
                            writer = IndexWriterManager.getVNWriterFor(this.loadCommunity(wrapper.getCommunityId()));

                        }
                    }

                    if (writer == null) {
                        throw new IllegalArgumentException("Failed to get index writer for community " + wrapper.getCommunityId());
                    }

                } else if (wrapper.getClientId() != null) {

                    if (LuceneManagedConnection.DO_RECREATE_INDEX.equals(operation)) {
                        IndexWriterManager.createNewIndexFor(this.loadClient(wrapper.getClientId()));
                        continue;
                    } else {
                        writer = IndexWriterManager.getWriterFor(wrapper.getClientId());
                        if (writer == null) {
                            writer = IndexWriterManager.getWriterFor(this.loadClient(wrapper.getClientId()));
                        }
                    }

                    if (writer == null) {
                        throw new IllegalArgumentException("Failed to get index writer for client " + wrapper.getClientId());
                    }

                } else {
                    throw new IllegalArgumentException("Have no clientId or communityId");
                }

                if (LuceneManagedConnection.DO_DELETE_DOCUMENTS.equals(operation)) {

                    List<Long> documentIds = (List<Long>)dataObject;
                    writer.deleteDocuments(documentIds);
                } else if (LuceneManagedConnection.DO_INDEX_DOCUMENT.equals(operation)) {
                    Document document = (Document)dataObject;
                    writer.addDocument(document);

                } else if (LuceneManagedConnection.DO_UPDATE_INDEX.equals(operation)) {
                    Document document = (Document)dataObject;

                    if (Document.DOCUMENT_STATUS_ACTIVE.equals(document.getStatus())
                        || Document.DOCUMENT_STATUS_WAITING.equals(document.getStatus())
                        || Document.DOCUMENT_STATUS_OUTDATED.equals(document.getStatus())) {
                        writer.updateDocument(document, null);
                    } else {
                        writer.deleteDocument(document);
                    }

                } else if (LuceneManagedConnection.DO_INDEX_CONTAINER.equals(operation)) {
                    Container container = (Container)dataObject;

                    writer.deleteContainer(container.getRfid());
                    writer.addContainer(container);

                } else {
                    throw new IllegalArgumentException(" Wrong command " + operation);
                }

            }

            ut.commit();

        } catch (Exception e) {
            LuceneXAManager.LOGGER.error("Failed to handle commit for transaction " + xid, e);

            HashSet<Long> failedDocuments = new HashSet<Long>();

            for (LuceneDocumentTransferWrapper wrapper : tasks) {
                failedDocuments.clear();

                if (LuceneManagedConnection.DO_DELETE_DOCUMENTS.equals(wrapper.getCommand())) {
                    failedDocuments.addAll((List)wrapper.getDataObject());
                } else if (LuceneManagedConnection.DO_INDEX_DOCUMENT.equals(wrapper.getCommand())) {
                    failedDocuments.add(((Document)wrapper.getDataObject()).getId());
                } else if (LuceneManagedConnection.DO_UPDATE_INDEX.equals(wrapper.getCommand())) {
                    failedDocuments.add(((Document)wrapper.getDataObject()).getId());
                }

                for (Long id : failedDocuments) {
                    LuceneXAManager.LOGGER.error("Possible Lucene index corruption - failed to '" + wrapper.getCommand() + "' document (id=" + id
                        + ")");
                }
            }

            throw new XAException(XAException.XAER_RMFAIL);

        } finally {
            try {
                if (tm != null && tx != null) {
                    tm.resume(tx);
                }
            } catch (Exception e) {
                LuceneXAManager.LOGGER.error("Error while close resume session", e);
            }

            this.lmc.removeLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());
        }
    }
}


/**
 * {@inheritDoc}
 */
public void end(final Xid xid, final int arg1)
    throws XAException {
    if (LuceneXAManager.LOGGER.isDebugEnabled()) {
        LuceneXAManager.LOGGER.debug("end transaction : " + xid);
    }
}


/**
 * {@inheritDoc}
 */
public void forget(final Xid xid)
    throws XAException {
    if (LuceneXAManager.LOGGER.isDebugEnabled()) {
        LuceneXAManager.LOGGER.debug("forget transaction : " + xid);
    }

    this.lmc.removeLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());
}


/**
 * {@inheritDoc}
 */
public int getTransactionTimeout()
    throws XAException {
    return 0;
}


/**
 * {@inheritDoc}
 */
public boolean isSameRM(final XAResource arg0)
    throws XAException {
    return false;
}


/**
 * {@inheritDoc}
 */
public int prepare(final Xid xid)
    throws XAException {

    if (LuceneXAManager.LOGGER.isDebugEnabled()) {
        LuceneXAManager.LOGGER.debug("prepare transaction : " + xid);
    }

    List<LuceneDocumentTransferWrapper> tasks = this.lmc.getLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());

    List<Long> clientWriter = new ArrayList<Long>();
    List<Long> communityWriter = new ArrayList<Long>();

    if (tasks != null) {
        for (LuceneDocumentTransferWrapper wrapper : tasks) {
            if (wrapper.getCommunityId() != null) {
                if (!communityWriter.contains(wrapper.getCommunityId())) {
                    communityWriter.add(wrapper.getCommunityId());
                }
            } else {
                if (!clientWriter.contains(wrapper.getClientId())) {
                    clientWriter.add(wrapper.getClientId());
                }
            }
        }
    }

    TransactionManager tm = null;
    try {

        InitialContext ctx = new InitialContext();
        tm = (TransactionManager)ctx.lookup(CACBean.JDNI_TX_MANAGER);
        tm.suspend();
        UserTransaction ut = (UserTransaction)ctx.lookup(CACBean.JDNI_USERTRANACTION);
        ut.begin();

        for (Long clientId : clientWriter) {
            IndexWriter writer = IndexWriterManager.getWriterFor(clientId);
            if (writer == null) {
                if (IndexWriterManager.getWriterFor(this.loadClient(clientId)) == null) {
                    throw new IllegalArgumentException("Failed to get index writer for client " + clientId);
                }
            }
        }

        for (Long communityId : communityWriter) {
            IndexWriter writer = IndexWriterManager.getVNWriterFor(communityId);
            if (writer == null) {
                if (IndexWriterManager.getVNWriterFor(this.loadCommunity(communityId)) == null) {
                    throw new IllegalArgumentException("Failed to get index writer for community " + communityId);
                }
            }
        }

        ut.commit();
    } catch (Exception e) {
        LuceneXAManager.LOGGER.error("Failed to handle prepare for transaction " + xid, e);
        throw new XAException(XAException.XAER_RMFAIL);
    } finally {
        // try {
        // if (tm != null && tx != null) {
        // tm.resume(tx);
        // }
        // } catch (Exception e) {
        // LuceneXAManager.LOGGER.warn("Error while close resume session", e);
        // }
    }

    return XAResource.XA_OK;
}


/**
 * {@inheritDoc}
 */
public Xid[] recover(final int arg0)
    throws XAException {
    return null;
}


/**
 * {@inheritDoc}
 */
public void rollback(final Xid xid)
    throws XAException {

    if (LuceneXAManager.LOGGER.isDebugEnabled()) {
        LuceneXAManager.LOGGER.debug("rollback transaction : " + xid);
    }

    this.lmc.removeLuceneDocumentTransferWrapper(xid.getGlobalTransactionId());
}


/**
 * {@inheritDoc}
 */
public boolean setTransactionTimeout(final int arg0)
    throws XAException {
    return false;
}


/**
 * {@inheritDoc}
 */
public void start(final Xid arg0, final int arg1)
    throws XAException {
    if (LuceneXAManager.LOGGER.isDebugEnabled()) {
        LuceneXAManager.LOGGER.debug("start transaction : " + arg0 + "  int " + arg1);
    }
}


private Client loadClient(final Long clientId)
    throws RemoteException, CreateException, NamingException, CACException {

    ....

    return client;
}


private Community loadCommunity(final Long communityId)
    throws RemoteException, CreateException, NamingException, CACException {

    ....
    return community;
}

}

我遇到的问题是,当尝试通过 JNDI 获取 UserTransaction 时,我得到了Caused by: javax.naming.NameNotFoundException: UserTransaction [Root exception is java.lang.IllegalStateException: WFLYEJB0137: Only session and message-driven beans with bean-managed transaction demarcation are allowed to access UserTransaction]

我尝试用注释我的班级@Stateless@TransactionManagement(TransactionManagementType.BEAN)解决这个问题,但没有用。我也尝试通过 @Inject UserTransaction 注入 UserTransaction 但在这种情况下 UserTransaction 为空。

有任何想法吗 ?

4

0 回答 0