1

我正在为名为py_pushover的Pushover API创建一个包装器。其中一项功能是能够使用websocket直接连接到他们的服务器。在尝试“测试”我的异步代码时,我遇到了一些麻烦。运行以下:

import py_pushover as py_po
from tests.helpers import app_key, device_id, secret
import time


def print_msg(messages):
    print(messages)

if __name__ == "__main__":
    cm = py_po.client.ClientManager(app_key, secret=secret, device_id=device_id)
    cm.listen_async(print_msg)

    for i in range(10):
        time.sleep(3)
        print("Meh")

    cm.stop_listening()
    cm.clear_server_messages()

完全按照需要工作。但是,当我将“回调”函数包装到测试套件中时:

class TestMessage(unittest.TestCase):
    def setUp(self):
        self.pm = py_po.message.MessageManager(app_key, user_key)
        self.client = py_po.client.ClientManager(app_key, secret=secret, device_id=device_id)
        self.cleanUpClient()
        self.client.listen_async(self.client_message_receieved)  # <--- BREAKS HERE!

    def tearDown(self):
        self.client.stop_listening()
        self.cleanUpClient()

    def cleanUpClient(self):
        self.client.retrieve_message()
        for msg in self.client.messages:
            if msg['priority'] >= py_po.PRIORITIES.EMERGENCY and msg['acked'] != 1:
                self.client.acknowledge_message(msg['receipt'])
        self.client.clear_server_messages()

        self.client.retrieve_message()
        self.assertEquals(len(self.client.messages), 0)

    def client_message_receieved(self, messages):
        self.stored_messages = messages

    def test_val_msg(self):

        # Testing a normal push message
        send_message = 'Testing normal push'
        self.pm.push_message(send_message, device='test_device')
        # self.client.retrieve_message()

        self.assertEquals(send_message, self.stored_messages[0]['message'])

我收到以下错误:

TypeError: cannot serialize '_io.TextIOWrapper' object

我假设这是由于一些酸洗错误造成的,但我不太确定问题出在哪里。如何设置允许我测试调用回调函数的多处理代码的测试套件?

我的客户端代码如下(精简到不那么眼痛):

import websocket
import logging
from multiprocessing import Process, Pipe

from py_pushover import BaseManager, send, base_url

logging.basicConfig(filename='client.log', level=logging.INFO)

class ClientManager(BaseManager):
    """
    Manages the interface between the Pushover Servers and user.  This can be instantiated with or without the user
    secret and device id.  If no secret is provided, the user MUST login before interfacing with the Pushover servers.
    If no device id is provided, the user MUST register this client as a device before interfacing with the Pushover
    servers.
    """
    _login_url = base_url + "users/login.json"
    _register_device_url = base_url + "devices.json"
    _message_url = base_url + "messages.json"
    _del_message_url = base_url + "devices/{device_id}/update_highest_message.json"
    _ack_message_url = base_url + "receipts/{receipt_id}/acknowledge.json"
    _ws_connect_url = "wss://client.pushover.net/push"
    _ws_login = "login:{device_id}:{secret}\n"

    def __init__(self, app_token, secret=None, device_id=None):
        """
        :param str app_token: application id from Pushover API
        :param str secret: (Optional) user secret given after validation of login
        :param str device_id: (Optional) device id of this client
        :return:
        """
        super(ClientManager, self).__init__(app_token)
        self.__secret__ = secret
        self.__device_id__ = device_id
        self.messages = []
        self._ws_app = websocket.WebSocketApp(
            self._ws_connect_url,
            on_open=self._on_ws_open,
            on_message=self._on_ws_message,
            on_error=self._on_ws_error,
            on_close=self._on_ws_close
        )
        self.__on_msg_receipt__ = None
        self.__p__ = Process()

    def listen(self, on_msg_receipt):
        """
        Listens for messages from the server.  When a message is received, a call to the on_msg_receipt function with a
          single parameter representing the messages received.

        :param on_msg_receipt: function to call when a message is received
        """
        self.__on_msg_receipt__ = on_msg_receipt
        self._ws_app.run_forever()

    def listen_async(self, on_msg_receipt):
        """
        Creates a Process for listening to the Pushover server for new messages.  This process then listens for messages
          from the server.  When a message is received, a call to the on_msg_receipt function with a single parameter
          representing the messages received.

        :param on_msg_receipt: function to call when a message is received
        """
        self.__p__ = Process(target=self.listen, args=(on_msg_receipt,))
        self.__p__.start()  # <-- BREAKS HERE!

    def stop_listening(self):
        """
        Stops the listening process from accepting any more messages.
        """
        if self.__p__:
            self.__p__.terminate()
            self.__p__ = None

    def _on_ws_open(self, ws):
        """
        Function used when the websocket is opened for the first time.

        :param ws: the websocket
        """
        logging.info("Opening connection to Pushover server...")
        ws.send(self._ws_login.format(device_id=self.__device_id__, secret=self.__secret__))
        logging.info("----Server Connection Established----")

    def _on_ws_message(self, ws, message):
        """
        Function used for when the websocket recieves a message.  Per the Pushover API guidelines 1 of 4 responses
        will be sent:

            1. `#` - Keep-alive packet, no response needed.
            2. `!` - A new message has arrived; you should perform a sync.
            3. `R` - Reload request; you should drop your connection and re-connect.
            4. `E` - Error; a permanent problem occured and you should not automatically re-connect.
                     Prompt the user to login again or re-enable the device.

        :param ws: the websocket
        :param message: message received from remote server
        """
        message = message.decode("utf-8")
        logging.debug("Message received: " + message)
        if message == "#":
            pass

        elif message == "!":
            self.retrieve_message()
            if self.__on_msg_receipt__:
                self.__on_msg_receipt__(self.messages)

        elif message == "R":
            logging.info("Reconnecting to server (requested from server)...")
            ws.close()
            self.listen(self.__on_msg_receipt__)

        elif message == "E":
            logging.error("Server connection failure!")

        else:  # message isn't of the type expected.  Raise an error.
            raise NotImplementedError  #todo Implement an appropriate exception

    def _on_ws_error(self, ws, error):
        """
        Function used when the websocket encounters an error.  The error is logged

        :param ws: the websocket
        :param error: the error encountered
        """
        logging.error('Error: ' + error)

    def _on_ws_close(self, ws):
        """
        Function used when the websocket closes the connection to the remote server.

        :param ws: the websocket
        """
        logging.info("----Server Connection Closed----")
        self._ws_app = None

注意:我不是想推广我的py_pushover模块,而是在此处链接它以防需要查看更多代码。

4

0 回答 0