我有一个通过 Klein POST 并运行 Celery 异步作业的 React 应用程序(通过 RabbitMQ)。我想通过 WAMP Pub/Sub 显示某个表中的所有任务以及状态更新(我使用 Crossbar 作为路由器)。我的 React 表组件获取初始数据并订阅更改,如下所示:
class Table extends React.Component {
componentWillMount(){
this.props.session //Autobahn session variable passed down as prop
.subscribe("celery.update.task", (args,kwargs,details)=>{update state})
}
componentDidMount(){
//Gets all tasks in db (SQLite)
this.props.session.call("celery.all.tasks", (a,k,d)=>{set initial state}
}
render() {
//render table with data from state
}
我目前正在尝试让后端的 WAMP 组件(使用 AutobahnPython)在 celery 事件上发布“celery.update.task”。此应用程序将面向 Internet,因此我想使用安全 websockets (wss),因此按照本教程运行 WAMP 组件。我当前的问题是用于监听 celery 事件的 WAMP 组件没有发布异步(一旦 python 进程结束,所有发布都会被发送):
import threading, time, ast, six
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunnner
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet._sslverify import OpenSSLCertificateAuthorities
from twisted.internet.ssl import CertificateOptions
from OpenSSL import crypto
class MonitorThread(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
from app_queue import celery #Celery instance
self.celery_app = celery
self.interval = 1
self.state = self.celery_app.events.State()
self.thread = threading.Thread(target=self.run, args=())
self.thread.daemon = True
self.thread.start()
yield
@inlineCallbacks
#there are analogous callbacks for "received", "started" and "failed"
def handle_task_success(self, event):
...
res = yield self.publish("celery.task.update", "some_task_update")
resultValue(res)
def run(self):
while True:
try:
with self.celery_app.connection() as connection:
recv = self.select_app.events.Receiver(connection, handlers={
"task-succeeded": self.handle_task_success
})
recv.capture(limit=None, timeout=None, wakeup=True)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
pass
time.sleep(self.interval)
if __name__ == "__main__":
cert = crypto.load_certificate(crypto.FILETYPE_PEM,six.u(open('.crossbar/example.cert.pem', "r").read()))
options = CertificateOptions(trustRoot=OpenSSLCertificateAuthorities([cert]))
runner = ApplicationRunner(url=u"wss://127.0.0.1:443/ws", realm=u"realm1", ssl=options)
runner.run(MonitorThread)
上面的代码能够监控所有事件,但发布方法不会推送给订阅者,直到 python 进程结束。我希望 WAMP 组件在每个 Celery 事件(接收、启动、成功、失败)上发布到“celery.task.update”,以便表组件实时更新。