0

问题陈述

我目前正在构建一个包含三个任务的交换刮板,每个任务都独立运行process

  • #1:接收实时网络提要​​:非常快的数据进入,立即放入multiprocessing队列并继续。
  • #2:使用队列数据并优化:使用我编写的一些逻辑来使用和优化它。慢但不会太慢,当数据进入慢时最终赶上并清除队列。
  • #3:使用压缩提要bz2并上传到我的 s3 存储桶:每小时,我压缩优化的数据(以进一步减小文件大小),然后上传到我的 s3 存储桶。这在我的机器上大约需要 10-20 秒。

我遇到的问题是这些任务中的每一个都需要自己的并行process。生产者 (#1) 无法进行优化 (#2),否则它会停止提要连接,并且网站会因为线程 #1 没有响应而终止我的套接字。上传器(#3)不能和任务#2在同一个进程上运行,否则我会填满队列太多,我永远赶不上。我试过这个:不起作用。

这个刮刀在我的本地机器上工作得很好,每个任务都有自己的进程。但是当它部署在服务器上时,我真的不想在三核机器上花很多钱。我发现 Digital Ocean 的 4vCPU 选项最便宜,每月 40 美元。但我想知道是否有比购买 4 核更好的方法。

需要注意的一些事项:在我的 16 英寸 MBP 上,任务 #1 使用 99% 的 CPU,任务 #2 使用 20-30% 的 CPU,任务 #3 一直休眠到整点,所以它主要使用 0.5-1%中央处理器。

问题:

  • 如果我在 2 核机器上运行三个进程,这实际上与运行两个进程相同吗?我知道这取决于系统调度,但这是否意味着它将在压缩时停止,或者继续移动直到压缩结束?启动(并支付)一个每小时只使用一次的全新核心似乎真的很浪费。但是这个每小时的任务让整个队列停滞了太多,我不知道如何解决这个问题。

  • 无论如何我可以在同一个进程/核心上压缩文件时继续 Task#2吗?

  • 如果我运行一个 bash 脚本来进行压缩,那还会停止软件吗?我的电脑是 6 核的,所以我无法真正在本地测试服务器的约束

  • 有比 DigitalOcean 更便宜的替代品吗?老实说,我对 AWS 感到害怕,因为我听说过人们因意外使用而收到 1,000 美元账单的恐怖故事。我宁愿像 DigitalOcean 这样更可预测的东西

我试过的

如前所述,我尝试在同一进程中组合 Task#2 和 Task#3。一旦压缩开始,它就会停止。压缩是同步的,并使用此线程中的代码完成。找不到异步 bz2 压缩,但我不确定这是否有助于不停止 Task#2。


PS:我真的尽量避免带着这样的开放性问题来 StackOverflow,因为我知道这些会得到不好的反馈,但是当我对云不太了解时,另一种方法是尝试并投入大量时间和金钱老实说计算。我更喜欢一些专家的意见

4

1 回答 1

1

要点#1:

您将遇到的所有操作系统都使用抢占式调度在进程之间切换。这应该保证每个进程在任何远程现代硬件上每秒至少恢复几次(只要进程正在使用 cpu,而不是等待像文件或套接字 io 这样的中断)。基本上,在 2 核 cpu 上运行甚至数百个进程都不是问题。如果总负载过多,一切都会运行得更慢,但不会完全停止。

要点#2:

多线程?您可能会发现压缩/存储更受 IO 限制,因此在这里线程可能会很好。由于子线程可以完全访问父线程的内存空间,因此您甚至可能会从进程之间传输数据(取决于您当前的操作方式)中减少开销中获益。

要点#3:

shell 脚本只是另一个进程,所以回答 #1 并没有太大的不同。但是,请对此进行测试,因为 python bzip 可能比 shell bzip 慢得多(取决于您如何提供数据,以及它试图将数据放在哪里)......

要点#4:

对于 SO 来说绝对不是一个合适的问题

我的建议:

分析您的代码...使摄取过程尽可能高效,并在进程之间发送尽可能少的数据。仅从套接字读取数据并将其发送以进行处理的进程应该占用最少的 cpu。默认multiprocessing.Queue值不是非常有效,因为它腌制数据,通过管道发送,然后在另一端取消腌制。如果您的数据可以分块为固定大小的块,请考虑使用几个multiprocessing.shared_memory.SharedMemory缓冲区进行交换。对数据流进行分块还应该更容易并行化数据消费阶段,以更好地利用您拥有的任何 cpu 资源。

编辑:通过共享内存发送数据块的伪代码示例

import multiprocessing as mp
from contextlib import contextmanager
from collections import namedtuple
from ctypes import c_int8
import socket
import time

STALE_DATA = 0 #data waiting to be overwritten
NEW_DATA = 1 #data waiting to be processed

def producer_func(buffers):
    shm_objects = {}
    for buffer in buffers:
        shm_objects[buffer.shm_name] = mp.shared_memory.SharedMemory(name=buffer.shm_name, create=False)
        #buffer.state.value = 0 #value was initialized as stale at creation (data waiting to be overwritten)
    with socket.create_connection(...) as s: #however you're reading data
        while True: #for each chunk of data
            while True: #until we get an open buffer
                for buffer in buffers: #check each buffer object
                    #if buffer isn't being processed right now, and data has already been processed
                    if buffer.lock.acquire(False):
                        if buffer.state.value==STALE_DATA: 
                            shm = shm_objects[buffer.shm_name]
                            break #break out of two loops
                        else:
                            buffer.lock.release()
                else:
                    continue
                break
            
            s.recv_into(shm.buf) #put the data in the buffer
            buffer.state.value = NEW_DATA #flag the data as new
            buffer.lock.release() #release the buffer to be processed
    #when you receive some sort of shutdown signal:
    for shm in shm_objects:
        shm.close()

def consumer_func(buffers):
    shm_objects = {}
    for buffer in buffers:
        shm_objects[buffer.shm_name] = mp.shared_memory.SharedMemory(name=buffer.shm_name, create=False)
        #buffer.state.value = 0 #value was initialized as stale at creation (data waiting to be overwritten)
    while True: #for each chunk of data
        while True: #until we get a buffer of data waiting to be processed
            for buffer in buffers:
                #if buffer isn't being processed right now, and data hasn't already been processed
                if buffer.lock.acquire(False):
                    if buffer.state.value==NEW_DATA:
                        shm = shm_objects[buffer.shm_name]
                        break #break out of two loops
                    else:
                        buffer.lock.release()
            else:
                continue
            break
        process_the_data(shm.buf) #do your data reduction here
        buffer.state.value = STALE_DATA
        buffer.lock.release()
    #when you receive some sort of shutdown signal:
    for shm in shm_objects:
        shm.close()
        
Buffer = namedtuple("Buffer", ['shm_name', 'lock', 'state'])

if __name__ == "__main__":
    n_buffers = 4 # 4 buffers to swap between
    #each buffer should be bigger than you will ever expect a message to be.
    #using larger chunks is better for overhead (don't be processing chunks of less than a couple Kib at a time)
    shm_objects = [mp.shared_memory.SharedMemory(create=True, size=2**20) for _ in range(n_buffers)] # 1MiB buffers
    buffers = [Buffer(shm.name, mp.Lock(), mp.Value(c_int8, 0)) for shm in shm_objects] #building args for our processes
    producer = mp.Process(target=producer_func, args=(buffers, ))
    consumer = mp.Process(target=consumer_func, args=(buffers, ))
    consumer.start()
    producer.start()
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            break
    #signal child processes to close somehow
    #cleanup
    producer.join()
    consumer.join()
    for shm in shm_objects:
        shm.close()
        shm.unlink()
于 2021-02-04T05:39:29.440 回答