5

我需要类似 PubSub 的东西,但不是向所有订阅者广播,而是将消息仅发送给 1 个订阅者(最好根据接收缓冲区中的消息数量自动选择订阅者,越低越好)。

我正在尝试使用受控数量的分布式工作人员发送数十万个 http 请求。

4

2 回答 2

4

为了解决这个问题,我尝试的第一件事是让工作人员拉取请求而不是让他们推送给他们。

所以我有一个全局注册Agent的,它包含要使用 API 执行的 http 请求列表,用于添加和检索请求。然后,我将在此阶段worker(Task, ...)使用Supervisorandone_for_one而不是添加 poolboy 来启动 N 个工作人员。每个工作人员都会请求Agent一个 http 请求来进行任何必要的工作,然后正常终止,由主管重新启动并请求一个新的 url。

工作人员从代理中的列表中拉出 http 任务,而不是将它们推送给它们,这将确保如果有工作要做,可用的工作人员总是很忙。

如果解决方案看起来不错,我会考虑添加 poolboy。您需要谨慎使用主管选项,这样一堆导致您的工作人员崩溃的错误网址不会触发主管取消其他所有内容。

于 2015-01-06T12:06:15.693 回答
2

As stated in my comment, my approach would be to use Poolboy to handle workers, but it is not possible to just request N workers (N being the number of requested URLs) because this will exceed the process limit quickly and cause the checkout requests to time out. Instead, you need a loop that checks whether workers are available and if so, requests the url asynchronously. If no workers are free, it should sleep for a while and then retry.

For this purpose, Poolboy has the :poolboy.checkout/2 function, the second parameter allows us to specify whether it should block or not. If no workers are available it will return :full, otherwise you will get back a worker pid.

Example:

def crawl_parallel(urls) do
  urls
  |> Enum.map(&crawl_task/1)
  |> Enum.map(&Task.await/1)
end

defp crawl_task(url) do
  case :poolboy.checkout Crawler, false do
    :full ->
      # No free workers, wait a bit and retry
      :timer.sleep 100
      crawl_task url
    worker_pid -> 
      # We have a worker, asynchronously crawl the url
      Task.async fn ->
        Crawler.Worker.crawl worker_pid, url
        :poolboy.checkin Crawler, worker_pid
      end
  end
end
于 2015-01-06T13:04:26.080 回答