3

我不是以英语为母语的人,但我尽量清楚地表达我的问题。我遇到了这个困扰我两天的问题,但我仍然找不到解决方案。

我已经构建了一个流,它将在 Hadoop YARN 中的 Spring Can 数据流中运行。

流由 Http 源、处理器和文件接收器组成。

1.Http Source
HTTP Source 组件有两个输出通道绑定两个不同的目的地,即在 application.properties 中定义的 dest1 和 dest2。

spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2

以下是 HTTP 源代码片段供您参考。

@Autowired
    private EssSource channels; //EssSource is the interface for multiple output channels

##output channel 1:
    @RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"})
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        logger.info("enter ... handleRequest1...");
        channels.output().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

##output channel 2:
    @RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"})
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        logger.info("enter ... handleRequest2...");
        channels.output2().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

2.处理器
处理器有两个多输入通道和两个输出通道绑定不同的目的地。目标绑定在处理器组件项目的 application.properties 中定义。

//input channel binding  
spring.cloud.stream.bindings.input.destination=dest1
spring.cloud.stream.bindings.input2.destination=dest2

//output channel binding  
spring.cloud.stream.bindings.output.destination=hdfsSink
spring.cloud.stream.bindings.output2.destination=fileSink

下面是处理器的代码片段。

@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT)
    public Object transform(Message<?> message) {
        logger.info("enter ...transform...");

        return "processed by transform1";;
    }


    @Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2)
    public Object transform2(Message<?> message) {
        logger.info("enter ... transform2...");
        return "processed by transform2";
    }

3.文件接收器组件。

我使用 Spring 的官方 fil sink 组件。maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT

我只是在其 applicaiton.properties 文件中添加目标绑定。spring.cloud.stream.bindings.input.destination=fileSink

4.发现:

我期望的数据流应该是这样的:

Source.handleRequest() -->Processor.handleRequest()

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

应该只将字符串“由 transform2 处理”保存到文件中。

但是经过我的测试,数据流实际上是这样的:

Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

“由transform1处理”和“由transform2处理”字符串都保存到文件中。

5.问题:

虽然 Processor.handleRequest() 中输出通道的目的地绑定到 hdfsSink 而不是 fileSink,但数据仍然流向文件 Sink。我无法理解这一点,这不是我想要的。我只希望来自 Processor.handleRequest2() 的数据流到文件接收器而不是两者。如果我做得不对,谁能告诉我该怎么做以及解决方案是什么?这让我困惑了2天。

感谢您的热心帮助。

亚历克斯

4

1 回答 1

2

您的流定义是这样的吗(“-2”版本是具有多个通道的版本)?

http-source-2 | processor-2 | file-sink

请注意,Spring Cloud Data Flow 将覆盖其中定义的目标,applications.properties这就是为什么,即使spring.cloud.stream.bindings.output.destination处理器设置为hdfs-sink,它实际上也会匹配 的输入file-sink

此处解释了从流定义配置目标的方式(在水龙头的上下文中):http ://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#spring-cloud-dataflow -stream-tap-dsl

您可以做的是简单地交换通道 1 和 2 的含义 - 将侧通道用于 hdfs。虽然这有点脆弱 - 因为 Stream 的input/output通道将自动配置,而其他通道将通过配置application.properties- 在这种情况下,通过流定义或在部署时配置侧通道目标可能会更好 - 请参阅http ://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#_application_properties

在我看来,这些也可能是 2 个使用常规组件监听不同端点的流——假设数据应该并排流动。

于 2016-09-06T19:19:34.070 回答