1

我们有一个要求,我们需要处理一个文件夹中的多个文件(每个文件有 50K-100K 记录),并在进行一些计算后将数据存储在数据库中。处理完文件内容后,我们需要触发第二个流进行进一步处理。

我们设计解决方案的方式是

File Reader -- File Splitter -- Processor -- Re-sequencer(每条记录的发布策略) -- Sink(在数据库中加载数据)

上述设计的问题来自 re-sequencer 和 Sink,我们无法确定文件的所有内容是否成功存储在数据库中,否则我们无法调用第二个流。

我们已经使用聚合器进行了一些基准测试,它似乎比重新排序器慢得多。聚合器是否存在任何已知的性能问题?如果我们有一个仅在所有数据都使用 re-sequencer 时才发布的发布策略,是否会出现严重的性能下降

谢谢,贾亚德普

处理器 1 - 拆分行

Processor 2 - Calculation on each message
<int:service-activator input-channel="input" ref="singlePointCalculator" method="onMessage"/>
<bean id="singlePointCalculator" class="com.processor.SinglePointCalculator">
                <constructor-arg index="0" ref="output"/>

</bean>

Sink

<int:resequencer input-channel='input' output-channel='reseqChannel' release-strategy-expression="size() == 1" ></int:resequencer>

<int:transformer ref="mapTransformer" input-channel="reseqChannel" output-channel="mapChannel"/>

<bean id="mapTransformer" class="com.sink.MapTransformer">
<property name="columns" value=" side,value" />
</bean>

<int-jdbc:outbound-channel-adapter
                data-source="dataSource"
                channel="mapChannel"
                query=" ">
</int-jdbc:outbound-channel-adapter>


Sink Using Aggregator

<int:aggregator input-channel='input' output-channel='aggChannel' " ></int:aggregator >
<int:transformer ref="mapTransformer" input-channel="aggChannel" output-channel="mapChannel"/>

<bean id="mapTransformer" class="com.sink.MapTransformer">
<property name="columns" value=" side,value" />
</bean>

<int-jdbc:outbound-channel-adapter
                data-source="dataSource"
                channel="mapChannel"
                query=" ">
</int-jdbc:outbound-channel-adapter>
4

0 回答 0