我的关键区域是一个搁置对象,我使用事件和锁的组合来保持线程分离。但是,这是行不通的。我的意思是进程挂起,我在某处发生了未经检查的锁,之后什么也没有发生。
我确实发现了问题。获得了锁并且搁置对象发生了变化。然后,当发生 event.set() 的条件时,所有线程都已经在 event.wait() 之后的代码块内。换句话说,他们在不应该的条件下运行代码。我将远离事件并进入条件对象,我使用的是事件,因为最初我使用的是队列。队列虽然没有持久性,所以现在我正在使用这种方法。
它是这样的:
import SocketServer
SocketServer.TCPServer.allow_reuse_address = True
from Dropbox import Dropbox
import threading
import json
class DropboxServer(SocketServer.StreamRequestHandler):
def handle(self):
inp = self.request.recv(8192).strip()
Dropbox.enqueue(json.loads(inp), "DropboxServer")
self.request.send(json.dumps({"status":"success"}))
if __name__ == "__main__":
HOST, PORT = "localhost", 9999
event = threading.Event()
event.clear()
lock = threading.Lock()
Dropbox.lock = lock
Dropbox.event = event
for i in range(Dropbox.thread_count):
thread = Dropbox()
thread.start()
server = SocketServer.TCPServer((HOST, PORT), DropboxServer)
print "serving forever"
server.serve_forever()
和
from dropbox import session, client
import Queue
import random
import string
from time import time
import shelve
class Dropbox(threading.Thread):
thread_count = 20
APP_KEY = 'key'
APP_SECRET = 'sec'
queue = Queue.Queue()
requests = shelve.open("requests.db")
def __init__(self):
self.sess = session.DropboxSession(self.APP_KEY, self.APP_SECRET, 'dropbox')
self.sess.set_token("token", "secret")
self.dropbox = client.DropboxClient(self.sess)
super(Dropbox, self).__init__(group=None, target=None, name=None, args=(), kwargs={})
Dropbox.requests["dropbox"] = []
def run(self):
while True:
self.put_file(Dropbox.dequeue(self.name))
def put_file(self, request):
file = open(request["local_file_path"])
self.dropbox.put_file(request["destination"], file)
file.close()
print(self.name + " ending. " + str(time() - Dropbox.start_time))
"""
{
destination : project_name/AssetFile.png
local_file_path : /path/to/asset.png
token: A9s8d*d8
secret: a9s9*38
}
"""
@staticmethod
def enqueue(request, name):
if "destination" not in request:
for single_request in request:
size = Dropbox.enqueue(single_request, name)
return size
Dropbox.lock.acquire()
print(name + " acquired lock")
temp = Dropbox.requests["dropbox"]
temp.append(request)
print(name + " enqueue size: " + str(len(temp)))
Dropbox.requests["dropbox"] = temp
Dropbox.lock.release()
size = len(temp)
if size == 1:
Dropbox.start_time = time()
print(name + " event set")
Dropbox.event.set()
print(name + " releasing lock")
return size
@staticmethod
def dequeue(name):
Dropbox.lock.acquire()
print(name + " acquired lock")
temp = Dropbox.requests["dropbox"]
try:
result = temp.pop(0)
except IndexError:
Dropbox.event.wait()
print(name + " dequeue size: " + str(len(temp)))
Dropbox.requests["dropbox"] = temp
Dropbox.lock.release()
if len(temp) == 0:
print(name + " event cleared")
Dropbox.event.clear()
print(name + " releasing lock")
return result