2

我有一个流,可以观察目录中多文件的输出,处理数据并将其放入 HDFS。这是我的流创建命令:

stream create --name fileHdfs --definition "file --dir=/var/log/supervisor/ --pattern=tracker.out-*.log --outputType=text/plain | logHdfsTransformer | hdfs --fsUri=hdfs://192.168.1.115:8020 --directory=/data/log/appsync --fileName=log --partitionPath=path(dateFormat('yyyy/MM/dd'))" --deploy

问题是源:文件模块将从文件读取的所有数据发送到日志处理模块,而不是每转一行,因为这样,有效负载字符串有数百万个字符,我无法处理它。前任:

--- PAYLOAD LENGTH---- 9511284

请告诉我在使用 source:file 模块时如何逐行阅读,谢谢!!!

4

4 回答 4

3

它目前不受支持,但source使用 Spring Integration编写自定义inbound-channel-adapter来调用一次读取一行的 POJO 会很容易。

请打开一个新功能 JIRA 问题

您也可以在 XD 中使用 ajob而不是 a 。stream

于 2014-07-31T16:02:35.707 回答
3

我知道这可能为时已晚,但对于任何正在寻找解决方案的谷歌用户来说:

即使没有自动执行此操作的模块或选项,它也很简单,只需添加一个拆分器即可将传入消息分成多个传出消息。

请注意,您必须在使用 \n 和 \r\n 之间做出决定。检查您的文件以查看它们正在使用什么。

例子:

stream create --name filetest --definition "file --outputType=text/plain --dir=/tmp/examplefiles/| splitter --expression=payload.split('\\n') | log" --deploy

干杯!

于 2015-03-03T19:33:38.217 回答
1

Spring Integration 有一个 FileSplitter,可以将文本文件拆分为行。您可以使用它来创建自定义处理器模块,我们称之为文件拆分:

  1. 创建包含以下内容的 file-split.xml 文件:

    <?xml version="1.0" encoding="UTF-8"?>
    beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans        
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration 
       http://www.springframework.org/schema/integration/spring-integration.xsd">
    
    <int:splitter input-channel="input" output-channel="output">
            <bean class="org.springframework.integration.file.splitter.FileSplitter">
                    <constructor-arg value="false"/>
            </bean>
    </int:splitter>
    
    <int:channel id="output"/> 
    </beans>
    
  2. 将文件复制到 ${XD_HOME}/xd/modules/processor/file-split/config/ (如果需要,创建路径)

  3. 示例用法:

    stream create --name splitFile --definition "file --dir=/data --ref=true | file-split | log" --deploy
    

如果需要,您可以进一步自定义模块以采用更多选项。

于 2015-05-20T00:25:11.930 回答
0

您可以在部署流时尝试使用--mode=lines选项。请查看以下文档参考:http ://docs.spring.io/spring-xd/docs/current/reference/html/#file

希望这可以帮助!

干杯,普拉蒂克

于 2016-06-07T14:45:05.273 回答