目前,我正在使用 ZeroRPC,我让“工人”连接到“服务器”并完成服务器发送给他们的工作。
目前,只要有调用,就会通过 ZeroRPC 进行调用,据我所知,它使用 FIFO 队列。
我想使用我自己的队列来限制/优先处理呼叫。
我希望 ZeroRPC 公开一个Event
在其内部队列为空时触发的 gevent。
您想要做的是在您的服务器中创建自己的工作队列。并按照您希望的优先级向自己发送呼叫。
由于几行代码比 3 卷中的任何吸血鬼故事都表达得更多,让我们在伪代码中查看服务器的外观:
myqueue = MySuperBadAssQueue()
def myqueueprocessor():
for request in myqueue: # blocks until next request
gevent.spawn(request.processme) # do the job asynchronously
gevent.spawn(myqueueprocessor) # do that at startup
class Server:
def dosomething(args...blabla...): # what users are calling
request = Request(args...blabla...)
myqueue.put(request) # something to do buddy!
return request.future.get() # return when request is completed
# (can also raise an exception)
# An example of what a request could look like:
class Request:
def __init__(self, ....blablabla...):
self.future = gevent.AsyncResult()
def process():
try:
result = someworker(self.args*) # call some worker
self.future.set(result) # complete the initial request
except Exception as e:
self.future.set_exception(e)
它由 MySuperBadAssQueue 来完成所有智能工作,如果需要,可以节流,必要时取消带有异常的请求,等等......
ZeroRPC 不会公开任何事件来让您知道它的“内部”队列是否为空:
事实上,ZeroRPC 中并没有明确的队列。会发生什么,只是先到先得,确切的顺序取决于 ZeroMQ 和 Gevent IOLoop(libevent 或 libev 取决于版本)。碰巧在实践中,这很方便地像 FIFO 队列一样播放。
我自己没有尝试过,但我已经阅读了源代码。我很有动力,因为我想自己做这件事。
似乎您要做的是继承zerorpc.Server
并覆盖该_acceptor
方法。根据消息来源,_acceptor
是接收消息然后产生线程来运行它们。因此,如果您更改逻辑/循环以合并您的队列,您可以使用它来限制。