0

我有一个要同步的龙卷风服务器。我有一个客户端,它同时向服务器发出异步请求。它每 5 秒用一次心跳 ping 服务器,其次,它会尽可能地对作业发出 GET 请求。

在服务器端,有一个包含作业的线程安全队列。如果队列为空,它将阻塞 20 秒。我希望它保持连接并阻塞 20 秒,当它返回时,它会向客户端写入“No job”。一旦有作业可用,它应该立即将其写入客户端,因为 queue.get() 会返回。我希望在此请求被阻止时,心跳继续在后台发生。在这里,我从同一个客户端向服务器发出两个异步请求。

这是我构建的一个示例项目,它模拟了我的问题。

服务器:

import tornado.ioloop
import tornado.web
from queue import Queue
from tornado import gen

q = Queue()


class HeartBeatHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def post(self):
        print("Heart beat")


class JobHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        print("Job")
        try:
            job = yield q.get(block=True, timeout=20)
            self.write(job)
        except Exception as e:
            self.write("No job")


def make_app():
    return tornado.web.Application([
        (r"/heartbeat", HeartBeatHandler),
        (r"/job", JobHandler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    try:
        tornado.ioloop.IOLoop.current().start()
    except KeyboardInterrupt:
        tornado.ioloop.IOLoop.current().stop()

客户:

import asyncio
from tornado import httpclient, gen


@gen.coroutine
def heartbeat_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient()
        heartbeat_request = httpclient.HTTPRequest("http://{}/heartbeat".format("127.0.0.1:8888"), method="POST",
                                                   body="")
        try:
            yield http_client.fetch(heartbeat_request)
            yield asyncio.sleep(5)
        except httpclient.HTTPError as e:
            print("Heartbeat failed!\nError: {}".format(str(e)))

        http_client.close()


@gen.coroutine
def worker_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
        job_request = httpclient.HTTPRequest("http://{}/job".format("127.0.0.1:8888"), method="GET")
        try:
            response = yield http_client.fetch(job_request)
            print(response.body)
        except httpclient.HTTPError as e:
            print("Heartbeat failed!\nError: {}".format(str(e)))

        http_client.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(heartbeat_routine())
    asyncio.ensure_future(worker_routine())
    loop.run_forever()

问题:

  1. 问题在于,当 queue.get() 阻塞时,心跳也会阻塞 20 秒。这是我不想要的。
  2. 正如您在我的客户端中看到的,我将请求超时设置为 180 秒。但这似乎对龙卷风不起作用。如果将 queue.get() 超时时间增加到 20 秒以上,它会返回错误代码,说明请求超时。
4

1 回答 1

1
  1. If you use a thread-safe queue, you must use not use blocking operations from the IOLoop thread. Instead, run them in a thread pool:

    job = yield IOLoop.current().run_in_executor(None, lambda: q.get(block=True, timeout=20))
    

    Alternately, you could use Tornado's async (but thread-unsafe) queue, and use IOLoop.add_callback whenever you need to interact with the queue from another thread.

  2. There's some magic in the AsyncHTTPClient constructor, which tries to share existing instances when possible, but this means that constructor arguments are only effective the first time. The worker_routine is picking up the default instances created by heartbeat_routine. Add force_instance=True to ensure you get a fresh client in worker_routine (and call .close() on it when you're done)

于 2018-11-09T14:54:29.640 回答