我需要类似 PubSub 的东西,但不是向所有订阅者广播,而是将消息仅发送给 1 个订阅者(最好根据接收缓冲区中的消息数量自动选择订阅者,越低越好)。
我正在尝试使用受控数量的分布式工作人员发送数十万个 http 请求。
我需要类似 PubSub 的东西,但不是向所有订阅者广播,而是将消息仅发送给 1 个订阅者(最好根据接收缓冲区中的消息数量自动选择订阅者,越低越好)。
我正在尝试使用受控数量的分布式工作人员发送数十万个 http 请求。
为了解决这个问题,我尝试的第一件事是让工作人员拉取请求而不是让他们推送给他们。
所以我有一个全局注册Agent
的,它包含要使用 API 执行的 http 请求列表,用于添加和检索请求。然后,我将在此阶段worker(Task, ...)
使用Supervisor
andone_for_one
而不是添加 poolboy 来启动 N 个工作人员。每个工作人员都会请求Agent
一个 http 请求来进行任何必要的工作,然后正常终止,由主管重新启动并请求一个新的 url。
工作人员从代理中的列表中拉出 http 任务,而不是将它们推送给它们,这将确保如果有工作要做,可用的工作人员总是很忙。
如果解决方案看起来不错,我会考虑添加 poolboy。您需要谨慎使用主管选项,这样一堆导致您的工作人员崩溃的错误网址不会触发主管取消其他所有内容。
As stated in my comment, my approach would be to use Poolboy to handle workers, but it is not possible to just request N workers (N being the number of requested URLs) because this will exceed the process limit quickly and cause the checkout requests to time out. Instead, you need a loop that checks whether workers are available and if so, requests the url asynchronously. If no workers are free, it should sleep for a while and then retry.
For this purpose, Poolboy has the :poolboy.checkout/2
function, the second parameter allows us to specify whether it should block or not. If no workers are available it will return :full
, otherwise you will get back a worker pid.
Example:
def crawl_parallel(urls) do
urls
|> Enum.map(&crawl_task/1)
|> Enum.map(&Task.await/1)
end
defp crawl_task(url) do
case :poolboy.checkout Crawler, false do
:full ->
# No free workers, wait a bit and retry
:timer.sleep 100
crawl_task url
worker_pid ->
# We have a worker, asynchronously crawl the url
Task.async fn ->
Crawler.Worker.crawl worker_pid, url
:poolboy.checkin Crawler, worker_pid
end
end
end