2

目前,我正在使用 ZeroRPC,我让“工人”连接到“服务器”并完成服务器发送给他们的工作。

目前,只要有调用,就会通过 ZeroRPC 进行调用,据我所知,它使用 FIFO 队列。

我想使用我自己的队列来限制/优先处理呼叫。

我希望 ZeroRPC 公开一个Event在其内部队列为空时触发的 gevent。

4

2 回答 2

3

您想要做的是在您的服务器中创建自己的工作队列。并按照您希望的优先级向自己发送呼叫。

由于几行代码比 3 卷中的任何吸血鬼故事都表达得更多,让我们在伪代码中查看服务器的外观:

myqueue = MySuperBadAssQueue()

def myqueueprocessor():
  for request in myqueue: # blocks until next request
    gevent.spawn(request.processme) # do the job asynchronously

gevent.spawn(myqueueprocessor) # do that at startup

class Server:

  def dosomething(args...blabla...):  # what users are calling
    request = Request(args...blabla...)
    myqueue.put(request)  # something to do buddy!
    return request.future.get() # return when request is completed
                                # (can also raise an exception)

# An example of what a request could look like:
class Request:
  def __init__(self, ....blablabla...):
    self.future = gevent.AsyncResult()

   def process():
     try:
         result = someworker(self.args*) # call some worker
         self.future.set(result) # complete the initial request
     except Exception as e:
         self.future.set_exception(e)

它由 MySuperBadAssQueue 来完成所有智能工作,如果需要,可以节流,必要时取消带有异常的请求,等等......

ZeroRPC 不会公开任何事件来让您知道它的“内部”队列是否为空:

事实上,ZeroRPC 中并没有明确的队列。会发生什么,只是先到先得,确切的顺序取决于 ZeroMQ 和 Gevent IOLoop(libevent 或 libev 取决于版本)。碰巧在实践中,这很方便地像 FIFO 队列一样播放。

于 2012-12-05T19:36:19.393 回答
1

我自己没有尝试过,但我已经阅读了源代码。我很有动力,因为我想自己做这件事。

似乎您要做的是继承zerorpc.Server并覆盖该_acceptor方法。根据消息来源_acceptor是接收消息然后产生线程来运行它们。因此,如果您更改逻辑/循环以合并您的队列,您可以使用它来限制。

于 2013-09-15T02:00:39.633 回答