1

我有一个通过 Klein POST 并运行 Celery 异步作业的 React 应用程序(通过 RabbitMQ)。我想通过 WAMP Pub/Sub 显示某个表中的所有任务以及状态更新(我使用 Crossbar 作为路由器)。我的 React 表组件获取初始数据并订阅更改,如下所示:

class Table extends React.Component {
  componentWillMount(){
    this.props.session //Autobahn session variable passed down as prop
      .subscribe("celery.update.task", (args,kwargs,details)=>{update state})
  }
  componentDidMount(){
    //Gets all tasks in db (SQLite)
    this.props.session.call("celery.all.tasks", (a,k,d)=>{set initial state}
  }
  render() {
    //render table with data from state
  }

我目前正在尝试让后端的 WAMP 组件(使用 AutobahnPython)在 celery 事件上发布“celery.update.task”。此应用程序将面向 Internet,因此我想使用安全 websockets (wss),因此按照教程运行 WAMP 组件。我当前的问题是用于监听 celery 事件的 WAMP 组件没有发布异步(一旦 python 进程结束,所有发布都会被发送):

import threading, time, ast, six
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunnner
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet._sslverify import OpenSSLCertificateAuthorities
from twisted.internet.ssl import CertificateOptions
from OpenSSL import crypto

class MonitorThread(ApplicationSession):
  @inlineCallbacks
  def onJoin(self, details):
    from app_queue import celery #Celery instance
    self.celery_app = celery
    self.interval = 1
    self.state = self.celery_app.events.State()
    self.thread = threading.Thread(target=self.run, args=())
    self.thread.daemon = True
    self.thread.start()
    yield 

  @inlineCallbacks
  #there are analogous callbacks for "received", "started" and "failed"
  def handle_task_success(self, event):
    ...
    res = yield self.publish("celery.task.update", "some_task_update") 
    resultValue(res)

  def run(self): 
    while True:
      try:
        with self.celery_app.connection() as connection: 
          recv = self.select_app.events.Receiver(connection, handlers={
            "task-succeeded": self.handle_task_success
          })
          recv.capture(limit=None, timeout=None, wakeup=True)
      except (KeyboardInterrupt, SystemExit):
        raise
      except Exception:
        pass
      time.sleep(self.interval)

if __name__ == "__main__":
  cert = crypto.load_certificate(crypto.FILETYPE_PEM,six.u(open('.crossbar/example.cert.pem', "r").read()))
  options = CertificateOptions(trustRoot=OpenSSLCertificateAuthorities([cert]))
  runner = ApplicationRunner(url=u"wss://127.0.0.1:443/ws", realm=u"realm1", ssl=options)
  runner.run(MonitorThread)

上面的代码能够监控所有事件,但发布方法不会推送给订阅者,直到 python 进程结束。我希望 WAMP 组件在每个 Celery 事件(接收、启动、成功、失败)上发布到“celery.task.update”,以便表组件实时更新。

有没有办法使这项工作?我试过没有成功尝试thisthis

4

0 回答 0