0

我有一个 Flask 应用程序,并在其中实现了 RPC。

engine.py

import pickle
import pika
from databasyfacade.rpc import api

__author__ = 'Marboni'

def on_request(ch, method, props, body):
    """Handle one RPC request from the queue and publish its reply.

    The body is unpickled into a dict holding 'func' (the name of an
    attribute of the ``api`` module) and 'args' (its positional arguments).
    The outcome is pickled and published to ``props.reply_to`` with the
    request's correlation id, and the request is then acknowledged.

    :param ch: channel used to publish the reply and to ack the request.
    :param method: delivery metadata; supplies ``delivery_tag`` for the ack.
    :param props: request properties; supplies ``reply_to`` and
        ``correlation_id``.
    :param body: pickled request payload.
    """
    try:
        # SECURITY: pickle.loads can execute arbitrary code from a crafted
        # payload; only consume from a queue whose publishers are trusted.
        request = pickle.loads(body)
        # Decoding and lookup happen inside the try so that a malformed
        # payload or an unknown function name is reported back to the caller
        # instead of raising here and leaving the caller without a reply.
        func = getattr(api, request['func'])
        result = func(*request['args'])
    except Exception as e:
        response = {
            'status': 'ERROR',
            'error': e
        }
    else:
        response = {
            'status': 'OK',
            'result': result
        }

    ch.basic_publish(exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        ),
        body=pickle.dumps(response)
    )
    # Ack only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def init(host):
    """Open a blocking connection to ``host`` and register the RPC consumer.

    Declares the 'facade_rpc' queue and subscribes ``on_request`` to it.

    NOTE(review): this function only registers the consumer. Nothing here
    enters a consume loop, and the connection and channel are neither
    returned nor stored -- confirm how deliveries are expected to be driven.

    :param host: broker host name passed to pika.ConnectionParameters.
    """
    params = pika.ConnectionParameters(host)
    conn = pika.BlockingConnection(params)
    rpc_channel = conn.channel()

    rpc_channel.queue_declare(queue='facade_rpc')
    # Ask for at most one unacknowledged delivery at a time.
    rpc_channel.basic_qos(prefetch_count=1)
    rpc_channel.basic_consume(on_request, queue='facade_rpc')

api.py

from databasyfacade.services import profiles_service

__author__ = 'Marboni'

def profile(user_id):
    """Return the result of ``profiles_service.profile`` for ``user_id``."""
    found = profiles_service.profile(user_id)
    return found

当 Flask 应用程序初始化时,它会调用 init(host) 方法。


现在我需要测试我的应用程序如何响应 RPC 调用。所以我写了以下 RPC 客户端:

client.py

import pickle
import time
import uuid

import pika

__author__ = 'Marboni'

