我有一个带有MCC118 daqhat的树莓派。目标有三个:
- 获取数据并将其记录到数据库
- 尽可能接近实时地通过专用 LAN 将数据流式传输到浏览器(网络 GET/POST 速度受限)
- 分析此数据并根据分析驱动设备 (RPi.GPIO)
我的程序同时受 cpu 和 I/O 限制,因为程序必须等待来自 daqhat 的数据输入(I/O 限制),并且一旦读取数据,就必须对其进行记录和分析(cpu 限制)。对于 cpu 绑定部分,我正在利用多处理来构建数据库。然后将通过查询数据库来更新浏览器显示(通过烧瓶)。为了有效地做到这一点,我开发了以下示例代码作为项目记录和显示方面的基础:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import random
import psycopg2 as sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from multiprocessing import Process
def fill_table():
conn_fill = sql.connect('dbname=mp_example user=pi')
cf = conn_fill.cursor()
while True:
cf.execute('INSERT INTO ran_num (ran) VALUES (%s);', (random.random(),))
conn_fill.commit()
def read_table():
conn_read = sql.connect('dbname=mp_example user=pi')
cr = conn_read.cursor()
while True:
cr.execute('SELECT * FROM ran_num WHERE id=(SELECT MAX(id) FROM ran_num);')
val = cr.fetchone()
if val:
print('\r' + str(val[0]) + ' - ' + str(round(float(val[1]), 3)), end='')
sys.stdout.flush()
print('\n')
def main():
conn_main = sql.connect('dbname=mp_example user=pi')
cm = conn_main.cursor()
cm.execute('DROP TABLE IF EXISTS ran_num;')
cm.execute('CREATE TABLE ran_num (id SERIAL PRIMARY KEY, ran VARCHAR);')
conn_main.commit()
cm.close()
conn_main.close()
print('Starting program, press CTRL + C to quit... \n')
if __name__ == '__main__':
main()
try:
p1 = Process(target=fill_table)
p1.start()
p2 = Process(target=read_table)
p2.start()
except KeyboardInterrupt:
p1.join()
p2.join()
print('Processes have closed without zombies...\n')
我刚刚向 Postgres 介绍了自己(我使用的是 sqlite3),以便以我认为是“数据安全”的方式来实现写入和查询数据的并发性。但是,我的问题如下:
- 是否有更高效的内存和/或数据安全的方式来执行此操作(例如管道)?
KeyboardInterrupt
抛出异常时是否有更好的退出方式?(我的异常打印语句从不打印,表明进程也在离开进程僵尸。)
注意:我最初只是在每次 GET 请求通过 LAN 进入时从 daqhat 读取单个数据值,然而,这意味着采样率取决于网络速度。为了数据的完整性和一致性,我想使用多处理来让数据采集以恒定的采样率完成它的工作,并根据网络请求从数据库中读取数据。