0

我不久前开始使用多处理,它正在处理基本示例。之后我尝试实现某种多声音输入程序,并尝试通过队列将输入通量引导到某个处理模块,但目前严重失败。我将用 3 点来描述我的问题:文件夹结构、流程结构、我尝试过的内容。

文件夹结构

  • 根文件夹
    • 应用
      • 启动应用程序.py
      • input_cfg.ini
      • 核心.py
      • gui.py
      • audio_recorder.py(使用 sounddevice.InputStream)
      • x_recorder.py

进程结构 当我开始运行我的应用程序时,会调用 gui 并在我按下开始按钮后创建进程。

  • 主要流程
  • audio_recorder_1 进程
  • audio_recorder_进程
  • 申请流程

我试过的

核心.py

from multiprocessing import Queue, Process
central_queue = Queue()
...
d = {}
d['output'] = central_queue
o = AudioRecorder('name', **d)

启动应用程序.py

import core
def handle_queue_data():
    while True:
        print(str(core.central_queue.get()))
if __name__ == "__main__":
    Process(target=handle_queue_data, name="syncOutput").start()

audio_recorder.py

class AudioRecorder(object):
    def __init__(self, name, **d):
        ...
        self.output_queue = d['output']
    def run(self):
        queue = Queue()
        def callback(indata, frames, time, status):
            if status:
                print(status, flush=True)
            # Push the got data into the queue
            queue.put([indata.copy()])
        with sd.InputStream(samplerate=self.sample_rate, device=self.device_id, channels=self.channel_id, callback=callback):
            while True:
                self.output_queue.put(queue.get())

它不起作用。调试后,似乎从core.py记录器开始后,队列的引用发生了变化......仅供参考调试信息:

# in the audio_recorder.py object
centralized_queue = {Queue} <multiprocessing.queues.Queue object at 0x00000000086B3320>
 _buffer = {deque} deque([[array([[-0.01989746, -0.02053833],\n       [-0.01828003, -0.0196228 ],\n       [-0.00634766, -0.00686646],\n       ..., \n       [-0.01119995, -0.01144409],\n       [-0.00900269, -0.00982666],\n       [-0.00823975, -0.00888062]], dtype=float32)]])
 _close = {Finalize} <Finalize object, callback=_finalize_close, args=[deque([[array([[-0.01989746, -0.02053833],\n       [-0.01828003, -0.0196228 ],\n       [-0.00634766, -0.00686646],\n       ..., \n       [-0.01119995, -0.01144409],\n       [-0.00900269, -0.00982666],\n       [-0
 _closed = {bool} False
 _ignore_epipe = {bool} False
 _joincancelled = {bool} False
 _jointhread = {Finalize} <Finalize object, callback=_finalize_join, args=[<weakref at 0x00000000083A2638; to 'Thread' at 0x0000000004DF1B00>], exitprority=-5>
 _maxsize = {int} 2147483647
 _notempty = {Condition} <Condition(<unlocked _thread.lock object at 0x0000000004738198>, 0)>
 _opid = {int} 1320
 _reader = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000086B34A8>
 _rlock = {Lock} <Lock(owner=None)>
 _sem = {BoundedSemaphore} <BoundedSemaphore(value=2147483645, maxvalue=2147483647)>
 _thread = {Thread} <Thread(QueueFeederThread, started daemon 9344)>
 _wlock = {NoneType} None
 _writer = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000086B3518>

# in the handle_queue_data
centralized_queue = {Queue} <multiprocessing.queues.Queue object at 0x000000000479DA20>
 _buffer = {deque} deque([])
 _close = {NoneType} None
 _closed = {bool} False
 _ignore_epipe = {bool} False
 _joincancelled = {bool} False
 _jointhread = {NoneType} None
 _maxsize = {int} 2147483647
 _notempty = {Condition} <Condition(<unlocked _thread.lock object at 0x00000000058C8350>, 0)>
 _opid = {int} 7208
 _reader = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x000000000684C438>
 _rlock = {Lock} <Lock(owner=None)>
 _sem = {BoundedSemaphore} <BoundedSemaphore(value=2147483647, maxvalue=2147483647)>
 _thread = {NoneType} None
 _wlock = {NoneType} None
 _writer = {PipeConnection} <multiprocessing.connection.PipeConnection object at 0x00000000058DE6A0>

之后我也尝试使用不同的东西,都失败了,我无法传递数据......队列是否有可能在这里是可变对象?或者多处理中存在错误(非常不可能)或者与 sounddevice 的组合使队列不稳定?

对不起,我的描述很长...

我提前感谢您的帮助!
最好的问候,
塞巴斯蒂安

4

1 回答 1

1

我并没有真正的经验multiprocessing,但据我了解,模块命名空间中的所有对象start_application.py对于每个进程都是重复的。如果我没记错的话,这包括core模块。因此,core.central_queue每个进程都有一个单独的实例。至少在 Windows 上似乎是这种情况,Python 文档建议无论如何都要“将资源显式传递给子进程”。

您应该使用该if __name__ == '__main__':块来创建 的唯一实例Queue和 的唯一实例AudioRecorderargs然后,您可以使用参数Process(如上面的链接所示)将这些唯一实例传递给您的进程。

除此之外,我真的不知道你想要实现什么。你想用一个随机的可用进程来处理音频输入的随机块吗?还是您想为每个进程提供完整的相同音频输入?

在后一种情况下,毕竟每个子进程都应该有一个单独的队列!sd.InputStream应该仍然是唯一的。在您的with声明中,您应该遍历所有子进程并将当前的音频块分别放入每个进程队列中。

PS:我刚刚意识到,出于某种原因,您可能只想启动一个附加进程。在这种情况下,你应该考虑放弃整个混乱,只做你需要在声明multiprocessing中做的任何事情。with

更新:

如果您想一次使用多个音频设备(以及多个 PortAudio 流),您仍然不一定需要multiprocessing. 您可以拥有一个with包含多个上下文管理器的语句并在其中进行处理。

根据您想要实现的目标,您可能有一个队列,所有音频回调都写入其中,或者每个回调一个队列。

如果您有充分的理由使用multiprocessing,如果您在主进程中启动所有音频流并在新的子进程中进行处理,它也应该可以正常工作。

于 2018-01-04T08:33:36.270 回答