我正在尝试对 linux 上的串行端口进行多路访问。我正在使用一个只有一个串行端口的嵌入式系统,如果有多个进程与之通信会很好。
常见的用例是:
- 一个主程序运行测试(发送命令和接收输出);
- 另一个记录所有串口活动;
- 在测试期间出现一些错误后,用户终端打开以发送其他命令和/或执行事后分析。
首先,我制作了一个简单的 python 脚本来打开 n 个伪终端对(加上串口),并使用 poll 语句将输入/输出定向到正确的位置:
# Removed boiler plate and error checking for clarity
##### Serial port setup
ttyS = serial.Serial(device, baudrate, width, parity, stopbits, 1, xon, rtc)
ttyS.setTimeout(0) # Non-blocking
##### PTYs setup
pts = []
for n in range(number_of_slave_terminals):
master, slave = os.openpty()
# Print slave names so others know where to connect
print >>sys.stderr, 'MUX > fd: %d pty: %s' % (slave, os.ttyname(slave))
pts.append(master)
##### Poller setup
poller = select.poll()
poller.register(ttyS.fd, select.POLLIN | select.POLLPRI)
for pt in pts:
poller.register(pt, select.POLLIN | select.POLLPRI)
##### MAIN
while True:
events = poller.poll(500)
for fd, flag in events:
# fd has input
if flag & (select.POLLIN | select.POLLPRI):
# Data on serial
if fd == ttyS.fd:
data = ttyS.read(80)
for pt in pts:
os.write(pt, data)
# Data on other pty
else:
ttyS.write(os.read(fd, 80))
如果每个 pty 都已连接,则此方法非常有效。如果有一些未连接的 pty,最终它的缓冲区会填满并在写入时阻塞。似乎我需要知道连接了哪些从站或某种按需 pty 开放。
我在这个问题上找到了一个巧妙的技巧,这个人只需要从串口部分读取数据,所以我修改了我的脚本:
##### Serial port setup
ttyS = serial.Serial(device, baudrate, width, parity, stopbits, 1, xon, rtc)
ttyS.setTimeout(0) # Non-blocking
##### PTYs setup
pts = []
for n in range(number_of_slave_terminals):
master, slave = os.openpty()
# slaves
print >>sys.stderr, 'MUX > fd: %d pty: %s' % (slave, os.ttyname(slave))
os.close(slave) # POLLHUP trick
# masters
pts.append(master)
##### Poller setup
reader = select.poll()
writer = select.poll()
reader.register(ttyS, select.POLLIN | select.POLLPRI)
for pt in pts:
reader.register(pt, select.POLLIN | select.POLLPRI)
writer.register(pt, select.POLLIN | select.POLLPRI | select.POLLOUT)
def write_to_ptys(data):
events = writer.poll(500)
for fd, flag in events:
# There is someone on the other side...
if not (flag & select.POLLHUP):
os.write(fd, data)
##### MAIN
while True:
events = reader.poll(500)
for fd, flag in events:
if flag & (select.POLLIN | select.POLLPRI):
# Data on serial
if fd == ttyS.fd:
write_to_tty(ttyS.read(80))
# Data on other pty
else:
ttyS.write(os.read(fd, 80))
哪个有效,但使用了 100% 的 CPU,因为读取器轮询充满了 POLLHUP 事件。
我想如果我使用 TCP 套接字而不是伪终端,我可以得到我想要的。缺点是我必须修改所有其他已经与终端一起使用的脚本才能使用套接字(我知道我可以使用 socat,我只是想要更简单的东西)。此外,还有所有的网络开销......
那么,有什么想法吗?
我不介意使用其他工具,只要设置简单。我也不介意使用其他语言,我最喜欢 Python。