0

我们正在使用 Spring Batch Chunk 从 JMS 目标读取消息并写入平面文件。在这方面,我们有以下观察,

  1. 如果消息代理在阅读器读取消息并且未达到提交计数时关闭,则到目前为止读取的消息数量都将传递给 Writer,然后批处理进入 FAILED 状态。这是 Chunk 的默认行为吗?

  2. 如果第 1 点的答案是肯定的,我们如何确保这部分块不发送给 Writer。(为了提供更多关于这个问题的背景知识,我们在 JMS 模板中处理了 JMS 会话,因此当块无法读取等于 Commit Count 的完整消息数时,部分块中读取的所有消息都将回滚到JMS 目标,其中相同的部分块被写入文件。当我们重新启动批处理作业时,这会导致文件中的重复)。

任何帮助将不胜感激。

编辑

配置如下图,

块:

<batch:step id="step-1" next="step-2">
    <batch:tasklet allow-start-if-complete="false">
        <batch:chunk reader="jms-reader-1-1" writer="file-writer-1-1" commit-interval="1000">
    </batch:chunk>
</batch:step>

作家(平面文件):

<bean scope="step" class="o.s.b.i.f.FlatFileItemWriter" id="file-writer-1-1">
    <property name="resource" value="file:#{T(com.test.core.BatchConfiguration).BATCH_VFS_LOCAL_TEMP_LOCATION}/#{T(com.test.utils.ThreadContextUtils).getJobInstanceIdAsString()}/AssetMesage"/>
    <property name="lineAggregator">
        <bean class="o.s.b.i.f.t.DelimitedLineAggregator">
            <property name="delimiter" value=","/>
            <property name="fieldExtractor">
                <bean class="o.s.b.i.f.t.BeanWrapperFieldExtractor">
                    <property name="names" value="assetId,assetName,assetDesc"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

阅读器(JMS):

<bean scope="step" class="com.test.runtime.impl.item.readers.JMSItemReader" id="jms-reader-1-1">
    <property name="adapter">
        <bean class="com.test.adapter.impl.JMSAdapter">
            <property name="resource" ref="JMS.vmsmartbatch02_Regression"/>
            <property name="retryerId" value="JMS.vmsmartbatch02_Regression-retryer"/>
        </bean>
    </property>
    <property name="destination" value="#{jobParameters[source1jmsdestination] != null ? jobParameters[source1jmsdestination] : &quot;sourceTopic&quot;}"/><property name="durableSubscriberName" value="sourceTopicDS"/><property name="destinationType" value="Topic"/>
    <property name="ackMode" value="#{T(javax.jms.Session).CLIENT_ACKNOWLEDGE}"/>
    <property name="maxMessageCount" value="2000"/>
</bean>

编辑 2

下面是我正在使用的核心阅读器逻辑,

读者

    public Object read() throws Exception, UnexpectedInputException,
                ParseException, NonTransientResourceException {
            Object item = null;
            try {
                if(ackMode != 0 && ackMode >= 1  && ackMode <= 3){
                    adapter.getResource().setSessionAcknowledgeMode(ackMode);
                }

                if(maxMessageCount > 0){
                    ThreadContextUtils.addToExecutionContext("maxMessageCount", maxMessageCount);
  if(ThreadContextUtils.getExecutionContext().containsKey("readMessageCount")) {
                        readMessageCount = ThreadContextUtils.getExecutionContext().getInt("readMessageCount");
                    }
                }
                if (TOPIC_KEY.equalsIgnoreCase(destinationType)
                        && durableSubscriberName != null) {
                    item = (Object) adapter.invoke(REC_DS_AND_CONVERT_SELECTED,
                            OBJECT_CLASS, destination, durableSubscriberName,
                            receiveTimeout, filter == null ? "" : filter);
                } else {
                    item = (Object) adapter.invoke(REC_AND_CONVERT_SELECTED,
                            OBJECT_CLASS, destination,
                            receiveTimeout <= 0 ? adapter.getResource()
                                    .getReceiveTimeout() : receiveTimeout,
                            (filter == null ? "" : filter));
                }   
                if(maxMessageCount > 0){
                 if( item !=null){
                      readMessageCount++;
                    ThreadContextUtils.addToExecutionContext("readMessageCount", readMessageCount);
                 }
                }
                return item;
            } finally {

            }
        }
4

0 回答 0