5

我正在使用 Python 构建一个算法交易平台。多种算法每天从 09:30 到 16:00 监控市场并相应地执行交易。

我正在寻找的是从客户端任意启动和停止算法。因此,我希望在任何给定时间运行一个服务器脚本multiprocessing和一个可以启动/停止/列出算法(应该单独运行)的客户端。process

如何做到这一点的任何例子?大多数在线示例都是针对队列服务器的,这似乎不适合我的问题。

编辑:

我正在尝试使用包来做到这一点multiprocessing。使用队列的想法对我来说似乎是错误的,因为我知道任意数量的进程将运行一整天或至少直到我说停止。我不想运行一个简短的脚本,让工作人员在完成前一个工作后从队列中消费下一个工作。实际上,我正在考虑使用Manager将永远运行的服务器脚本,并在请求时在单独的进程/线程中启动新脚本。但是,我希望能够向进程发送停止信号以杀死它。我确实有一种感觉,我这样做有点倒退:-) 我所拥有的是:

服务器.py:

import multiprocessing as mp
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from time import strftime


class Server(object):
    def __init__(self, port=50000, authkey=''):
        self.processes = {}
        self._authkey = authkey
        self.port = port
        self.server = None
        self.running = False
        BaseManager.register('get_process', callable=lambda: self)


    def start_server(self):
        manager = BaseManager(address=('', self.port), authkey=self._authkey)
        self.server = manager.get_server()
        try:
            self._logmessage("Server started")
            self.running = True
            self.server.serve_forever()
        except (KeyboardInterrupt, SystemExit):
            self.shutdown()

    def start_process(self, mod, fcn, *args, **kwargs):
        mod = __import__(mod, globals(), locals(), ['object'], -1)
        key = "{0}.{1}".format(mod, fcn)
        assert not key in self.processes, \
            "Process named '%s' already exists" % key
        p = Process(target=getattr(mod, fcn), name=mod, args=(None, ), kwargs=kwargs)
        self._logmessage("Process '%s' started" % key)
        p.start()
        # p.join()
        self.processes[key] = p

    def stop_process(self, key):
        self.processes[key].terminate()
        del self.processes[key]

    def get_processes(self):
        return self.processes.keys()

    def shutdown(self):
        for child in mp.active_children():
            child.terminate()
        self.server.shutdown()
        self.running = False
        print "Shutting down"

    def _logmessage(self, msg):
        print "%s: %s" % (strftime('%Y-%m-%d %H:%M:%S'), msg)


if __name__ == '__main__':
    server = Server(authkey='abc')
    try:
        server.start_server()
    except (KeyboardInterrupt, SystemExit):
        server.shutdown()

客户端.py:

from multiprocessing.managers import BaseManager
import time


class Client(object):
    def __init__(self, host='', port=50000, authkey=''):
        self.host = host
        self.port = port
        self.manager = None
        self.process = None
        self._type_id = 'get_process'
        self._authkey = authkey
        self.manager = BaseManager(address=(self.host, self.port), authkey=self._authkey)
        BaseManager.register(self._type_id)

    def connect(self):
        try:
            self.manager.connect()
            self._logmessage("Connected to server")
        except:
            self._logmessage("Could not connect to server")
        self.process = getattr(self.manager, self._type_id)()

    def start_process(self, mod, fcn):
        self.process.start_process(mod, fcn)
        self._logmessage("Process '%s' started" % fcn)

    def list_processes(self):
        print self.process.get_processes()

    @property
    def connected(self):
        return self.manager._state.value == self.manager._state.STARTED

    def _logmessage(self, msg):
        print "%s: %s" % (time.strftime('%Y-%m-%d %H:%M:%S'), msg)


def test(data):
    while True:
        print time.time()
        time.sleep(1.)


if __name__ == '__main__':
    from algotrading.server.process_client import Client
    client = Client(authkey='abc')
    client.connect()
    client.start_process("algotrading.server.process_client", "test")
    client.list_processes()
4

2 回答 2

0

您可以实现一个套接字服务器,它监听客户端并启动线程来执行算法。

我认为 RPC 将是最简单的解决方案。

一些启示:目前在 Python 中做 RPC 的选择是什么?

于 2014-01-13T09:53:10.583 回答
0

查看Supervisord,它允许远程管理进程,以及自动启动/重启可配置性。

根据您的可扩展性和灾难恢复需求,您可能正在考虑将您的“监控/交易流程”分布在运行的多台服务器上。虽然 supervisord 实际上只设计用于管理单台机器,但您可以构建一个管理器应用程序,通过它的 xml-rpc 接口协调多个服务器,每个服务器都运行 supervisord。

Cron 或 Celery 可用于您的日常启动/停止计划。

于 2014-01-13T10:04:29.997 回答