问题标签 [worker-pool]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
307 浏览

perl - 使用 N 个并发异步 HTTP 客户端下载总共 M 个文件,其中 M 很大,N 是可配置的

我正在尝试编写一个脚本,该脚本将N通过 HTTP 同时下载大多数文件。

我以前使用AnyEvent::Worker::Pool来管理 BLOCKING 任务池。我还结合使用AnyEvent::HTTPAnyEvent->condvar来单独管理非阻塞下载。

我认为结合这两种方法应该非常简单,以便AnyEvent->condvar使 AnyEvent::HTTP::http_get 从AnyEvent::Worker::Pool的角度来看是 BLOCKING 。

但是,我遇到了一些我不明白的错误,大概是由于AnyEvent::Worker的实现细节。这是演示该问题的脚本的真正精简版本:

演示输出如下:

为什么选择AnyEvent::HTTP

在我的真实脚本中,我使用了更多的功能AnyEvent::HTTP;特别是,我将on_body回调与Term::StatusBar为脚本的最终用户显示进度条相结合;此外,我在on_body回调中策略性地“暂停”,以使传输速率等于或小于最终用户预定义的速率。

请随时提出具有这些功能的替代方案(或破解它们的简单方法!)

为什么选择AnyEvent::Worker::Pool

我已经很熟悉了。欢迎提供替代建议。

为什么是电动汽车

它很快。同样,欢迎提出替代建议。

0 投票
3 回答
13580 浏览

python - Python中带有工作池的异步多处理:超时后如何继续?

我想使用一个进程池运行多个作业并应用给定的超时,之后应该杀死一个作业并由另一个正在处理下一个任务的作业替换。

我尝试使用multiprocessing提供异步运行工人池的方法的模块(例如使用map_async),但在那里我只能设置一个“全局”超时,之后所有进程都将被杀死。

是否有可能有一个单独的超时,之后只有一个耗时太长的进程被杀死,而是再次将一个新的工作人员添加到池中(处理下一个任务并跳过超时的那个)?

这是一个简单的例子来说明我的问题:

超时后,所有工作人员都被杀死,程序退出。相反,我希望它继续下一个子任务。我必须自己实施这种行为还是有现有的解决方案?

更新

可以杀死悬挂的工人,他们会被自动替换。所以我想出了这个代码:

现在的问题是循环永远不会退出;即使在处理完所有任务后,调用也会get产生超时异常。

0 投票
1 回答
950 浏览

java - Java wordcount:一个平庸的实现

我用 Java 实现了一个 wordcount 程序。基本上,该程序需要一个大文件(在我的测试中,我使用了一个仅包含数字的 10 gb 数据文件),并计算每个“单词”出现的次数 - 在这种情况下,可能会出现一个数字(例如 23723 243 次)。

下面是我的实现。我寻求改进它,主要考虑性能,但也考虑其他一些事情,我正在寻找一些指导。以下是我希望纠正的几个问题:

  1. 目前,该程序是线程化的并且可以正常工作。但是,我所做的是将一块内存传递(500MB/NUM_THREADS)给每个线程,然后每个线程继续进行字数计数。这里的问题是我让主线程等待所有线程完成,然后再将更多数据传递给每个线程。这不是什么大问题,但是有一段时间,几个线程会等待一段时间,什么也不做。我相信某种工作池或执行器服务可以解决这个问题(我还没有学习过这个的语法)。

  2. 该程序仅适用于包含整数的文件。那是个问题。我为此苦苦挣扎,因为我不知道如何在不创建大量未使用变量的情况下迭代数据(使用 String 甚至 StringBuilder 的性能都很糟糕)。目前,我使用我知道输入是整数的事实,并且只是将临时变量存储为int,所以那里没有内存问题。我希望能够使用某种分隔符,无论该分隔符是空格还是几个字符。

  3. 我正在使用全局 ConcurrentHashMap 来记录键值对。例如,如果一个线程找到一个数字“24624”,它会在地图中搜索该数字。如果存在,它将将该键的值增加一。末尾键的值表示该键的出现次数。那么这是正确的设计吗?我会通过给每个线程自己的哈希图,然后在最后合并它们来提高性能吗?

  4. 有没有其他方法可以在不使用 RandomAccessMemory 类的情况下通过偏移量查找文件?这个类只会读入一个字节数组,然后我必须对其进行转换。我没有为这种转换计时,但也许使用其他东西可能会更快。

我也对其他可能性持开放态度,这就是我想到的。

注意:拆分文件不是我想探索的选项,因为我可能将其部署在我不应该创建自己的文件的服务器上,但如果它真的会提高性能,我可能会听。

其他注意事项:我是 Java 线程的新手,也是 StackOverflow 的新手。要温柔。

0 投票
1 回答
471 浏览

python - 使用 WorkerPool 通过 URL 列表进行多线程处理

我正在尝试使用多线程来遍历 url 的 txt 文件并抓取在每个 url 中找到的内容。这适用于大约 20 个 URL(数量不一致),但随后始终卡在文件中的最后一个 URL 上。它似乎没有按顺序进行。

我不知道为什么会卡住或从哪里开始,所以非常感谢您的帮助。

这是 url 的示例列表:

0 投票
1 回答
4285 浏览

go - 工作池的最佳大小

我正在构建一个使用 goroutines 的“工作池”的 Go 应用程序,最初我启动池创建了一些工作程序。我想知道多核处理器中的最佳工作人员数量是多少,例如在具有 4 核的 CPU 中?我目前正在使用以下方法:

完整的实现如下

