0

我正在开发一个 Python 守护进程,负责将多媒体文件转换为 .mp4 格式。这个想法是让守护程序运行,并且每当用户需要时,我将所需的视频添加到队列中,并且一个线程最终从队列中获取视频并通过子进程调用 Handbrake 以进行转换。为简单起见,我目前只使用一个线程。

这是我的代码。首先,守护进程(server.py,改编自 Kris Johnson 的帖子):

#!/usr/bin/env python
# -*- coding: utf-8 -*- 

import os, sys
import os.path
import logging.config
import SocketServer
import optparse
import resource
import socket, tempfile
import time
from threadedqueue import QueueServer

version = '0.1'
SERVER_PORT=6666
SERVER_SOCKET='server_socket'

SERVER_TYPE=SocketServer.UnixStreamServer
ServerBase = SERVER_TYPE

if ServerBase == SocketServer.UnixStreamServer:
    server_address = os.path.join(tempfile.gettempdir(), SERVER_SOCKET)

SERVER_LOG=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'convertCentral.log')
logging.basicConfig(format='[%(asctime)s.%(msecs).03d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename=SERVER_LOG, level=logging.INFO)


class RequestHandler(SocketServer.StreamRequestHandler):

    """Request handler

    An instance of this class is created for each connection made
    by a client.  The Server class invokes the instance's
    setup(), handle(), and finish() methods.

    The template implementation here simply reads a single line from
    the client, breaks that up into whitespace-delimited words, and
    then uses the first word as the name of a "command."  If there is
    a method called "do_COMMAND", where COMMAND matches the
    commmand name, then that method is invoked.  Otherwise, an error
    message is returned to the client.

    """

    def handle(self):
        """Service a newly connected client.

        The socket can be accessed as 'self.connection'.  'self.rfile'
        can be used to read from the socket using a file interface,
        and 'self.wfile' can be used to write to the socket using a
        file interface.

        When this method returns, the connection will be closed.
        """

        # Read a single request from the input stream and process it.
        request = self.rfile.readline()
        if request:
            self.server.log('request %s: %s',
                            self.connection.getpeername(), request.rstrip())
            try:
                self.process_request(request)
            except Exception, e:
                self.server.log('exception: %s' % str(e))
                self.wfile.write('Error: %s\n' % str(e))
        else:
            self.server.log('error: unable to read request')
            self.wfile.write('Error: unable to read request')


    def process_request(self, request):
        """Process a request.

        This method is called by self.handle() for each request it
        reads from the input stream.

        This implementation simply breaks the request string into
        words, and searches for a method named 'do_COMMAND',
        where COMMAND is the first word.  If found, that method is
        invoked and remaining words are passed as arguments.
        Otherwise, an error is returned to the client.
        """

        words = request.split()
        if len(words) == 0:
            self.server.log('error: empty request')
            self.wfile.write('Error: empty request\n')
            return

        command = words[0]
        args = words[1:]

        methodname = 'do_' + command
        if not hasattr(self, methodname):
            self.server.log('error: invalid command')
            self.wfile.write('Error: "%s" is not a valid command\n' % command)
            return
        method = getattr(self, methodname)
        method(*args)


    def do_stop(self, *args):
        self.wfile.write('Stopping server\n')
        self.server.stop()


    """Process an 'echo' command"""
    def do_echo(self, *args):
        self.wfile.write(' '.join(args) + '\n')


    """Process a 'convert' command"""
    def do_convert(self, video):
        self.wfile.write('Converting %s\n' % (video))

        try:
            self.server.addVideo(video)
        except Exception as e:
            logging.info("ERROR: %s" % e)


class Server(ServerBase):

    def __init__(self, server_address):
        self.__daemonize()
        self.pool = QueueServer()

        if ServerBase == SocketServer.UnixStreamServer:

            # Delete the socket file if it already exists
            if os.access(server_address, 0):
                os.remove(server_address)

        ServerBase.__init__(self, server_address, RequestHandler)


    def addVideo(self, video):
        self.pool.add(video)

    def log(self, format, *args):
        try:
            message = format % args
            logging.info("%s" % message)
        except Exception, e:
            print str(e)


    def serve_until_stopped(self):
        self.log('started')

        self.__stopped = False

        while not self.__stopped:
            self.handle_request()

        self.log('stopped')


    def stop(self):
        self.__stopped = True


    def __daemonize(self):
        UMASK = 0
        WORKDIR = '/'
        MAXFD = 1024
        if hasattr(os, 'devnull'):
            REDIRECT_TO = os.devnull
        else:
            REDIRECT_TO = '/dev/null'

        try :
            if os.fork() != 0:
                os._exit(0)

            os.setsid()

            if os.fork() != 0:
                os._exit(0)

            os.chdir(WORKDIR)
            os.umask(UMASK)
        except OSError, e:
            self.log('exception: %s %s', e.strerror, e.errno)
            raise Exception, "%s [%d]" % (e.strerror, e.errno)
        except Exception, e:
            self.log('exception: %s', str(e))

        maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if maxfd == resource.RLIM_INFINITY:
            maxfd = MAXFD
        for fd in range(0, maxfd):
            try:
                os.close(fd)
            except OSError:
                pass

        os.open(REDIRECT_TO, os.O_RDWR)
        os.dup2(0, 1)
        os.dup2(0, 2)


""" Run a server as a daemon """
def run_server(options, args):

    print("convertCentral running on %s" % server_address)
    svr = Server(server_address)
    svr.serve_until_stopped()
    svr.server_close()


"""Send request to the server and process response."""
def do_request(options, args):  
    if ServerBase == SocketServer.UnixStreamServer:
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    # Send request
    s.connect(server_address)
    s.sendall(' '.join(args) + '\n')

    # Print response
    sfile = s.makefile('rb')
    line = sfile.readline()
    while line:
        print line,
        line = sfile.readline()


#######################################################################
#######################################################################

if __name__ == '__main__':

    optparser = optparse.OptionParser(usage=usage,
                                      version=version)
    (options, args) = optparser.parse_args()

    if len(args) == 0:
        optparser.print_help()
        sys.exit(-1)

    if args[0] == 'start':
        run_server(options, args[1:])
    else:
        do_request(options, args)

然后,队列(threadedqueue.py - 对不起这个名字,感觉不是特别有创意):

#! /usr/bin/python
# -*- coding: utf-8 -*- 
import os, time, threading, psutil, resource, logging, subprocess as sp, sys
from Queue import Queue
from threading import Thread

SERVER_LOG=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'convertCentral.log')
logging.basicConfig(format='[%(asctime)s.%(msecs).03d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename=SERVER_LOG, level=logging.INFO)

class QueueServer(object):

    current_video_queue = Queue(maxsize=0)
    N_WORKER_THREADS = 1
    counter = 0


    def __init__(self):
        logging.info("[QueueServer] Initializing the video conversion queue")
        for i in range(self.N_WORKER_THREADS):
            logging.info("Firing thread")
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()

    ''' Converts the video using Handbrake via subprocess'''
    def convertVideo(self, video):
        logging.info("Now converting %s" % video)
        fileName, fileExtension = os.path.splitext(video)
        payload = "nice -n 15 HandBrakeCLI -i %s -e x264 -q 15 -o %s.mp4" % (video, fileName)

        pr = sp.Popen(payload, shell=True, stdout=open('/dev/null', 'w'), stderr=sp.STDOUT)
        logging.info('Started handbrake')
        pr.wait()
        logging.info("EXIT CODE: %s " % pr.returncode)

        self.counter = self.counter + 1
        logging.info("Conversion's done. %d" % self.counter)



    ''' A worker thread '''
    def worker(self):
        while True:
            logging.info("Getting one")
            item = self.current_video_queue.get()
            logging.info("Firing conversion: %s" % item)
            self.convertVideo(item)
            self.current_video_queue.task_done()
            logging.info("All done")



    ''' Adds a video to the video conversion queue '''
    def add(self, video):
        logging.info("* Adding %s  to the queue" % video)
        self.current_video_queue.put(video)
        logging.info("* Added %s  to the queue" % video)
        time.sleep(3)

这是交易:如果我自己运行线程队列,它会很好用。但是,如果我使用 server.py 运行它,则转换永远不会发生,因为 Handbrake 崩溃了。

以下是日志:

Hal@ubuntu:~/Desktop/convertCentral$ python server.py start
convertCentral running on /tmp/server_socket

Hal@ubuntu:~/Desktop/convertCentral$ python server.py convert UNKNOWN_PARAMETER_VALUE.WMV 

[2014-04-17 18:05:44.793] request : convert UNKNOWN_PARAMETER_VALUE.WMV
Converting UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.793] * Adding UNKNOWN_PARAMETER_VALUE.WMV  to the queue
[2014-04-17 18:05:44.793] * Added UNKNOWN_PARAMETER_VALUE.WMV  to the queue
[2014-04-17 18:05:44.793] Firing conversion: UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.794] Now converting UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.796] Started handbrake
[2014-04-17 18:05:45.046] Exit code: 0 
[2014-04-17 18:05:45.046] Conversion's done. 1
[2014-04-17 18:05:45.046] All done
[2014-04-17 18:05:45.047] Getting one

