1

我想得到一堆 XML 并解析它们。它们有点大。所以我想我可以在未来像这样获取并解析它们:(我目前使用赛璐珞)

country_xml = {}
country_pool = GetAndParseXML.pool size: 4, args: [@connection]
countries.each do |country|
   country_xml[country] = country_pool.future.fetch_xml country
end
countries.each do |country|
xml = country_xml[country]
# Do stuff with the XML!
end

如果它在实际需要之前不占用大量内存,那会很好。理想情况下,我希望它可以缓冲 3 个 XML 文件停止并等待至少 1 个被处理然后继续。我该怎么做?

4

2 回答 2

0

第一个问题是:占用内存的是什么?我将假设它是 prased XML 文档,因为这对我来说似乎最有可能。

我认为最简单的方法是创建一个将获取和处理 XML 的参与者。如果您随后创建一个包含 3 个这些参与者的池,您将最多同时处理 3 个请求。

含糊其辞(假设您没有使用赛璐珞注册表):

class DoStuffWithCountryXml
    include Celluloid
    exclusive :do_stuff_with_country

    def initialize(fetcher)
        @fetcher = fetcher
    end

    def do_stuff_with_country(country)
        country_xml = fetcher.fetch_xml country
        # Do stuff with country_xml
    end

end

country_pool = GetAndParseXML.pool size: 4, args: [@connection]
country_process_pool = DoStuffWithCountryXml.pool size: 3, args: [country_pool]

countries_futures = countries.map { |c| country_process_pool.future.do_stuff_with_country(c) }

countries_stuff = countries_futures.map { |f| f.value }

请注意,如果这是唯一使用 GetAndParseXML 的地方,则池大小可能与 DoStuffWithXmlActor 相同。

于 2015-04-10T15:41:26.610 回答
0

我根本不会使用 a Pool。你没有从中受益。很多人似乎觉得一起使用 aFuture和 aPool是个好主意,但通常比使用其中一个更糟糕。

在您的情况下,使用Future... 但您也将从即将推出的Multiplexer功能中受益。在那之前,这样做......使用与尝试或建议的完全不同的策略:

class HandleXML
    include Celluloid
    def initialize(fetcher)
        @fetcher = fetcher
    end
    def get_xml(country)
        @fetcher.fetch_xml(country)
    end
    def process_xml(country, xml)
        #de Do whatever you need to do with the data.
    end
 end

 def begin_processor(handler, countries, index)
     data = handler.future.get_xml(countries[index])
     index += 1
     data
 end

 limiter = 3 #de This sets your desired limit.
 country_index = 0
 data_index = 0
 data = {}
 processing = []
 handler = HandleXML.new(@connection)

 #de Load up your initial futures.
 limiter.times {
     processing << begin_processor(handler, countries, country_index)
 }

 while data_index < countries.length
     data[countries[data_index]] = processor.shift.value
     handler.process_xml(countries[data_index],data[countries[data_index]])
     #de Once you've taken out one XML set above, load up another.
     if country_index < countries.length
         processing << begin_processor(handler, countries, country_index)
     end
 end

上面只是一个如何做的例子,一次Future处理3。我没有运行它,它可能有错误,但这个想法是为你演示的。

代码加载多3组 Country XML,然后开始处理该 XML。一旦它处理了一组 XML,它就会加载另一组,直到处理完所有国家的 XML。

于 2015-06-16T13:18:50.737 回答