我正在尝试,到目前为止,未能使用 python asyncio 访问串行端口。
我非常感谢在简单 fd 上使用新的 python 异步框架的任何提示。
干杯!
詹姆士
我正在尝试,到目前为止,未能使用 python asyncio 访问串行端口。
我非常感谢在简单 fd 上使用新的 python 异步框架的任何提示。
干杯!
詹姆士
这是使用 FD 的另一种方式
import asyncio
import serial
s = serial.Serial('/dev/pts/13', 9600)
def test_serial():
'''
read a line and print.
'''
text = ""
msg = s.read().decode()
while (msg != '\n'):
text += msg
msg = s.read().decode()
print(text)
loop.call_soon(s.write, "ok\n".encode())
loop = asyncio.get_event_loop()
loop.add_reader(s, test_serial)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()
pySerial 正在获得直接asyncio
支持。它现在处于实验状态,但对我来说正在按预期工作。
取自文档的示例:
class Output(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
print('port opened', transport)
transport.serial.rts = False
transport.write(b'hello world\n')
def data_received(self, data):
print('data received', repr(data))
self.transport.close()
def connection_lost(self, exc):
print('port closed')
asyncio.get_event_loop().stop()
loop = asyncio.get_event_loop()
coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
这是一个使用pyserial-asyncio的工作示例:
from asyncio import get_event_loop
from serial_asyncio import open_serial_connection
async def run():
reader, writer = await open_serial_connection(url='/dev/ttyS0', baudrate=115200)
while True:
line = await reader.readline()
print(str(line, 'utf-8'))
loop = get_event_loop()
loop.run_until_complete(run())
另一种选择是使用阻塞调用编写所有串行内容,然后使用 run_in_executor 在不同的线程中运行它:
import asyncio
import concurrent
from serial import Serial
# Normal serial blocking reads
# This could also do any processing required on the data
def get_byte():
return s.read(1)
# Runs blocking function in executor, yielding the result
@asyncio.coroutine
def get_byte_async():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
res = yield from loop.run_in_executor(executor, get_byte)
return res
def get_and_print():
b = yield from get_byte_async()
print (b)
s = Serial("COM11", 19200, timeout=10)
loop = asyncio.get_event_loop()
loop.run_until_complete(get_and_print())
感谢大家的建议,最后,我以稍微不同的方式解决了问题,并在 asyncio 中使用了支持良好的套接字连接,但随后使用 ser2net ( http://sourceforge.net/projects/ser2net/ ) 访问串行端口。
这需要大约 10 秒来配置,这意味着 python 代码现在也可以处理访问远程串行端口。
前阵子写了一个AsyncFile
类,接口比低级协议简单。
原代码在这里:https ://github.com/l04m33/pyx/blob/dbaf121ab7bb9bbf04616a7285bcaba757682d03/pyx/io.py#L20
class AsyncFile:
"""A local file class for use with the ``asyncio`` module.
``loop`` should be the event loop in use.
``filename`` is the name of the file to be opened.
``fileobj`` should be a regular file-like object.
``mode`` is the open mode accepted by built-in function ``open``.
If ``filename`` is specified, the named file will be opened. And if
``fileobj`` is specified, that file object will be used directly. You
cannot specify both ``filename`` and ``fileobj``.
This class can be used in a ``with`` statement.
"""
DEFAULT_BLOCK_SIZE = 8192
def __init__(self, loop=None, filename=None,
fileobj=None, mode='rb'):
if (filename is None and fileobj is None) or \
(filename is not None and fileobj is not None):
raise RuntimeError('Confilicting arguments')
if filename is not None:
if 'b' not in mode:
raise RuntimeError('Only binary mode is supported')
fileobj = open(filename, mode=mode)
elif 'b' not in fileobj.mode:
raise RuntimeError('Only binary mode is supported')
fl = fcntl.fcntl(fileobj, fcntl.F_GETFL)
if fcntl.fcntl(fileobj, fcntl.F_SETFL, fl | os.O_NONBLOCK) != 0:
if filename is not None:
fileobj.close()
errcode = ctypes.get_errno()
raise OSError((errcode, errno.errorcode[errcode]))
self._fileobj = fileobj
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._rbuffer = bytearray()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def fileno(self):
return self._fileobj.fileno()
def seek(self, offset, whence=None):
if whence is None:
return self._fileobj.seek(offset)
else:
return self._fileobj.seek(offset, whence)
def tell(self):
return self._fileobj.tell()
def _read_ready(self, future, n, total):
if future.cancelled():
self._loop.remove_reader(self._fileobj.fileno())
return
try:
res = self._fileobj.read(n)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
self._loop.remove_reader(self._fileobj.fileno())
future.set_exception(exc)
return
if not res: # EOF
self._loop.remove_reader(self._fileobj.fileno())
future.set_result(bytes(self._rbuffer))
return
self._rbuffer.extend(res)
if total > 0:
more_to_go = total - len(self._rbuffer)
if more_to_go <= 0: # enough
res, self._rbuffer = self._rbuffer[:n], self._rbuffer[n:]
self._loop.remove_reader(self._fileobj.fileno())
future.set_result(bytes(res))
else:
more_to_go = min(self.DEFAULT_BLOCK_SIZE, more_to_go)
self._loop.add_reader(self._fileobj.fileno(),
self._read_ready,
future, more_to_go, total)
else: # total < 0
# This callback is still registered with total < 0,
# nothing to do here
pass
@asyncio.coroutine
def read(self, n=-1):
future = asyncio.Future(loop=self._loop)
if n == 0:
future.set_result(b'')
else:
try:
res = self._fileobj.read(n)
except (BlockingIOError, InterruptedError):
if n < 0:
self._rbuffer.clear()
self._loop.add_reader(self._fileobj.fileno(),
self._read_ready,
future, self.DEFAULT_BLOCK_SIZE, n)
else:
self._rbuffer.clear()
read_block_size = min(self.DEFAULT_BLOCK_SIZE, n)
self._loop.add_reader(self._fileobj.fileno(),
self._read_ready,
future, read_block_size, n)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(res)
return future
def _write_ready(self, future, data, written):
if future.cancelled():
self._loop.remove_writer(self._fileobj.fileno())
return
try:
res = self._fileobj.write(data)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
self._loop.remove_writer(self._fileobj.fileno())
future.set_exception(exc)
return
if res < len(data):
data = data[res:]
self._loop.add_writer(self._fileobj.fileno(),
self._write_ready,
future, data, written + res)
else:
self._loop.remove_writer(self._fileobj.fileno())
future.set_result(written + res)
@asyncio.coroutine
def write(self, data):
future = asyncio.Future(loop=self._loop)
if len(data) == 0:
future.set_result(0)
else:
try:
res = self._fileobj.write(data)
except (BlockingIOError, InterruptedError):
self._loop.add_writer(self._fileobj.fileno(),
self._write_ready,
future, data, 0)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(res)
return future
def stat(self):
return os.stat(self._fileobj.fileno(), follow_symlinks=True)
def close(self):
self._loop.remove_reader(self._fileobj.fileno())
self._loop.remove_writer(self._fileobj.fileno())
self._fileobj.close()
考虑使用aioserial。
这是一个例子:
import aioserial
import asyncio
async def read_and_print(aioserial_instance: aioserial.AioSerial):
while True:
print((await aioserial_instance.read_async()).decode(errors='ignore'), end='', flush=True)
aioserial_com1: aioserial.AioSerial = aioserial.AioSerial(port='COM1')
asyncio.run(read_and_print(aioserial_com1))
对版主,
答案与此类似,但不重复。
这是我在异步串行端口上的尝试。此接口允许您将 serial.Serial 实例包装到 AIOSerial 类中,然后您可以执行await AIOSerial.readline()
且await AIOSerial.write(data)
不必使用 asyncio.Protocol() 样式的回调。
import asyncio
import sys
import serial
class AIOSerial:
def __init__(self, serial, ioloop=None):
self._serial = serial
# Asynchronous I/O requires non-blocking devices
self._serial.timeout = 0
self._serial.write_timeout = 0
if ioloop is not None:
self.loop = ioloop
else:
self.loop = asyncio.get_event_loop()
self.loop.add_reader(self._serial.fd, self._on_read)
self._rbuf = b''
self._rbytes = 0
self._wbuf = b''
self._rfuture = None
self._delimiter = None
def _on_read(self):
data = self._serial.read(4096)
self._rbuf += data
self._rbytes = len(self._rbuf)
self._check_pending_read()
def _on_write(self):
written = self._serial.write(self._wbuf)
self._wbuf = self._wbuf[written:]
if not self._wbuf:
self.loop.remove_writer(self._serial.fd)
def _check_pending_read(self):
future = self._rfuture
if future is not None:
# get data from buffer
pos = self._rbuf.find(self._delimiter)
if pos > -1:
ret = self._rbuf[:(pos+len(self._delimiter))]
self._rbuf = self._rbuf[(pos+len(self._delimiter)):]
self._delimiter = self._rfuture = None
future.set_result(ret)
return future
async def read_until(self, delimiter=b'\n'):
while self._delimiter:
await self._rfuture
self._delimiter = delimiter
self._rfuture = asyncio.Future()
#future = self._check_pending_read()
return await self._rfuture
async def readline(self):
return await self.read_until()
async def write(self, data):
need_add_writer = not self._wbuf
self._wbuf = self._wbuf + data
if need_add_writer:
self.loop.add_writer(self._serial.fd, self._on_write)
return len(data)
示例用法:
async def go_serial():
ser = serial.Serial(sys.argv[1], 9600) #, rtscts=True, dsrdtr=True)
print(ser)
aser = AIOSerial(ser)
written = await aser.write(b'test 1\n')
print('written', written)
data = await aser.readline()
print('got from readline', data)
while True:
await aser.write(b'.\n')
data = await aser.readline()
print('GOT!', data)
await asyncio.sleep(2.78)
async def main():
for n in range(120):
await asyncio.sleep(1)
print('n=%d' % n)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
asyncio.ensure_future(go_serial())
loop.run_until_complete(main())
这将设置串行端口和两个异步任务:go_serial 和 main。Main 只运行了 120 秒,然后循环退出。go_serial 写入和读取串行端口,期望对发送的每一行进行回复。
然后使用await aser.write(b'blah')
and await aser.readline()
(或者await aser.read_until(b'\r\n')
如果您想要不同的分隔符)完成对串行端口的读取和写入。
请注意,它并没有真正准备好生产,因为人们希望对缓冲区的数量有一些限制。
为了测试这一点,我使用以下脚本模拟了一个串行端口,该脚本输出创建的 pty 的名称,然后是上面示例的参数。
#!/usr/bin/python3
import fcntl
import time
import os
import errno
import pty
chars = []
ser, s = pty.openpty()
oldflags = fcntl.fcntl(ser, fcntl.F_GETFL)
# make the PTY non-blocking
fcntl.fcntl(ser, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
print('Created: %s' % os.ttyname(s))
while True:
time.sleep(0.1)
c = None
try:
c = os.read(ser, 10)
except OSError as err:
if err.errno == errno.EAGAIN or err.errno == errno.EWOULDBLOCK:
c = None
else:
raise
if c:
chars.append(c)
data = b''.join(chars)
if b'\n' in data:
one, data = data.split(b'\n', 1)
b = b'%.6f\n' % time.time()
os.write(ser, b)
print(one)
chars = [data]