我将子进程的输出记录到文件中。这是我得到的:

[18:05:44] hb_init: starting libhb thread
HandBrake 0.9.9 (2013051800) - Linux x86_64 - http://handbrake.fr
4 CPUs detected
Opening UNKNOWN_PARAMETER_VALUE.WMV...
[18:05:44] hb_scan: path=UNKNOWN_PARAMETER_VALUE.WMV, title_index=1
libbluray/bdnav/index_parse.c:162: indx_parse(): error opening UNKNOWN_PARAMETER_VALUE.WMV/BDMV/index.bdmv
libbluray/bdnav/index_parse.c:162: indx_parse(): error opening UNKNOWN_PARAMETER_VALUE.WMV/BDMV/BACKUP/index.bdmv
libbluray/bluray.c:1725: nav_get_title_list(UNKNOWN_PARAMETER_VALUE.WMV) failed (0x7fd44c000900)
[18:05:44] bd: not a bd - trying as a stream/file instead
libdvdnav: Using dvdnav version 4.1.3
libdvdread: Encrypted DVD support unavailable.
libdvdread: Can't stat UNKNOWN_PARAMETER_VALUE.WMV
No such file or directory
libdvdnav: vm: failed to open/read the DVD
[18:05:44] dvd: not a dvd - trying as a stream/file instead
[18:05:44] hb_stream_open: open UNKNOWN_PARAMETER_VALUE.WMV failed
[18:05:44] scan: unrecognized file type
[18:05:44] libhb: scan thread found 0 valid title(s)
No title found.
HandBrake has exited.

因此,我们可以证明该脚本确实可以启动 Handbrake,但日志表明 Handbrake 无法识别文件格式并当场死亡。同样,如果我自己运行 threadedqueue.py 脚本,这不会发生。

我猜 Handbrake 没有以某种方式加载它的库。这是代码不起作用的原因吗?我怎样才能让手刹工作?

4

1 回答 1

3

(该死的浏览器丢失了我之前的答案,希望这不会成为一个重复)

这两个错误Can't stat UNKNOWN_PARAMETER_VALUE.WMVopen UNKNOWN_PARAMETER_VALUE.WMV failed建议的文件未找到错误,但看起来文件名正在传递到手刹命令行。

因此,为了确保使用正确的文件:在 python 代码中,将用户提供给 server.py 的任何文件名转换为绝对路径名(请参阅参考资料os.path.*)。在任何地方使用这些绝对路径名(尤其是在手刹命令/Popen 中)

于 2014-04-21T22:16:47.800 回答