0

我有一个 Beaglebone Black 连接到 CAN 总线设备:电池。

在 Beaglebone Black 上作为 GUI 运行的龙卷风网络。

CAN总线读取循环不断从CAN总线读取数据以更新电池实例的状态

但是我怎样才能让两个 IOLOOP 一起工作并共享 Battery 实例呢?

在此处输入图像描述

龙卷风网:

class Battery(object):
    status = {}



class API_Handler(web.RequestHandler):
    def get(self, dev, cmd):
        if cmd == 'data':
            self.write(self.application.battery0.status)


class Application(web.Application):
    def __init__(self):

        self.battery0 = Battery('bat0')    

        routing = [
            (r'/api/battery/(data|)', API_Handler),
        ]

        settings = {
            'template_path': os.path.join(os.path.dirname(__file__), "templates"),
            'static_path': os.path.join(os.path.dirname(__file__), "static"),
        }

        web.Application.__init__(self, routing, debug=True, **settings)


if __name__ == "__main__":
    import tornado

    app = Application()
    app.listen(address='0.0.0.0', port=8888)
    tornado.ioloop.IOLoop.instance().start()

CAN总线读取循环,代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import errno
import functools
import tornado.ioloop
import socket
import struct


can_frame_fmt = "=IB3x8s"
can_frame_size = struct.calcsize(can_frame_fmt)

def build_can_frame(can_id, data):
    can_dlc = len(data)
    data = data.ljust(8, b'\x00')
    return struct.pack(can_frame_fmt, can_id, can_dlc, data)

def dissect_can_frame(frame):
    can_id, can_dlc, data = struct.unpack(can_frame_fmt, frame)
    return (can_id, can_dlc, data[:can_dlc])

def connection_ready(sock, fd, events):
    while True:
        try:
            cf, addr = sock.recvfrom(can_frame_size)
        except socket.error as e:
            if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise
            return
        dissect_can_frame(cf)


if __name__ == '__main__':
    sock = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
    sock.bind(('can0',))
    sock.setblocking(0)

    io_loop = tornado.ioloop.IOLoop.current()
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
    io_loop.start()
4

1 回答 1

0

如我所见,您正在运行两个应用程序,因此很难共享 Battery 实例。第一个解决方案 - 将所有功能组合在一个应用程序中,以便电池实例简单可用,但您将面临在一个 ioloop 中服务 HTTP 请求和处理来自 CAN 的套接字事件的挑战。

所以这是另一种解决方案,保留两个应用程序但不尝试共享电池实例,只需从 CAN 侦听器向您的 GUI 应用程序发出 http 请求。

例如在 CAN 文件中:

from tornado.httpclient import AsyncHTTPClient

...

def connection_ready(sock, fd, events):
    while True:
        try:
           cf, addr = sock.recvfrom(can_frame_size)
        except socket.error as e:
            if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                raise
            return
        http_client = AsyncHTTPClient()
        http_client.fetch("http://localhost:8888/update-battery",
                          method="POST",
                          body="can_id={}&can_dlc={}&data={}".format(
                              dissect_can_frame(cf)))

如果数据中有字节,最好使用 python urllib 中的 urlencode 对正文进行编码。

在 GUI 文件中:

class Battery_Update_Handler(web.RequestHandler):
    def post(self):
        self.application.battery0.status = dict(
            can_id=self.get_argument("can_id", None),
            can_dlc=self.get_argument("can_dlc", None),
            data=self.get_argument("data", None))


class Application(web.Application):
    def __init__(self):

        self.battery0 = Battery('bat0')    

        routing = [
            (r'/api/battery/(data|)', API_Handler),
            (r'/update-battery', Battery_Update_Handler)
        ]

        settings = {
            'template_path': os.path.join(os.path.dirname(__file__), "templates"),
            'static_path': os.path.join(os.path.dirname(__file__), "static"),
        }

        web.Application.__init__(self, routing, debug=True, **settings)
于 2016-03-24T16:19:21.353 回答