job.NewWorkerPool(maxWorkers) 和 module.Dispatcher.Run(jobQueue)

我使用工作池的用例:我有一个服务,它接受请求并调用多个外部 API 并将它们的结果聚合到一个响应中。每个调用都可以独立于其他调用,因为结果的顺序无关紧要。我将调用分派到工作池,每个调用都在一个可用的 goroutine 中以异步方式完成。一旦工作线程完成,我的“请求”线程会在获取和聚合结果的同时继续监听返回通道。完成所有操作后,最终聚合结果将作为响应返回。由于每个外部 API 调用可能会呈现可变响应时间,因此某些调用可以比其他调用更早完成。

0 投票
1 回答
107 浏览

go - 暂停工作池

使用看起来比这更接近的工作池的 golang 实现

我想暂停我的工作人员几秒钟,同时像事务处理一样进行数据库同步。我不希望我的同步数据被另一个潜在的不受控制的工作人员更新。

暂停工作的最佳方法是什么?

  • 关闭所有工人?
  • 如果被暂停,请在工人中使用 chan 以防止获得工作?
  • 在处理工作之前使用全局标志在每个工作人员中设置锁定?

谢谢

0 投票
1 回答
75 浏览

blocking - 如何在同一个verticle中使用不同的命名工作池?

我的服务中有一个 verticle,它接收 http 请求并使用 executeBlocking 与 MySQL db 对话。我正在使用命名工作池与数据库交互。现在,为了推送应用程序指标(使用阻塞的库),我想使用不同的命名工作池。因为我不希望数据库操作被指标中断,所以我想要一个单独的工作池。我可以使用事件总线并使用工作器顶点来推送指标,但由于这会产生转换到 JsonObject 的开销,我想从同一个顶点使用 executeBlocking 本身。

正如这里提到的https://groups.google.com/d/msg/vertx/eSf3AQagGGU/9m8RizIJeNQJ ,两种情况下使用的工作池是相同的。所以,制作一个新的工人 Verticle 真的会帮助我解耦用于数据库操作的线程和用于推送指标的线程。

任何人都可以帮助我做出更好的设计选择,或者如果我使用相同的 Verticle,我该如何使用不同的工作池?

0 投票
0 回答
292 浏览

node.js - 内存不足异常 - 未处理的 RangeError

我创建了一个 NodeJS 服务,它从 ActiveMQ 队列中读取消息、远程设置、获取这些消息、对其进行处理并将一些数据推送到 GUI。

我面临的问题是在尝试在 NodeJS 端处理这些消息时,当它们以大约的速度快速进入 ActiveMQ 队列时。每秒 5 条消息 (JSON),每条 JSON 消息的大小约为 18kb。传入的消息被写入文件,保存到中间 MSSQL 表中,一旦保存,JSON 文件就会移动到 Processed 文件夹。

环境设置是:

  • NodeJS 版本 8.9.4。
  • ActiveMQ 版本 5.15.4。
  • Java 版本 1.8.0_171。
  • 在主 Node 进程旁边创建了三个工作进程来处理消息负载。

工作人员是使用节点模块“Workerpool”创建的。 https://github.com/josdejong/workerpool

在处理来自 ActiveMQ 队列的大约 3000 条消息后,我一直遇到一个Unhandled rejection Range Error: Out of Memory Exception问题,消息从外部源以 200 毫秒/消息的频率推送到队列。

编码:

什么可能导致此问题?

如果需要任何其他信息,请告诉我。

0 投票
1 回答
604 浏览

go - 工作池实现以限制每秒的 http 请求

我是 Go lang 的新手,正在尝试实现受限的 GCP API 调用。即限制我的应用程序每秒进行的 API 调用次数,因此 Google 不会阻止我。

我一直在关注此处示例的工作池模式。

与链接示例类似的设计将是:

  • 使两个通道(作业、结果)的容量与要进行的 API 调用总数相同
  • 创建一个一定大小的工人池(比如 10 个工人)
  • 每个工作人员在进行 API 调用时通过作业通道,并将响应存储到结果通道中,等待时间为 1 秒

我的问题是:

  • 我是否理解正确,10 名工作人员每人有 1 秒的等待期意味着我的应用每秒大约进行 10 次 API 调用?
  • 这 10 个工作人员如何相互通信,以免他们踩到对方的脚趾,即两个工作人员不查询相同的 GCS 路径。

当然还有一个终极问题:使用工作池是不是有点矫枉过正?

谢谢!

0 投票
2 回答
238 浏览

node.js - 在 NodeJS 工作线程中执行网络 I/O 的问题

我有一个脚本,可以从服务器下载数千个文件,对这些文件执行一些 CPU 密集型计算,然后将结果上传到某个地方。作为一个附加级别的复杂性,我想限制与我正在下载文件的服务器的并发连接数。

为了让 CPU 密集型计算脱离事件线程,我利用了 josdejong 的 workerpool。我还想我可以利用这样一个事实,即在任何给定时间只会启动有限数量的线程来限制与我的服务器的并发连接数,所以我尝试将网络 I/O 放在工作进程中,例如所以(打字稿):

当我编译并运行此代码时,我看到消息“正在下载 test.txt”,但之后我看不到以下日志语句 ( console.log(csv))

我已经尝试对此代码进行各种修改,包括删除responseType、删除await和仅检查PromiseAxios 返回的、使函数非异步等。不管它似乎总是Axios.request在线崩溃

工作线程是否无法打开 HTTP 连接或其他什么?还是我只是犯了一个愚蠢的错误?