问题标签 [concurrent-processing]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python-3.x - 如何将对象与可迭代对象一起传递给 executor.map?
这不起作用,因为 executor.map 函数没有将 tFile 对象传递给函数。我该如何解决这个问题?
python - 在另一个线程中运行在 python3 中的非阻塞套接字服务器
我想知道运行非阻塞 python3 套接字服务器的最佳方法。
我目前的代码是:
在我的主程序中,如何让这个套接字服务器在后台运行(比如在另一个线程中)而不阻塞主线程,以便在创建端点后我可以继续在主线程中做其他事情?例如,在我的主线程中,我希望能够调用createEndpoint(data)
然后还调用其他一些函数等。
python - 当每个脚本在 python 中有多个线程时,从主脚本同时运行两个脚本
我想同时从一个master script
. 这些脚本中的每一个都已经在其中包含并行运行的线程。例如我跑
脚本1.py
脚本2.py
我正在单独运行script1.py
它 script2.py
。唯一的区别是我传递给班级的参数。是否有可能有一个主脚本,如果我只运行它,这两个脚本都会运行?
谢谢
python - 使用 multiprocessing 或 ray 与其他 cpu-bound 任务同时写入文件
我有一个有 72 个内核的工作站(实际上是 36 个多线程 CPU,显示为 72 个内核multiprocessing.cpu_count()
)。
我尝试了multiprocessing
并发ray
处理,批量数百万个小文件,我想在该处理期间同时编写一些输出文件。
我对阻止.get()
与例如apply_async()
(in multiprocessing
) 和ray.get()
.
有了ray
,我有一个远程函数 ( process_group()
),它在循环中并行处理数据组。在下文中,使用该multiprocessing
模块的代码版本也作为注释给出。
因此,在每次循环迭代中,我必须收集process_group()
同时计算的 many 的输出,作为数据帧列表,df_list
以便连接成一个更大的very_big_df
数据帧。后者需要写入磁盘(通常大小为 ~1 到 ~3 GB)。编写一个这样的文件需要大约10-30 [s]
一段时间才能180 [s]
处理process_group
遥控器。有数千次循环迭代。所以这需要几天时间才能完成。
是否可以将文件以非阻塞方式写入磁盘,同时循环继续以节省大约 10% 的时间(这将节省大约一天的计算时间)?
到下一次循环迭代的并发进程完成时,有足够的时间来写入前一次迭代的输出。这里涉及的内核似乎都以接近 100% 的速度运行,因此Threading
可能也不推荐使用该模块。multiprocessing.apply_async()
更令人沮丧,因为它不想要我的不可选择的输出very_big_df
数据框,我必须与一些更复杂的东西共享,这可能会花费我试图节省的时间,我希望ray
能有效地处理类似的事情。
[更新] 为了简单起见,我没有提到所有进程之间有一个很大的共享变量(这就是为什么我称它为并行进程以及文件的并发写入)。结果我的标题问题被编辑了。所以实际上,在光线并行作业之前有这段代码:
不确定这是否使它更像是“并行”执行而不仅仅是并发操作。
[更新 2] 共享数组是一个查找表,即对于并行工作者而言是只读的。
[更新 3] 我尝试了两种建议的解决方案:Threading 和 Ray / compute() 对于后者,建议使用写函数作为远程,并在 for 循环中异步发送写操作,我最初认为这是唯一可能的通过 .get() 这将是阻塞。
因此,对于 Ray,这显示了两种解决方案:
对于 RAY 解决方案,这需要增加 object_store_memory,默认值是不够的:节点内存的 10% ~ 37 GB(我有 376 GB 的 ram),然后上限为 20 GB,唯一存储的对象总计约 22 GB:一个数据帧列表df_list
(大约 11 GB),以及它们在写入函数中连接的结果(大约 11 GB),假设在连接期间有一个副本。如果不是,那么这个内存问题没有意义,我想知道我是否可以传递 numpy 视图,我认为这是默认情况下发生的。这是 RAY 相当令人沮丧的方面,因为我无法真正预测每个内存将有多少df_list
,它可以从 1 倍到 3 倍不等......
最后,坚持multiprocessing
使用 Threading 是最有效的解决方案,因为处理部分(没有 I/O)更快:
在每次循环迭代中,通常len(many_data_lists) = 7000
每个列表都包含 7 个大小为 (3, 9092) 的 numpy 数组。因此,这 7000 个列表将发送给 60 个工作人员:
process_group
每次循环迭代的所有并行时间:
射线:250 [s]
多处理:233 [s]
I/O:将 5GB parquet 文件写入外部 USB 3 旋转磁盘大约需要 35 秒。内部旋转盘上大约 10 秒。
Ray:约 5 秒的开销用于创建write_to_parquet.remote()
阻塞循环的未来。这仍然是在旋转磁盘上写入所需时间的 50%。这并不理想。
多处理:测量的 0 秒开销。
总上墙时间:
雷:486 [s]
多处理:436 [s]
我重复了几次,Ray和Multiprocessing之间的差异始终显示Multiprocessing快了约 50 秒。这是一个显着的差异,也令人费解,因为Ray宣传了更高的效率。
我将运行它进行更长时间的迭代并报告稳定性(内存、垃圾收集的潜在问题......)
ruby-on-rails - 表单提交在连续快速提交时失去幂等性
我遇到了人们双击我的表单提交按钮并提交多次的问题,所以我只想允许一次提交。我的第一个想法是 javascript,但这不是绝对瞬时的,我想要 100% 保证工作的东西。
我的解决方案是使提交具有幂等性,通过给每个表单加载自己的哈希,并在每次提交时检查该哈希是否已经存在,如果存在,则不做任何事情。
所以这是我的代码的要点:
如果我在提交之间留出一些时间,这会起作用,但是当我双击提交按钮时,两条记录会输入到我的数据库中,具有相同的哈希值。
我在本地主机上使用一个简单的 Puma 3.7 服务器。我的印象是大多数服务器会收到一个请求,执行它,然后继续下一个请求,但这几乎就像是在进行某种类型的并行。
我的问题是:这怎么可能?每个后续记录的 ID 值都比前一个记录大一,因此服务器并不知道前一个记录。那么,如果请求发送得非常快,那么如何忽略唯一哈希要求呢?同样,如果我稍后再尝试使用相同的哈希,则不会发生任何事情,正如预期的那样。
encryption - 对太多文件运行命令
我想加密和解密大文件(想想 20m 行)的文本。我使用的加密服务最多只能加密 64kb。出于此问题的目的,假设我们坚持使用此服务。
我的解决方案是将大文件分成 64kb 的块,并行加密所有文件,然后将加密的部分放入tar.gz
. 每个部分都有编号,part-xxx
以确保我可以恢复原始文件。在解密时我解压缩,并行解密每个部分并按顺序连接结果。
有趣的部分:当我在足够大的文件上执行最后一部分时,会发生以下情况之一:
tmux 会话终止,我被注销。没有日志,什么都没有。
我明白了:
我尝试了几种基于 xargs 的解决方案,但没有成功。这是有趣的代码:
更有趣的是:当 find 和 rm 报告问题时,我可以转到包含所有部件的临时文件夹,自己运行完全相同的命令,一切正常。
万一这很重要,所有这些都发生在 RAM 挂载的文件系统中。但是 RAM 不可能是问题:我在一台 256GB RAM 的机器上,所涉及的文件占用 1-2GB 并且htop
从未显示超过 10% 的使用率。
python - 如何从 cron 作业开始并行运行相同的 python 脚本?
我有一个监视文件夹,其中文件被删除。通过 cronjob,我启动了一个 python 脚本,首先检查新文件。
如果是这样,它会创建一个 pid 文件并将该文件上传到云服务。
云中的处理完成后,我删除了 pid 文件,但脚本仍在运行并执行其他任务。那时,当有新文件可用时,脚本的新实例可以再次启动。但是,当脚本多次运行并且失败时,它似乎在某个地方迷失了方向。所以我正在寻找一种更可靠的方法来并行运行同一脚本的不同实例。
c - 如何在 C 语言中只等待一段时间的信号?
我正在尝试使用 C 编程语言中的信号在 2 个进程(linux 中的父进程和子进程)之间进行通信。
第一个过程进行一些计算并提供数据。然后,它向处于挂起状态的第二个进程发送信号,等待信号唤醒并使用共享内存收集第一个进程共享的数据。
如何让第二个进程等待一段时间或者说一段时间?
在此期间,如果第一个进程提供数据并向第二个进程发送信号,则一切正常。否则,如果它在这段时间内没有收到来自第一个的任何信号,它会做另一件事。
我怎样才能使第二个过程响应这种需求?
我应该使用哪些算法和信号来实现这一点?
c++ - 如何让不同的线程一起填充一个数组?
假设我有一些要并行运行的任务(蒙特卡洛模拟)。我想完成给定数量的任务,但任务需要不同的时间,所以不容易在线程上平均分配工作。另外:我最终需要单个向量(或数组)中的所有模拟结果。
所以我想出了以下方法:
我猜上面的代码会给出竞争条件。我正在寻找一种高效的方法来避免这种情况。要求:避免竞争条件(填满整个数组,没有跳过);最终结果立即在数组中;高性能。
阅读各种方法,似乎 atomic 是一个不错的选择,但我不确定在我的情况下哪些设置最有效?甚至不确定 atomic 是否会削减它;也许需要一个保护 LastAdded 的互斥锁?
python - pandas 数据帧的并行化操作
我有一个非常大的熊猫数据框,如下所示:
(有几列需要做一些计算)
因此,对于每个客户,我必须应用大量的操作(移位、求和、减法、条件等),因此(我认为)不可能对所有内容应用布尔掩码,我已经尝试过,所以我的问题是可以将 pandas 数据框分成如下块,例如:
并同时应用所有这些块的所有操作。