1

我正在尝试使用分区在多线程上运行 JAVAEE7 批处理。
我的批处理很简单:读取一堆随机数,使用 3 个线程写出它们的总和。

我的工作 XML

<job id="partition" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
    version="1.0">
    <step id="process" next="cleanup">
        <chunk item-count="3">
            <reader ref="partitionProcessIR">
                <properties>
                    <property name="start" value="#{partitionPlan['start']}" />
                    <property name="end" value="#{partitionPlan['end']}" />
                </properties>
            </reader>
            <processor ref="partitionProcessIP" />
            <writer ref="partitionProcessIW" />
        </chunk>
        <partition>
            <mapper ref="partitionMapperImpl" />
        </partition>
    </step>
    <step id="cleanup">
        <batchlet ref="partitionCleanupBatchlet"></batchlet>
    </step>
</job>

我的 PartitionMapperImpl:

@Override
 public PartitionPlan mapPartitions() throws Exception {
     // TODO Auto-generated method stub
     return new PartitionPlanImpl() {

         @Override
         public int getPartitions() {
             return 3;
         }

         @Override
         public int getThreads() {
             return 3;
         }

         @Override
         public Properties[] getPartitionProperties() {
             int totalRecords = getTotalRecords();
             int partItems = totalRecords / getPartitions();
             int remainItems = totalRecords % getPartitions();
             Properties[] props = new Properties[getPartitions()];

             for (int i = 0; i < getPartitions(); i++) {
                 props[i] = new Properties();
                 props[i].setProperty("start", String.valueOf(i * partItems));
                 // if this is the last partition, add remaining items
                 if (i == getPartitions() - 1) {
                     props[i].setProperty("end", String.valueOf((i + 1) * partItems + remainItems));
                 } else {
                     props[i].setProperty("end", String.valueOf((i + 1) * partItems));
                 }
             }
             return props;
         }
     };
 }

 private int getTotalRecords() {
     return 50;
 }

我的读者:

@Override
public void open(Serializable checkpoint) throws Exception {
    int start = new Integer(startProperty);
    int end = new Integer(endProperty);
    List<Integer> listNumber = new ArrayList<>();
    for (int i = start; i < end; i++) {
        int rand = (int) (Math.random() * 10);
        listNumber.add(rand);
    }
    iterator = listNumber.iterator();
}

@Override
public Integer readItem() throws Exception {
    if (iterator.hasNext()) {
        return iterator.next();
    }
    // end read
    return null;
}

我的处理器

@Override
    public Integer processItem(Object arg0) throws Exception {
        Integer rand = (Integer) arg0;
        return rand;
    }

我的作家

@Override
    public void writeItems(List<Object> arg0) throws Exception {
        int sum = 0;
        for (Object object : arg0) {
            Integer rand = (Integer) object;
            sum += rand;
        }
        System.out.println(Thread.currentThread().getId() + " | SUM OF CHUNK: " + sum);
    }

当我运行此批处理时,出现以下错误。我猜这与在 derby 数据库中同时存储多个检查点有关。

