0

我需要实现可以处理抖动的音频播放器。所以我需要缓冲,因此我需要最小的缓冲区大小并知道当时缓冲区中有多少元素。

但是在 python Queue qsize() 方法中没有实现。我能做些什么呢?

class MultiprocessedAudioPlayer(object):

    def __init__(self, sampling_frequency, min_buffer_size=1, max_buffer_size=10, sample_width=2):
        self.p = PyAudio()
        self.stream = self.p.open(format=self.p.get_format_from_width(width=sample_width), rate=sampling_frequency,
                                  output=True, channels=1)
        self.max_buffer_size = max_buffer_size
        self.min_buffer_size = min_buffer_size
        self.buffer = Queue(maxsize=max_buffer_size)
        self.process = Process(target=self.playing)
        self.process.start()
        self.condition = Condition()

    def schedule_to_play(self, frame):
        self.condition.acquire()
        if self.buffer.full():
            print('Buffer is overflown')
            self.condition.wait()
        self.buffer.put(frame)
        if self.buffer.qsize() > self.min_buffer_size:
            print('Buffer length is', len(self.buffer))
            self.condition.notify()
            print('It is sufficient to play')
        self.condition.release()
        # print('frame appended buffer length is {} now'.format(self.buffer.qsize()))

    def play(self, frame):
        print('started playing frame at {}'.format(datetime.now()))
        self.stream.write(frame, num_frames=len(frame))
        print('stopped playing frame at {}'.format(datetime.now()))

    def close(self):
        self.stream.stop_stream()
        self.stream.close()

    def playing(self):
        while True:
            self.condition.acquire()
            if self.buffer.qsize() < self.min_buffer_size:
                self.condition.wait()
            frame = self.buffer.popleft()
            print('popping frame from buffer')
            print('Buffer length is {} now'.format(len(self.buffer)))
            self.condition.notify()
            self.condition.release()
            self.play(frame)
4

1 回答 1

0

两个建议:

  1. 使用threading——qsize()方法可靠。multiprocessing(由于来回发送消息的延迟,它不可靠。)

  2. multiprocessingManager保存共享状态的实例一起使用。每个进程都可以设置和获取数据,Manager 处理来回发送更新。

以下示例每秒将数据添加到列表中,并且不时地由第二个进程扫描数据。还要注意广泛的日志记录,这对多进程程序非常有用。

#!/usr/bin/env python

'''
mptest_proxy.py -- producer adds to fixed-sized list; scanner uses them

OPTIONS:
-v      verbose multiprocessing output
'''

import logging, multiprocessing, sys, time


def producer(objlist):
    '''
    add an item to list every sec; ensure fixed size list
    '''
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            return
        msg = 'ding: {:04d}'.format(int(time.time()) % 10000)
        logger.info('put: %s', msg)
        del objlist[0]
        objlist.append( msg )


def scanner(objlist):
    '''
    every now and then, run calculation on objlist
    '''
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        try:
            time.sleep(5)
        except KeyboardInterrupt:
            return
        logger.info('items: %s', list(objlist))


def main():
    opt_verbose = '-v' in sys.argv[1:] 
    logger = multiprocessing.log_to_stderr(
            level=logging.DEBUG if opt_verbose else logging.INFO,
    )
    logger.info('setup')

    # create fixed-length list, shared between producer & consumer
    manager = multiprocessing.Manager()
    my_objlist = manager.list( # pylint: disable=E1101
        [None] * 10
    )

    multiprocessing.Process(
        target=producer,
        args=(my_objlist,),
        name='producer',
    ).start()

    multiprocessing.Process(
        target=scanner,
        args=(my_objlist,),
        name='scanner',
        ).start()

    logger.info('running forever')
    try:
        manager.join() # wait until both workers die
    except KeyboardInterrupt:
        pass
    logger.info('done')


if __name__=='__main__':
    main()
于 2014-06-03T17:06:02.943 回答