0

让我解释一下我的应用程序是如何设置的。首先,我有一个独立的命令行启动应用程序,该应用程序运行一个主程序,而主程序又在传递适当参数的作业操作员上调用 start。我知道 start 是一个异步调用,一旦我调用 start ,除非我在我的 main 中阻塞了一些它是如何死的。

我遇到的问题是,当我运行分区作业时,它似乎使一些线程处于活动状态,从而阻止了整个处理结束。当我运行非分区作业时,一旦作业完成,进程就会正常结束。

这是正常和/或预期的行为吗?有没有办法告诉分区线程死亡。似乎分区线程在作业完成后被阻塞等待某些东西,它们不应该是吗?

我知道我可以监控主要的批处理状态并可能结束它,但正如我在另一个问题中所说的那样,这给数据库增加了大量的喋喋不休,并不理想。

我的工作规范的一个例子

<job id="partitionTest" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<step id="onlyStep">
    <partition>
        <plan partitions="2">
            <properties partition="0">
                <property name="partitionNumber" value="1"></property>
            </properties>
            <properties partition="1">
                <property name="partitionNumber" value="2"></property>
            </properties>
        </plan>
    </partition>

    <chunk item-count="2">
        <reader id="reader" ref="DelimitedFlatFileReader">
            <properties>
                <!-- Reads in from file Test.csv -->
                <property name="fileNameAndPath" value="#{jobParameters['inputPath']}/CSVInput#{partitionPlan['partitionNumber']}.csv" />
                <property name="fieldNames" value="firstName, lastName, city" />
                <property name="fullyQualifiedTargetClass" value="com.test.transactionaltest.Member" />
            </properties>
        </reader>
        <processor ref="com.test.partitiontest.Processor" />
        <writer ref="FlatFileWriter" >
            <properties>
                <property name="appendOn" value="true"/>
                <property name="fileNameAndPath" value="#{jobParameters['outputPath']}/PartitionOutput.txt" />
                <property name="fullyQualifiedTargetClass" value="com.test.transactionaltest.Member" />
            </properties>
        </writer>
    </chunk>
</step>
</job>

编辑:

好的,阅读有关此问题的更多信息并查看 spring 批处理代码,至少在我看来,JsrPartitionHandler 中似乎存在一个错误。具体来说,handle 方法会在本地创建一个 ThreadPoolTask​​Executor,但是该线程池永远不会被正确清理。应该在该方法返回之前调用关闭/销毁以执行一些清理,否则线程会留在内存中并超出范围。

如果我在这里错了,请纠正我,但这绝对是问题所在。

我要去尝试改变它,看看它是如何发挥作用的。我做了一些测试后会更新。

4

1 回答 1

0

我已经确认这个问题是 spring 批处理核心库中的一个错误(在我看来仍然是 atm)。

我在 spring batch jira 站点上创建了一张票。票证上有一个简单的附加 Java 项目,可以确认我看到的问题。如果其他任何人遇到问题,他们应该参考该票。

我找到了一个临时解决方法,它只使用等待/通知方案,并且似乎曾经添加过池线程关闭。我将添加每个类/代码并尝试解释我做了什么。

在主线程/类中,这是存在于 main 方法或从 main 调用的方法中的代码

        while(!ThreadNotifier.instance(this).getNotify()){
        try {
            synchronized(this){
                System.out.println("WAIT THREAD IS =======" + Thread.currentThread().getName());
                wait();
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

这是 ThreadNotifier 类

public class ThreadNotifier {


private static ThreadNotifier tn = null;

private boolean notification = false;

private Object o;

private ThreadNotifier(Object o){
    this.o = o;

}

public static ThreadNotifier instance(Object o){

    if(tn == null){
        tn = new ThreadNotifier(o);
    }

    return tn;
}


public void setNotify(boolean value){
    notification = true;
    synchronized(o){
        System.out.println("NOTIFY THREAD IS =======" + Thread.currentThread().getName());
        o.notify();
    }
}

public boolean getNotify(){
    return notification;
}

}

最后,这是我用来提供通知的作业监听器

public class PartitionWorkAround implements JobListener {

@Override
public void beforeJob() throws Exception {
    // TODO Auto-generated method stub

}

@Override
public void afterJob() throws Exception {

    ThreadNotifier.instance(null).setNotify(true);

}

}

在问题解决之前,这是我能想到的最好的方法。作为参考,我使用在这里学到的关于受保护块的知识来找出一种方法来做到这一点。

于 2015-09-18T19:31:37.553 回答