2017-03-02T15:22:45.955+0700| 情报:275 | 总和:13 2017-03-02T15:22:45.958+0700| 情报:316 | 块的总和:17 2017-03-02T15:23:05.971+0700| 严重:读取-处理-写入循环失败 com.ibm.jbatch.container.exception.BatchContainerServiceException:无法在 com 中保留 [process] 的检查点数据.ibm.jbatch.container.persistence.CheckpointManager.checkpoint(CheckpointManager.java:133) 在 com.ibm.jbatch.container.impl.ChunkStepControllerImpl.invokeChunk(ChunkStepControllerImpl.java:644) 在 com.ibm.jbatch.container.impl .ChunkStepControllerImpl.invokeCoreStep(ChunkStepControllerImpl.java:764) 在 com.ibm.jbatch.container.impl.BaseStepControllerImpl.execute(BaseStepControllerImpl.java:144) 在 com.ibm.jbatch.container.impl.ExecutionTransitioner.doExecutionLoop(ExecutionTransitioner.java :112) 在 com.ibm.jbatch.container。SQLTransactionRollbackException: ????????????????????????????????????: Lock : ROW, CHECKPOINTDATA, (110,27)等待 XID : {77885156, S} , APP, select id, obj from CHECKPOINTDATA where id = ? 授予 XID:{77885155,X} 锁定:ROW,CHECKPOINTDATA,(110,28)等待 XID:{77885155,S},APP,从 CHECKPOINTDATA 中选择 id,obj,其中 id = ?授予 XID : {77885156, X} ????????XID: 77885156? 在 fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData(JBatchJDBCPersistenceManager.java:503) 在 fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.updateCheckpointData(JBatchJDBCPersistenceManager.java:388) 在 fish.payara.jbatch.persistence .rdbms.LazyBootPersistenceManager.updateCheckpointData(LazyBootPersistenceManager.java:230) 在 com.ibm.jbatch.container.persistence.CheckpointManager。checkpoint(CheckpointManager.java:128) ... 13 更多原因:java.sql.SQLTransactionRollbackException: ??????????????????????????????? ????????: Lock : ROW, CHECKPOINTDATA, (110,27) Waiting XID : {77885156, S} , APP, select id, obj from CHECKPOINTDATA where id = ? 授予 XID:{77885155,X} 锁定:ROW,CHECKPOINTDATA,(110,28)等待 XID:{77885155,S},APP,从 CHECKPOINTDATA 中选择 id,obj,其中 id = ?授予 XID : {77885156, X} ????????XID: 77885156? 在 org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source) 在 org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source) 在 org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException (未知来源)在 org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(未知来源)在 org.apache.derby.impl.jdbc。EmbedConnection.handleException(Unknown Source) at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source) at org.apache.derby.impl.jdbc.EmbedResultSet.closeOnTransactionError(Unknown Source) at org.apache.derby。 impl.jdbc.EmbedResultSet.movePosition(Unknown Source) at org.apache.derby.impl.jdbc.EmbedResultSet.next(Unknown Source) at com.sun.gjc.spi.base.ResultSetWrapper.next(ResultSetWrapper.java:103)在fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData(JBatchJDBCPersistenceManager.java:498) ... 16 更多原因:java.sql.SQLException: ??????????????? ?????????????????????: Lock : ROW, CHECKPOINTDATA, (110,27) Waiting XID : {77885156, S} , APP, select id, obj from CHECKPOINTDATA哪里 id = ?授予 XID : {77885155, X} 锁: ROW, CHECKPOINTDATA, (110, 28) 等待 XID : {77885155, S} , APP, select id, obj from CHECKPOINTDATA where id = ? 授予 XID : {77885156, X} ????????XID: 77885156? 在 org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source) at org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown Source) ... 27 更多原因:错误 40001:??? ?????????????????????????????????:锁:行,检查点数据,(110,27)等待XID:{77885156 , S} , APP, 选择 id, obj from CHECKPOINTDATA where id = ? 授予 XID:{77885155,X} 锁定:ROW,CHECKPOINTDATA,(110,28)等待 XID:{77885155,S},APP,从 CHECKPOINTDATA 中选择 id,obj,其中 id = ?授予 XID : {77885156, X} ????????XID: 77885156? 在 org.apache.derby.impl.services.locks 的 org.apache.derby.iapi.error.StandardException.newException(Unknown Source)。

你有任何想法如何解决这个问题吗?
或者任何可以在超过 2 个线程上运行的示例都非常有帮助。
提前致谢。

4

2 回答 2

0

看起来像 Payara 问题,来自堆栈跟踪中的这一行:

fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData(JBatchJDBCPersistenceManager.java:503)

您可以尝试使用正确的 GlassFish 运行您的应用程序,看看是否有同样的问题。

或者,您可以将应用程序部署到包含 JBeret 作为批处理容器的 WildFly。如果您的应用程序是按照 JSR 352 规范编写的,它应该在任何 Java EE 7 兼容的应用程序服务器中部署和运行。您可以将 WildFly 配置为将 jdbc 作业存储库与 Derby 或任何其他受支持的 DBMS(包括捆绑的 H2 数据库)一起使用。

如果您仍然卡住,我建议跟进 Payara 项目。

于 2017-03-07T03:57:54.450 回答
0

在我看来,您可能遇到了并发问题,例如死锁或锁定超时。(这有点难以分辨,因为您的异常信息在问题中有点乱码,而且我认为,因为 Derby 消息是以母语字符串和英语字符串的混合形式打印的)。

您可以在此处找到一些用于诊断和理解为什么您的并发数据库访问会遇到这些问题的策略:https ://wiki.apache.org/db-derby/LockDebugging

于 2017-03-02T15:00:39.080 回答