我们需要将事件发送到 kinesis,并且由于 aws 定价,我们计划将记录分批放入 kinesis。
我们读入一个 csv 文件,然后使用文件拆分器吐出行并将每一行转换为 json。
那么在转换为 json 之后,我们如何将这些行批处理为每批 25 行,以便我们的 kinesis serviceActivator 可以发送批处理?
任何示例将不胜感激。
<int-file:splitter id="fileLineSplitter"
input-channel="fileInputChannel"
output-channel="splitterOutputChannel"
markers="true" />
<int:transformer id="csvToDataCdrTransformer"
ref="dataCdrLineTransformer"
method="transform"
input-channel="lineOutputChannel"
output-channel="dataCdrObjectInputChannel">
</int:transformer>
<int:object-to-json-transformer input-channel="dataCdrObjectInputChannel"
output-channel="kinesisSendChannel">
<int:poller fixed-delay="50"/>
</int:object-to-json-transformer>
编辑:我按照“Artem Bilan”的建议添加了它并且有效
<int:aggregator input-channel="aggregateChannel"
output-channel="toJsonChannel"
release-strategy-expression="#this.size() eq 2"
expire-groups-upon-completion="true"/>
但我得到错误:
我正在使用markers="true",以便我们知道它是文件的结尾,因此我们可以将其重命名为“.done”。
在拆分器和转换器之间添加了一个路由器,当 FileMarker 为 END 时,它仅路由到“nullChannel”或“fileProcessedChannel”,否则,拆分线进入 default-output-channel="lineOutputChannel"
<int:router ref="fileMarkerCustomRouter" inputchannel="splitterOutputChannel" default-output-channel="lineOutputChannel"/>
路由器代码看起来像这样
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
Collection<MessageChannel> targetChannels = new ArrayList<MessageChannel>();
if (isPayloadTypeFileMarker(message)) {
FileSplitter.FileMarker payload = (FileSplitter.FileMarker) message.getPayload();
if (isStartOfFile(payload)) {
targetChannels.add(nullChannel);
} else if (isEndOfFile(payload)) {
targetChannels.add(fileProcessedChannel);
}
}
return targetChannels;
}
但我收到此错误:
Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
有任何想法吗?