3

我正在尝试在其中实现一个 RSS/Atom 提要聚合器,spring-integration并且我主要使用 Java DSL 来编写我的IntegrationFlow. 此聚合器的要求是可以在运行时添加/删除提要。也就是说,提要在设计时是未知的。

我发现使用Feed.inboundAdapter()带有测试 url 的 basic 并使用转换器从提要中提取链接然后将其传递给 anoutbound-file-adapter以将链接保存到文件很简单。inbound-file-adapter但是,当我尝试从运行文件中读取(数千个)提要 url 时,我遇到了非常困难FileSplitter,然后将每个Message<String>包含提要 url 的结果传递给然后注册一个新的Feed.inboundAdapter(). 这对 Java DSL 来说是不可能的吗?

理想情况下,如果我能做到以下几点,我会喜欢它:

@Bean
public IntegrationFlow getFeedsFromFile() throws MalformedURLException {
    return IntegrationFlows.from(inboundFileChannel(), e -> e.poller(Pollers.fixedDelay(10000)))
            .handle(new FileSplitter())
            //register new Feed.inboundAdapter(payload.toString()) foreach Message<String> containing feed url coming from FileSplitter
            .transform(extractLinkFromFeedEntry())
            .handle(appendLinkToFile())
            .get();
} 

尽管在多次阅读了 spring 集成 java DSL 代码(并在此过程中学习了很多东西)之后,我只是看不出有可能以这种方式做到这一点。所以... A) 是吗?B)应该是吗?C) 建议?

几乎感觉我应该能够获取输出.handle(new FileSplitter())并将其传递给.handleWithAdapter(Feed.inboundAdapter(/*stuff here*/))但 DSL 仅引用outbound-adapters 那里。入站适配器实际上只是一个子类,您可以指定其中一个的唯一AbstractMessageSource位置似乎是作为方法的参数。IntegrationFlows.from(/*stuff here*/)

我原以为可以从文件中获取输入,逐行拆分,使用该输出来注册入站提要适配器,轮询这些提要,从提要中提取新链接,并将它们附加到文件中. 看起来好像不是。

我可以做一些聪明的子类化来完成这项工作吗?

失败了...我怀疑这将是答案,我找到了spring integration Dynamic Ftp Channel Resolver Examplethis answer on how to adapt it dynamic register stuff for the inbound case...

这是要走的路吗?任何帮助/指导表示赞赏。在倾注了 DSL 代码并阅读了几天的文档之后,我想我将尝试实现动态 ftp 示例并将其调整为与 FeedEntryMessageSource 一起使用……在这种情况下,我的问题是……动态 ftp 示例有效使用 XML 配置,但是否可以使用 Java 配置或 Java DSL 来实现?

更新

我已经实现了如下解决方案:

@SpringBootApplication 
class MonsterFeedApplication {

public static void main(String[] args) throws IOException {
    ConfigurableApplicationContext parent = SpringApplication.run(MonsterFeedApplication.class, args);

    parent.setId("parent");
    String[] feedUrls = {
            "https://1nichi.wordpress.com/feed/",
            "http://jcmuofficialblog.com/feed/"};

    List<ConfigurableApplicationContext> children = new ArrayList<>();
    int n = 0;
    for(String feedUrl : feedUrls) {
        AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
        child.setId("child" + ++n);
        children.add(child);
        child.setParent(parent);
        child.register(DynamicFeedAdapter.class);
        StandardEnvironment env = new StandardEnvironment();
        Properties props = new Properties();
        props.setProperty("feed.url", feedUrl);
        PropertiesPropertySource pps = new PropertiesPropertySource("feed", props);
        env.getPropertySources().addLast(pps);
        child.setEnvironment(env);
        child.refresh();
    }

    System.out.println("Press any key to exit...");
    System.in.read();
    for (ConfigurableApplicationContext child : children) {
        child.close();
    }
    parent.close();
}

@Bean
public IntegrationFlow aggregateFeeds() {       
    return IntegrationFlows.from("feedChannel")
            .transform(extractLinkFromFeed())
            .handle(System.out::println)
            .get();
}

@Bean
public MessageChannel feedChannel() {
    return new DirectChannel();
}

@Bean
public AbstractPayloadTransformer<SyndEntry, String> extractLinkFromFeed() {
    return new AbstractPayloadTransformer<SyndEntry, String>() {
        @Override
        protected String transformPayload(SyndEntry payload) throws Exception {
            return payload.getLink();
        }
    };

}

}

DynamicFeedAdapter.java

@Configuration
@EnableIntegration
public class DynamicFeedAdapter {

    @Value("${feed.url}")
    public String feedUrl;

    @Bean
    public static PropertySourcesPlaceholderConfigurer pspc() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    @Bean
    public IntegrationFlow feedAdapter() throws MalformedURLException {

        URL url = new URL(feedUrl);

        return IntegrationFlows
                .from(s -> s.feed(url, "feedTest"), 
                        e -> e.poller(p -> p.fixedDelay(10000)))
                .channel("feedChannel")
                .get();
    }

}

如果并且仅我将其中一个url 定义为application.propertiesas时,此方法才有效feed.url=[insert url here]。否则它无法告诉我“无法解析属性 {feed.url}”。我怀疑发生的事情是所有中@Bean定义的 sDynamicFeedAdapter.java都急切地初始化了单例,所以除了在 main 方法的 for 循环中手动创建的 bean (因为它们注入了 feed.url 属性而工作正常)之外,我们还有一个已经急切初始化的流浪单例,如果在 application.properties 中没有定义 feed.url,那么它无法解析该属性,一切都会发生。现在根据我对 Spring 的了解,我知道应该可以@LazyDynamicFeedAdapter.java所以我们不会遇到这个不受欢迎的流浪单身问题孩子。现在的问题是......如果我只是标记feedAdapter() @Lazy然后bean永远不会被初始化。我如何自己初始化它们?

更新 - 问题已解决

在没有对其进行测试的情况下,我认为问题在于引导在其组件扫描期间找到了 DynamicFeedAdapter。一个简单的解决方案是将其移动到同级包中。如果 MonsterFeedApplication 在 com.acme.foo 中,则将适配器配置类放在 com.acme.bar 中。这样,引导不会将其视为应用程序的“一部分”

这确实是问题所在。实施 Gary 的建议后,一切正常。

4

1 回答 1

1

有关入站邮件适配器的类似问题,请参阅此问题的答案及其后续内容。

本质上,每个馈送适配器都是在参数化的子上下文中创建的。

在这种情况下,子上下文是在一个main()方法中创建的,但没有理由不能在由.handle().

于 2016-03-28T12:48:03.363 回答