1

我们需要将事件发送到 kinesis,并且由于 aws 定价,我们计划将记录分批放入 kinesis。

我们读入一个 csv 文件,然后使用文件拆分器吐出行并将每一行转换为 json。

那么在转换为 json 之后,我们如何将这些行批处理为每批 25 行,以便我们的 kinesis serviceActivator 可以发送批处理?

任何示例将不胜感激。

    <int-file:splitter id="fileLineSplitter"
                       input-channel="fileInputChannel"
                       output-channel="splitterOutputChannel"
                       markers="true" />


<int:transformer id="csvToDataCdrTransformer"
                     ref="dataCdrLineTransformer"
                     method="transform"
                     input-channel="lineOutputChannel"
                     output-channel="dataCdrObjectInputChannel">
    </int:transformer>


    <int:object-to-json-transformer input-channel="dataCdrObjectInputChannel"
                                    output-channel="kinesisSendChannel">
        <int:poller fixed-delay="50"/>
    </int:object-to-json-transformer>

编辑:我按照“Artem Bilan”的建议添加了它并且有效

<int:aggregator input-channel="aggregateChannel"
                output-channel="toJsonChannel"
                release-strategy-expression="#this.size() eq 2"
                expire-groups-upon-completion="true"/>

但我得到错误:

  1. 我正在使用markers="true",以便我们知道它是文件的结尾,因此我们可以将其重命名为“.done”。

  2. 在拆分器和转换器之间添加了一个路由器,当 FileMarker 为 END 时,它仅路由到“nullChannel”或“fileProcessedChannel”,否则,拆分线进入 default-output-channel="lineOutputChannel"

    <int:router ref="fileMarkerCustomRouter" inputchannel="splitterOutputChannel" default-output-channel="lineOutputChannel"/>
    

路由器代码看起来像这样

 @Override
    protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
        Collection<MessageChannel> targetChannels = new ArrayList<MessageChannel>();

        if (isPayloadTypeFileMarker(message)) {

            FileSplitter.FileMarker payload = (FileSplitter.FileMarker) message.getPayload();

            if (isStartOfFile(payload)) {

                targetChannels.add(nullChannel);

            } else if (isEndOfFile(payload)) {

                targetChannels.add(fileProcessedChannel);
            }
        }
        return targetChannels;
    }

但我收到此错误:

Caused by: java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?

有任何想法吗?

4

1 回答 1

1

为此,您绝对需要一个<aggregator>with therelease-strategy-expression="25"并在发布后expire-groups-upon-completion="true"让它为同一组创建一个新组correlationKey

不,确定你为什么需要markers="true",但没有它<int-file:splitter>填充适当的相关标头。因此,您甚至可以考虑在之后仅依赖默认拆分和默认聚合。

此外,您应该考虑将来自聚合器的结果转换为 JSON。它发出一个List<?>. 将整个列表序列化为 JSON 非常有效。另外,在发送到 Kinesis 之前,您可能需要再进行一次转换。

因此,您的配置原型应该是这样的:

<int-file:splitter id="fileLineSplitter"
                   input-channel="fileInputChannel"
                   output-channel="splitterOutputChannel"/>

<int:transformer id="csvToDataCdrTransformer"
                 ref="dataCdrLineTransformer"
                 method="transform"
                 input-channel="lineOutputChannel"
                 output-channel="aggregateChannel">
</int:transformer>

<int:aggregator input-channel="aggregateChannel" 
                output-channel="toJsonChannel"
                expire-groups-upon-completion="true" />

<int:object-to-json-transformer input-channel="toJsonChannel"
                                output-channel="kinesisSendChannel"/>

这样整个文件将被视为一个批处理。您拆分它,处理每一行,将它们聚合回列表,然后在发送到 Kinesis 之前转换为 JSON。

从这里我想请你提出一个 JIRA 来添加ObjectToJsonTransformer.ResultType.BYTES模式,以便更好地使用基于byte[]like的下游组件KinesisMessageHandler

于 2018-04-19T13:19:45.790 回答