我正在开发一个 Python 守护进程,负责将多媒体文件转换为 .mp4 格式。这个想法是让守护程序运行,并且每当用户需要时,我将所需的视频添加到队列中,并且一个线程最终从队列中获取视频并通过子进程调用 Handbrake 以进行转换。为简单起见,我目前只使用一个线程。
这是我的代码。首先,守护进程(server.py,改编自 Kris Johnson 的帖子):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os, sys
import os.path
import logging.config
import SocketServer
import optparse
import resource
import socket, tempfile
import time
from threadedqueue import QueueServer
version = '0.1'
SERVER_PORT=6666
SERVER_SOCKET='server_socket'
SERVER_TYPE=SocketServer.UnixStreamServer
ServerBase = SERVER_TYPE
if ServerBase == SocketServer.UnixStreamServer:
server_address = os.path.join(tempfile.gettempdir(), SERVER_SOCKET)
SERVER_LOG=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'convertCentral.log')
logging.basicConfig(format='[%(asctime)s.%(msecs).03d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename=SERVER_LOG, level=logging.INFO)
class RequestHandler(SocketServer.StreamRequestHandler):
"""Request handler
An instance of this class is created for each connection made
by a client. The Server class invokes the instance's
setup(), handle(), and finish() methods.
The template implementation here simply reads a single line from
the client, breaks that up into whitespace-delimited words, and
then uses the first word as the name of a "command." If there is
a method called "do_COMMAND", where COMMAND matches the
commmand name, then that method is invoked. Otherwise, an error
message is returned to the client.
"""
def handle(self):
"""Service a newly connected client.
The socket can be accessed as 'self.connection'. 'self.rfile'
can be used to read from the socket using a file interface,
and 'self.wfile' can be used to write to the socket using a
file interface.
When this method returns, the connection will be closed.
"""
# Read a single request from the input stream and process it.
request = self.rfile.readline()
if request:
self.server.log('request %s: %s',
self.connection.getpeername(), request.rstrip())
try:
self.process_request(request)
except Exception, e:
self.server.log('exception: %s' % str(e))
self.wfile.write('Error: %s\n' % str(e))
else:
self.server.log('error: unable to read request')
self.wfile.write('Error: unable to read request')
def process_request(self, request):
"""Process a request.
This method is called by self.handle() for each request it
reads from the input stream.
This implementation simply breaks the request string into
words, and searches for a method named 'do_COMMAND',
where COMMAND is the first word. If found, that method is
invoked and remaining words are passed as arguments.
Otherwise, an error is returned to the client.
"""
words = request.split()
if len(words) == 0:
self.server.log('error: empty request')
self.wfile.write('Error: empty request\n')
return
command = words[0]
args = words[1:]
methodname = 'do_' + command
if not hasattr(self, methodname):
self.server.log('error: invalid command')
self.wfile.write('Error: "%s" is not a valid command\n' % command)
return
method = getattr(self, methodname)
method(*args)
def do_stop(self, *args):
self.wfile.write('Stopping server\n')
self.server.stop()
"""Process an 'echo' command"""
def do_echo(self, *args):
self.wfile.write(' '.join(args) + '\n')
"""Process a 'convert' command"""
def do_convert(self, video):
self.wfile.write('Converting %s\n' % (video))
try:
self.server.addVideo(video)
except Exception as e:
logging.info("ERROR: %s" % e)
class Server(ServerBase):
def __init__(self, server_address):
self.__daemonize()
self.pool = QueueServer()
if ServerBase == SocketServer.UnixStreamServer:
# Delete the socket file if it already exists
if os.access(server_address, 0):
os.remove(server_address)
ServerBase.__init__(self, server_address, RequestHandler)
def addVideo(self, video):
self.pool.add(video)
def log(self, format, *args):
try:
message = format % args
logging.info("%s" % message)
except Exception, e:
print str(e)
def serve_until_stopped(self):
self.log('started')
self.__stopped = False
while not self.__stopped:
self.handle_request()
self.log('stopped')
def stop(self):
self.__stopped = True
def __daemonize(self):
UMASK = 0
WORKDIR = '/'
MAXFD = 1024
if hasattr(os, 'devnull'):
REDIRECT_TO = os.devnull
else:
REDIRECT_TO = '/dev/null'
try :
if os.fork() != 0:
os._exit(0)
os.setsid()
if os.fork() != 0:
os._exit(0)
os.chdir(WORKDIR)
os.umask(UMASK)
except OSError, e:
self.log('exception: %s %s', e.strerror, e.errno)
raise Exception, "%s [%d]" % (e.strerror, e.errno)
except Exception, e:
self.log('exception: %s', str(e))
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if maxfd == resource.RLIM_INFINITY:
maxfd = MAXFD
for fd in range(0, maxfd):
try:
os.close(fd)
except OSError:
pass
os.open(REDIRECT_TO, os.O_RDWR)
os.dup2(0, 1)
os.dup2(0, 2)
""" Run a server as a daemon """
def run_server(options, args):
print("convertCentral running on %s" % server_address)
svr = Server(server_address)
svr.serve_until_stopped()
svr.server_close()
"""Send request to the server and process response."""
def do_request(options, args):
if ServerBase == SocketServer.UnixStreamServer:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Send request
s.connect(server_address)
s.sendall(' '.join(args) + '\n')
# Print response
sfile = s.makefile('rb')
line = sfile.readline()
while line:
print line,
line = sfile.readline()
#######################################################################
#######################################################################
if __name__ == '__main__':
optparser = optparse.OptionParser(usage=usage,
version=version)
(options, args) = optparser.parse_args()
if len(args) == 0:
optparser.print_help()
sys.exit(-1)
if args[0] == 'start':
run_server(options, args[1:])
else:
do_request(options, args)
然后,队列(threadedqueue.py - 对不起这个名字,感觉不是特别有创意):
#! /usr/bin/python
# -*- coding: utf-8 -*-
import os, time, threading, psutil, resource, logging, subprocess as sp, sys
from Queue import Queue
from threading import Thread
SERVER_LOG=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'convertCentral.log')
logging.basicConfig(format='[%(asctime)s.%(msecs).03d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S', filename=SERVER_LOG, level=logging.INFO)
class QueueServer(object):
current_video_queue = Queue(maxsize=0)
N_WORKER_THREADS = 1
counter = 0
def __init__(self):
logging.info("[QueueServer] Initializing the video conversion queue")
for i in range(self.N_WORKER_THREADS):
logging.info("Firing thread")
t = Thread(target=self.worker)
t.daemon = True
t.start()
''' Converts the video using Handbrake via subprocess'''
def convertVideo(self, video):
logging.info("Now converting %s" % video)
fileName, fileExtension = os.path.splitext(video)
payload = "nice -n 15 HandBrakeCLI -i %s -e x264 -q 15 -o %s.mp4" % (video, fileName)
pr = sp.Popen(payload, shell=True, stdout=open('/dev/null', 'w'), stderr=sp.STDOUT)
logging.info('Started handbrake')
pr.wait()
logging.info("EXIT CODE: %s " % pr.returncode)
self.counter = self.counter + 1
logging.info("Conversion's done. %d" % self.counter)
''' A worker thread '''
def worker(self):
while True:
logging.info("Getting one")
item = self.current_video_queue.get()
logging.info("Firing conversion: %s" % item)
self.convertVideo(item)
self.current_video_queue.task_done()
logging.info("All done")
''' Adds a video to the video conversion queue '''
def add(self, video):
logging.info("* Adding %s to the queue" % video)
self.current_video_queue.put(video)
logging.info("* Added %s to the queue" % video)
time.sleep(3)
这是交易:如果我自己运行线程队列,它会很好用。但是,如果我使用 server.py 运行它,则转换永远不会发生,因为 Handbrake 崩溃了。
以下是日志:
Hal@ubuntu:~/Desktop/convertCentral$ python server.py start
convertCentral running on /tmp/server_socket
Hal@ubuntu:~/Desktop/convertCentral$ python server.py convert UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.793] request : convert UNKNOWN_PARAMETER_VALUE.WMV
Converting UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.793] * Adding UNKNOWN_PARAMETER_VALUE.WMV to the queue
[2014-04-17 18:05:44.793] * Added UNKNOWN_PARAMETER_VALUE.WMV to the queue
[2014-04-17 18:05:44.793] Firing conversion: UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.794] Now converting UNKNOWN_PARAMETER_VALUE.WMV
[2014-04-17 18:05:44.796] Started handbrake
[2014-04-17 18:05:45.046] Exit code: 0
[2014-04-17 18:05:45.046] Conversion's done. 1
[2014-04-17 18:05:45.046] All done
[2014-04-17 18:05:45.047] Getting one
我将子进程的输出记录到文件中。这是我得到的:
[18:05:44] hb_init: starting libhb thread
HandBrake 0.9.9 (2013051800) - Linux x86_64 - http://handbrake.fr
4 CPUs detected
Opening UNKNOWN_PARAMETER_VALUE.WMV...
[18:05:44] hb_scan: path=UNKNOWN_PARAMETER_VALUE.WMV, title_index=1
libbluray/bdnav/index_parse.c:162: indx_parse(): error opening UNKNOWN_PARAMETER_VALUE.WMV/BDMV/index.bdmv
libbluray/bdnav/index_parse.c:162: indx_parse(): error opening UNKNOWN_PARAMETER_VALUE.WMV/BDMV/BACKUP/index.bdmv
libbluray/bluray.c:1725: nav_get_title_list(UNKNOWN_PARAMETER_VALUE.WMV) failed (0x7fd44c000900)
[18:05:44] bd: not a bd - trying as a stream/file instead
libdvdnav: Using dvdnav version 4.1.3
libdvdread: Encrypted DVD support unavailable.
libdvdread: Can't stat UNKNOWN_PARAMETER_VALUE.WMV
No such file or directory
libdvdnav: vm: failed to open/read the DVD
[18:05:44] dvd: not a dvd - trying as a stream/file instead
[18:05:44] hb_stream_open: open UNKNOWN_PARAMETER_VALUE.WMV failed
[18:05:44] scan: unrecognized file type
[18:05:44] libhb: scan thread found 0 valid title(s)
No title found.
HandBrake has exited.
因此,我们可以证明该脚本确实可以启动 Handbrake,但日志表明 Handbrake 无法识别文件格式并当场死亡。同样,如果我自己运行 threadedqueue.py 脚本,这不会发生。
我猜 Handbrake 没有以某种方式加载它的库。这是代码不起作用的原因吗?我怎样才能让手刹工作?