Point #1:
Every OS you're likely to come across uses preemptive scheduling to switch between processes. This should guarantee that every process gets resumed at least several times a second on any remotely modern hardware (as long as the process is actually using the cpu and not waiting on an interrupt like file or socket io). Basically, running even hundreds of processes on a 2 core cpu is not a problem. If the total load is too high everything just runs slower, but nothing grinds to a complete stop.
Point #2:
Multithreading? You may well find that the compression/storage side is more IO bound, so threads could work nicely here. Since child threads have full access to the parent's memory space, you might even save the overhead of transferring data between processes (depending on how you're doing that now).
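If you want to try the threaded route, here's a minimal sketch (the chunk list is a placeholder for however your data actually arrives). It works because bz2 releases the GIL during compression, so worker threads can genuinely overlap with your IO:

```python
import bz2
from concurrent.futures import ThreadPoolExecutor

def compress_chunk(chunk: bytes) -> bytes:
    # bz2 releases the GIL while compressing, so threads can
    # overlap compression with socket/disk IO
    return bz2.compress(chunk)

chunks = [b"some data " * 1000, b"more data " * 1000]  # placeholder input
with ThreadPoolExecutor(max_workers=2) as pool:
    compressed = list(pool.map(compress_chunk, chunks))
```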
Point #3:
A shell script is just another process, so the answer to #1 doesn't really change. Do test this though, because python's bzip may be significantly slower than shell bzip (depending on how you're feeding it data, and where it's trying to put the output)...
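A quick way to actually test that is to time both routes on the same payload. This is only a sketch: the payload is made up, and the shell half assumes a bzip2 binary is on your PATH (it's skipped if not):

```python
import bz2
import shutil
import subprocess
import time

data = b"example payload " * 65536  # ~1 MiB of compressible placeholder data

t0 = time.perf_counter()
py_out = bz2.compress(data)  # in-process python bzip
py_time = time.perf_counter() - t0

if shutil.which("bzip2"):  # only run the shell route if the binary exists
    t0 = time.perf_counter()
    sh_out = subprocess.run(["bzip2", "-c"], input=data,
                            stdout=subprocess.PIPE, check=True).stdout
    sh_time = time.perf_counter() - t0
    print(f"python bz2: {py_time:.3f}s  shell bzip2: {sh_time:.3f}s")
```

Note the shell route pays an extra cost shuttling the data through pipes, which is exactly the kind of overhead worth measuring for your data sizes.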
Point #4:
Definitely not an appropriate question for SO.
My recommendation:
Profile your code... Make the ingest process as efficient as possible, and send as little data between processes as you can. A process that only reads data from the socket and hands it off for processing should take minimal cpu. The default multiprocessing.Queue is not terribly efficient, because it pickles the data, sends it through a pipe, then unpickles it on the other end. If your data can be chunked into fixed-size blocks, consider using a few multiprocessing.shared_memory.SharedMemory buffers to swap between. Chunking the data stream should also make it easier to parallelize the consumption stage and make better use of whatever cpu resources you have.
Edit: pseudocode example of sending chunks of data via shared memory
import multiprocessing as mp
import multiprocessing.shared_memory  # not imported automatically by "import multiprocessing"
from collections import namedtuple
from ctypes import c_int8
import socket
import time

STALE_DATA = 0  # data waiting to be overwritten
NEW_DATA = 1  # data waiting to be processed

def producer_func(buffers):
    shm_objects = {}
    for buffer in buffers:
        shm_objects[buffer.shm_name] = mp.shared_memory.SharedMemory(name=buffer.shm_name, create=False)
        # buffer.state.value = 0  # value was initialized as stale at creation (data waiting to be overwritten)
    with socket.create_connection(...) as s:  # however you're reading data
        while True:  # for each chunk of data
            while True:  # until we get an open buffer
                for buffer in buffers:  # check each buffer object
                    # if buffer isn't being processed right now, and its data has already been consumed
                    if buffer.lock.acquire(False):
                        if buffer.state.value == STALE_DATA:
                            shm = shm_objects[buffer.shm_name]
                            break  # found a free buffer; leave the for loop
                        else:
                            buffer.lock.release()
                else:  # for loop finished without break: no free buffer yet
                    continue  # keep polling
                break  # a buffer was found; leave the inner while
            s.recv_into(shm.buf)  # put the data in the buffer
            buffer.state.value = NEW_DATA  # flag the data as new
            buffer.lock.release()  # release the buffer to be processed
    # when you receive some sort of shutdown signal:
    for shm in shm_objects.values():
        shm.close()

def consumer_func(buffers):
    shm_objects = {}
    for buffer in buffers:
        shm_objects[buffer.shm_name] = mp.shared_memory.SharedMemory(name=buffer.shm_name, create=False)
        # buffer.state.value = 0  # value was initialized as stale at creation (data waiting to be overwritten)
    while True:  # for each chunk of data
        while True:  # until we get a buffer of data waiting to be processed
            for buffer in buffers:
                # if buffer isn't being processed right now, and its data hasn't been consumed yet
                if buffer.lock.acquire(False):
                    if buffer.state.value == NEW_DATA:
                        shm = shm_objects[buffer.shm_name]
                        break  # found new data; leave the for loop
                    else:
                        buffer.lock.release()
            else:  # for loop finished without break: nothing to process yet
                continue  # keep polling
            break  # a buffer was found; leave the inner while
        process_the_data(shm.buf)  # do your data reduction here
        buffer.state.value = STALE_DATA
        buffer.lock.release()
    # when you receive some sort of shutdown signal:
    for shm in shm_objects.values():
        shm.close()

Buffer = namedtuple("Buffer", ['shm_name', 'lock', 'state'])

if __name__ == "__main__":
    n_buffers = 4  # 4 buffers to swap between
    # each buffer should be bigger than you will ever expect a message to be.
    # larger chunks are better for overhead (don't process chunks of less than a couple KiB at a time)
    shm_objects = [mp.shared_memory.SharedMemory(create=True, size=2**20) for _ in range(n_buffers)]  # 1 MiB buffers
    buffers = [Buffer(shm.name, mp.Lock(), mp.Value(c_int8, STALE_DATA)) for shm in shm_objects]  # building args for our processes
    producer = mp.Process(target=producer_func, args=(buffers,))
    consumer = mp.Process(target=consumer_func, args=(buffers,))
    consumer.start()
    producer.start()
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            break
    # signal child processes to close somehow
    # cleanup
    producer.join()
    consumer.join()
    for shm in shm_objects:
        shm.close()
        shm.unlink()