3

我想使手刹自动化一点,并用python编写了一个小程序。现在我的子进程和线程模块有问题。我想动态更改我运行的手刹进程的数量。我实现了队列模块,用于获取和放置电影。

CompressThread调用handbrake类中的encode方法,然后encode调用_execute。现在我想将我在手刹类中读取的进度集中存储在压缩机类中。所以我可以将进度发布到 asocketserver和 a webgui。不,我写入sqlite3数据库,但这应该被删除(因为线程问题),并且仅在程序退出时保存。

我能想到的将数据集中保存的唯一方法是创建另一个线程,并在CompressThread类中轮询数据。我的问题是我的程序有 4 个线程。

有更好的解决方案吗?也许数据库没有错,我不应该删除它?

压缩机类:

class CompressThread(threading.Thread):
    """ Manage the queue of movies to be compressed
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self._config = ConfigParser()
        self._config.process_config()
        self._handbrake = self._config.get_handbrake()
        self._lock = threading.Lock()

    def run(self):
        while True:
            movie_id = QUEUE.get()
            return_code = self._handbrake.encode(movie_id)
            print(return_code)
            QUEUE.task_done()


class Compressor(object):
    """ Compresses given mkv file

    Attributes:


    """

    __MAX_THREADS = 1

    def __init__(self):
        self._dest_audio_tracks = None
        self._log = None
        self.settings = None
        self.config = ConfigParser()
        self._database = db.DB()
        self._database.connect()
        self._running = True
        self._threads = []
        try:
            self.handbrake, self._log = self.config.process_config()
            self._log = logging.getLogger("Compressor")
        except ConfigError as error:
            raise Error("Config error: {0}".format(error))

    def process_file(self, input_file, output_file, title):
        if not os.path.exists(input_file):
            self._log.warning("Input file not exists: {0}".format(input_file))
            print("Input file not found: {0}".format(input_file))
        else:
            media_info = mediainfo.Mediainfo.parse(input_file)
            movie_settings = settings.Settings(input_file, title, output_file)
            movie_settings.parse(media_info)
            self._log.info("Added file {0} to list".format(movie_settings.input_file))
            QUEUE.put(self._database.insert_movie(movie_settings))

            print("File added.")

    def start(self):
        self._threads = [CompressThread() for i in range(self.__MAX_THREADS)]
        for thread in self._threads:
            thread.setDaemon(True)
            thread.start()
        while self._running:
            cmd = input("mCompress> ")
            if cmd == "quit":
                self._running = False
            elif cmd == "status":
                print("{0}".format(self._threads))
            elif cmd == "newfile":
                input_file = input("mCompress> newFile> Input filename> ")
                output_file = input("mCompress> newFile> Output filename> ")
                title = input("mCompress> newFile> Title> ")
                self.process_file(input_file, output_file, title)

    def _initialize_logging(self, log_file):
        try:
            self._log_file = open(log_file, "a+")
        except IOError as error:
            log_error = "Could not open log file {0}".format(error)
            self._log.error(log_error)
            raise IOError(log_error)
        self._log_file.seek(0)

if __name__ == "__main__":
    options_parser = OptionsParser()
    args = options_parser.parser.parse_args()
    if args.start:
        Compressor().start()

一块手刹类:

def _execute(self, options):
    command = ["{0}".format(self._location)]
    if self._validate_options(options):
        for option in options:
            command.extend(option.generate_command())
        print(" ".join(command))
        state = 1
        returncode = None
        process = None
        temp_file = tempfile.TemporaryFile()
        try:
            process = subprocess.Popen(command, stdout=temp_file, stderr=temp_file, shell=False)
            temp_file.seek(0)
            while True:
                returncode = process.poll()
                if not returncode:
                    for line in temp_file.readlines():
                        p = re.search("Encoding:.*([0-9]{1,2}\.[0-9]{1,2}) % \(([0-9]{1,2}\.[0-9]{1,2}) fps, avg "
                                      "([0-9]{1,2}\.[0-9]{1,2}) fps, ETA ([0-9]{1,2}h[0-9]{1,2}m[0-9]{1,2})",
                                      line.decode("utf-8"))
                        if p is not None:
                            self._database.update_progress(p.group(1), p.group(2), p.group(3), p.group(4))
                else:
                    break
            temp_file.seek(0)
            print(temp_file.readline())
            self._write_log(temp_file.readlines())
            if returncode == 0:
                state = 5
            else:
                state = 100
                raise ExecuteError("HandBrakeCLI stopped with an exit code not null: {0}".format(returncode))
        except OSError as error:
            state = 105
            raise ExecuteError("CLI command failed: {0}".format(error))
        except KeyboardInterrupt:
            state = 101
        finally:
            try:
                process.kill()
            except:
                pass
            temp_file.close()
            return state
    else:
        raise ExecuteError("No option given")
4

1 回答 1

2

只做你打算做的事。

如果这意味着您有 5 个线程而不是 4 个线程,那又如何?

您的所有线程都不受 CPU 限制。也就是说,它们不是在处理数字或解析字符串或做其他计算工作,它们只是在等待 I/O、外部进程或另一个线程。因此,创建更多不受 CPU 限制的线程并没有什么坏处,除非您疯狂到操作系统无法再顺利处理它们的地步。这是数百个。

如果您的任何线程CPU 限制,那么即使是 2 也太多了。在 CPython 中,* 线程必须获得全局解释器锁才能执行任何工作,** 因此它们最终不会并行运行,并且花费更多时间争夺 GIL 而不是工作。但即便如此,添加另一个非 CPU 绑定线程,该线程将所有时间都花在等待 CPU 绑定线程正在填充的队列上,不会使事情变得比现在更糟。***


至于分贝……</p>

SQLite3 本身,只要您有足够新的版本,就可以使用多线程。但是 Pythonsqlite3模块不是,因为它向后兼容非常旧版本的 SQLite3 引擎。有关详细信息,请参阅文档中的多线程。如果我没记错(该站点似乎暂时关闭,所以我无法检查),如果需要,您可以构建具有线程支持的第三方模块pysqlite(stdlib 模块所基于的)。

但是,如果您没有大量使用数据库,则运行一个线程与数据库通信,并使用一个队列来监听其他线程,这是一个非常合理的设计。


* 和 PyPy,但不一定在其他实现中。

** 扩展模块可以释放 GIL 以在 C 中工作,只要它们不触及 Python 中可见的任何值。NumPy 等一些著名的模块利用了这一点。

*** 等待线程本身可能会受到 CPU 绑定线程的阻碍,尤其是在 Python 3.1 及更早版本中,但它不会干扰它们。

于 2013-11-13T18:51:08.210 回答