class RpcClient(object):
    """Blocking RPC client for functions served on the 'facade_rpc' queue.

    Each call publishes a pickled request and then waits on a private,
    exclusive reply queue for the response with the matching correlation id.
    """

    # Class-level defaults: on_response() may run before the first call()
    # has assigned these, and must not fail with AttributeError then.
    corr_id = None
    response = None
    timeout = None

    def __init__(self, host, timeout=None):
        """Connect to the broker and start consuming from a new reply queue.

        :param host: broker host name passed to pika.ConnectionParameters.
        :param timeout: optional number of seconds call() waits for a reply
            before raising RuntimeError; None (the default) waits forever.
        """
        self.timeout = timeout
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)

        # The broker-generated queue name is used as reply_to in call().
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the reply if it answers the pending request.

        Messages whose correlation id differs from the pending one, or that
        arrive while no request is pending, are ignored.
        """
        if self.corr_id is not None and self.corr_id == props.correlation_id:
            self.response = body

    def call(self, func, *args):
        """Invoke the remote function ``func`` with ``args`` and return its result.

        :param func: name of the function to run on the serving side.
        :param args: positional arguments forwarded to that function.
        :returns: the value stored under 'result' in the reply.
        :raises RuntimeError: if ``timeout`` was set and no reply arrived in time.
        :raises Exception: the exception carried in the reply when its
            status is 'ERROR'.
        """
        request = {
            'func': func,
            'args': args
        }

        self.response = None
        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(exchange='',
            routing_key='facade_rpc',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=pickle.dumps(request))

        deadline = None
        if self.timeout is not None:
            deadline = time.time() + self.timeout

        while self.response is None:
            self.connection.process_data_events()
            # Fail loudly instead of spinning forever when nobody answers.
            if self.response is None and deadline is not None and time.time() > deadline:
                raise RuntimeError(
                    'No reply to RPC call %r within %s seconds' % (func, self.timeout))

        # SECURITY: pickle.loads can execute arbitrary code from a crafted
        # payload; only use this client against a trusted broker and server.
        response = pickle.loads(self.response)
        if response['status'] == 'ERROR':
            raise response['error']
        return response['result']

然后我基于 Flask-Testing 框架编写了测试。它会在每个测试方法运行之前初始化 Flask 应用程序,因此我们可以与之交互。

test.py

from databasyfacade.rpc import RpcClient
from databasyfacade.testing import DatabasyTest, fixtures
from databasyfacade.testing.testdata import UserData, ProfileData

__author__ = 'Marboni'

class RpcTest(DatabasyTest):
    """Checks the 'profile' RPC call against the ProfileData fixture."""

    @fixtures(UserData, ProfileData)
    def test_profile(self, data):
        # Connect to the broker configured for the application under test.
        client = RpcClient(self.app.config['RABBITMQ_HOST'])
        hero = ProfileData.hero
        fetched = client.call('profile', hero.user_id)
        self.assertIsNotNone(fetched)
        self.assertEqual(hero.email, fetched.email)

此测试在进行 RPC 调用时挂起。它在这里无限循环:

来自 client.py

# Loop until self.response is no longer None, letting the connection process
# pending events on each pass. There is no timeout here, so the loop never
# ends if no matching reply is ever stored.
while self.response is None:
    self.connection.process_data_events()

这意味着客户端上的 on_response() 方法从未被调用过。

如果我用 CTRL-C 中断我的测试,我将看到以下堆栈跟踪:

Traceback (most recent call last):
  File "../env/bin/nosetests", line 8, in <module>
    load_entry_point('nose==1.3.0', 'console_scripts', 'nosetests')()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/core.py", line 118, in __init__
    **extra_args)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/unittest/main.py", line 95, in __init__
    self.runTests()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/core.py", line 197, in runTests
    result = self.testRunner.run(self.test)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/core.py", line 61, in run
    test(result)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/unittest/suite.py", line 65, in __call__
    return self.run(*args, **kwds)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 74, in run
    test(result)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/case.py", line 45, in __call__
    return self.run(*arg, **kwarg)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/case.py", line 133, in run
    self.runTest(result)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/case.py", line 151, in runTest
    test(result)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/flask_testing.py", line 72, in __call__
    self._pre_setup()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/flask_testing.py", line 80, in _pre_setup
    self.app = self.create_app()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/testing/__init__.py", line 12, in create_app
    return app.create_app()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/app.py", line 75, in create_app
    init_rpc(app)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/app.py", line 63, in init_rpc
    rpc.init(app.config['RABBITMQ_HOST'])
  File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/rpc/engine.py", line 39, in init
    channel.basic_consume(on_request, queue='facade_rpc')
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/channel.py", line 220, in basic_consume
    {'consumer_tag': consumer_tag})])
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1104, in _rpc
    self._wait_on_response(method_frame))
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1124, in _send_method
    self.connection.process_data_events()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 215, in process_data_events
    if self._handle_read():
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 327, in _handle_read
    if self._read_poller.ready():
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 66, in ready
    self.poll_timeout)
KeyboardInterrupt

我尝试运行应用程序并从单独的脚本访问它:

#!/usr/bin/env python
from databasyfacade.rpc.client import RpcClient

rpc = RpcClient('localhost')

profile = rpc.call('profile', 4L)
print profile.email

如您所见,代码与测试中的代码相同,但在这种情况下它可以工作。

这个问题的原因可能是什么?可能是因为 Flask-Testing 在一个进程中同时运行应用程序和客户端?如何检查它/编写正确的测试?

4

1 回答 1

1

我找到了原因,它与 MQ 无关。远程方法通过带有 scoped_session 的 SQLAlchemy 访问数据库。在我正确结束会话之后,问题就消失了:

Session.remove()
于 2013-07-19T11:45:42.497 回答