11

我一直在尝试使用 Python asyncio 访问串行端口,但到目前为止都没有成功。

如果有人能就如何在普通文件描述符(fd)上使用新的 Python 异步框架给出任何提示,我将不胜感激。

干杯!

詹姆士

4

8 回答 8

5

这是使用 FD 的另一种方式

import asyncio
import serial

# Module-level serial handle used by test_serial() and by the event-loop
# setup further down; opened on the pseudo-terminal /dev/pts/13 with 9600
# passed as the second positional argument.
s = serial.Serial('/dev/pts/13', 9600)


def test_serial():
    """Read one newline-terminated line from ``s``, print it and reply "ok".

    Intended as an ``add_reader`` callback: it is invoked when the port has
    data, then reads byte by byte (blocking) until a ``\\n`` is seen.

    The raw bytes are collected first and decoded once at the end. Decoding
    each byte on its own fails with ``UnicodeDecodeError`` as soon as the
    line contains a multi-byte UTF-8 character, because a single byte of
    such a sequence is not valid UTF-8 by itself. Undecodable bytes (e.g.
    line noise) are replaced rather than raising inside the callback.
    """
    received = bytearray()
    byte = s.read()
    while byte != b'\n':
        received += byte
        byte = s.read()
    print(received.decode(errors='replace'))
    # Schedule the reply instead of writing inline so the callback returns
    # to the event loop first.
    loop.call_soon(s.write, "ok\n".encode())

# Register test_serial as the read callback for the port and run the event
# loop until the user interrupts it.
loop = asyncio.get_event_loop()
# The Serial object itself (not a raw descriptor number) is handed to
# add_reader here.
loop.add_reader(s, test_serial)
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this example; nothing to report.
    pass
finally:
    # Always release the loop, whether it stopped normally or by interrupt.
    loop.close()
于 2015-01-13T20:34:48.287 回答
4

pySerial 正在获得直接asyncio支持。它现在处于实验状态,但对我来说正在按预期工作。

取自文档的示例:

class Output(asyncio.Protocol):
    """Protocol that sends one greeting and closes after the first reply."""

    def connection_made(self, transport):
        """Remember the transport, drop RTS and send the greeting line."""
        print('port opened', transport)
        self.transport = transport
        transport.serial.rts = False
        transport.write(b'hello world\n')

    def data_received(self, data):
        """Log the received chunk and close the transport."""
        print('data received %r' % (data,))
        self.transport.close()

    def connection_lost(self, exc):
        """Report the closed port and stop the current event loop."""
        print('port closed')
        current_loop = asyncio.get_event_loop()
        current_loop.stop()

# Open the serial connection with Output as the protocol factory, then keep
# the loop running until something (here Output.connection_lost) stops it.
loop = asyncio.get_event_loop()
coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
# First wait for the connection coroutine itself to finish ...
loop.run_until_complete(coro)
# ... then serve protocol callbacks until loop.stop() is called.
loop.run_forever()
loop.close()
于 2016-09-06T11:49:41.177 回答
4

这是一个使用pyserial-asyncio的工作示例:

from asyncio import get_event_loop
from serial_asyncio import open_serial_connection

async def run():
    """Open /dev/ttyS0 at 115200 baud and print every received line forever."""
    reader, _writer = await open_serial_connection(url='/dev/ttyS0', baudrate=115200)
    while True:
        raw_line = await reader.readline()
        # Lines arrive as bytes; decode as UTF-8 before printing.
        print(raw_line.decode('utf-8'))

# Drive run() on the event loop; run() loops forever, so this call only
# returns if run() raises.
loop = get_event_loop()
loop.run_until_complete(run())
于 2020-01-29T17:53:25.920 回答
3

另一种选择是使用阻塞调用编写所有串行内容,然后使用 run_in_executor 在不同的线程中运行它:

import asyncio
import concurrent

from serial import Serial

# Plain blocking serial read.
# Any processing the data needs could also be done here.
def get_byte():
    """Read a single byte from the module-level port ``s`` (blocking call)."""
    single = s.read(1)
    return single

# Runs the blocking function in an executor and returns its result
async def get_byte_async():
    """Run the blocking ``get_byte`` in a worker thread and return its result.

    Written as a native coroutine: the generator-based ``@asyncio.coroutine``
    decorator was removed in Python 3.11.
    """
    # Imported here so the ``futures`` submodule is guaranteed to be loaded;
    # a bare ``import concurrent`` does not load it by itself.
    import concurrent.futures

    running_loop = asyncio.get_running_loop()
    # A dedicated single-worker pool keeps the serial read off the loop
    # thread; it is shut down as soon as the read has completed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return await running_loop.run_in_executor(executor, get_byte)


async def get_and_print():
    """Fetch one byte through ``get_byte_async`` and print it."""
    b = await get_byte_async()
    print(b)

# Open COM11 (19200 as the second positional argument) with timeout=10,
# then drive get_and_print() to completion on the event loop.
s = Serial("COM11", 19200, timeout=10)
loop = asyncio.get_event_loop()
loop.run_until_complete(get_and_print())
于 2015-01-13T17:13:26.390 回答
1

感谢大家的建议,最后,我以稍微不同的方式解决了问题,并在 asyncio 中使用了支持良好的套接字连接,但随后使用 ser2net ( http://sourceforge.net/projects/ser2net/ ) 访问串行端口。

这需要大约 10 秒来配置,这意味着 python 代码现在也可以处理访问远程串行端口。

于 2014-05-26T09:56:04.507 回答
1

前阵子写了一个AsyncFile类,接口比低级协议简单。

原代码在这里:https://github.com/l04m33/pyx/blob/dbaf121ab7bb9bbf04616a7285bcaba757682d03/pyx/io.py#L20

class AsyncFile:
    """A local file class for use with the ``asyncio`` module.

    ``loop`` should be the event loop in use.
    ``filename`` is the name of the file to be opened.
    ``fileobj`` should be a regular file-like object.
    ``mode`` is the open mode accepted by built-in function ``open``.

    If ``filename`` is specified, the named file will be opened. And if
    ``fileobj`` is specified, that file object will be used directly. You
    cannot specify both ``filename`` and ``fileobj``.

    The underlying descriptor is switched to non-blocking mode; ``read`` and
    ``write`` are coroutines that wait on the event loop's reader/writer
    callbacks whenever the descriptor is not ready.

    This class can be used in a ``with`` statement.
    """

    DEFAULT_BLOCK_SIZE = 8192

    def __init__(self, loop=None, filename=None,
                 fileobj=None, mode='rb'):
        """Open ``filename`` or adopt ``fileobj`` and make it non-blocking.

        Raises ``RuntimeError`` when both or neither of ``filename`` and
        ``fileobj`` are given, or when the file is not in binary mode, and
        ``OSError`` when the descriptor flags cannot be changed.
        """
        # Exactly one of filename / fileobj must be supplied.
        if (filename is None) == (fileobj is None):
            raise RuntimeError('Confilicting arguments')

        if filename is not None:
            if 'b' not in mode:
                raise RuntimeError('Only binary mode is supported')
            fileobj = open(filename, mode=mode)
        elif 'b' not in fileobj.mode:
            raise RuntimeError('Only binary mode is supported')

        # fcntl.fcntl() reports failure by raising OSError, not through its
        # return value, so the cleanup has to live in an except clause.
        try:
            flags = fcntl.fcntl(fileobj, fcntl.F_GETFL)
            fcntl.fcntl(fileobj, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        except OSError:
            # Only close what this constructor opened itself.
            if filename is not None:
                fileobj.close()
            raise

        self._fileobj = fileobj

        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        # Accumulates data across _read_ready callbacks for one read() call.
        self._rbuffer = bytearray()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def fileno(self):
        """Return the descriptor number of the wrapped file object."""
        return self._fileobj.fileno()

    def seek(self, offset, whence=None):
        """Forward to the wrapped file's ``seek``; ``whence`` is optional."""
        if whence is None:
            return self._fileobj.seek(offset)
        return self._fileobj.seek(offset, whence)

    def tell(self):
        """Return the wrapped file's current position."""
        return self._fileobj.tell()

    def _read_ready(self, future, n, total):
        """Reader callback: pull up to ``n`` bytes and complete ``future``.

        ``total`` is the size requested by ``read``; a negative value means
        "read until EOF". The future is resolved once ``total`` bytes have
        been collected or EOF is reached.
        """
        fd = self._fileobj.fileno()

        if future.cancelled():
            self._loop.remove_reader(fd)
            return

        try:
            res = self._fileobj.read(n)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._loop.remove_reader(fd)
            future.set_exception(exc)
            return

        if res is None:
            # Non-blocking streams signal "no data yet" with None; this is
            # not EOF, so stay registered and wait for the next wake-up.
            return

        if not res:     # EOF
            self._loop.remove_reader(fd)
            future.set_result(bytes(self._rbuffer))
            return

        self._rbuffer.extend(res)

        if total > 0:
            more_to_go = total - len(self._rbuffer)
            if more_to_go <= 0:  # enough
                # Hand back the whole request (``total`` bytes), not just
                # the size of the last block that was read.
                data = bytes(self._rbuffer[:total])
                del self._rbuffer[:total]
                self._loop.remove_reader(fd)
                future.set_result(data)
            else:
                # Re-register with the remaining size so the request is
                # never over-read.
                more_to_go = min(self.DEFAULT_BLOCK_SIZE, more_to_go)
                self._loop.add_reader(fd, self._read_ready,
                                      future, more_to_go, total)
        # total < 0: the callback stays registered until EOF.

    async def read(self, n=-1):
        """Read up to ``n`` bytes, or until EOF when ``n`` is negative.

        If data is available right away, whatever the first read returns is
        handed back. Otherwise the coroutine waits until ``n`` bytes (or
        EOF) have been collected by the reader callback.
        """
        if n == 0:
            return b''

        try:
            res = self._fileobj.read(n)
        except (BlockingIOError, InterruptedError):
            res = None

        if res is not None:
            return res

        # Nothing available yet (exception or None): wait for readiness.
        future = self._loop.create_future()
        self._rbuffer.clear()
        if n < 0:
            block_size = self.DEFAULT_BLOCK_SIZE
        else:
            block_size = min(self.DEFAULT_BLOCK_SIZE, n)
        self._loop.add_reader(self._fileobj.fileno(),
                              self._read_ready,
                              future, block_size, n)
        return await future

    def _write_ready(self, future, data, written):
        """Writer callback: push ``data`` and resolve ``future`` when done.

        ``written`` is the number of bytes already written by earlier
        callbacks for the same ``write`` call.
        """
        fd = self._fileobj.fileno()

        if future.cancelled():
            self._loop.remove_writer(fd)
            return

        try:
            res = self._fileobj.write(data)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._loop.remove_writer(fd)
            future.set_exception(exc)
            return

        if res is None:
            # Raw non-blocking streams return None when nothing could be
            # written; keep waiting for the next writable event.
            return

        if res < len(data):
            data = data[res:]
            self._loop.add_writer(fd, self._write_ready,
                                  future, data, written + res)
        else:
            self._loop.remove_writer(fd)
            future.set_result(written + res)

    async def write(self, data):
        """Write ``data`` and return the number of bytes written.

        If the first attempt succeeds its result is returned directly;
        otherwise the coroutine waits for the writer callback to finish.
        """
        if len(data) == 0:
            return 0

        try:
            res = self._fileobj.write(data)
        except (BlockingIOError, InterruptedError):
            res = None

        if res is not None:
            return res

        future = self._loop.create_future()
        self._loop.add_writer(self._fileobj.fileno(),
                              self._write_ready,
                              future, data, 0)
        return await future

    def stat(self):
        """Return ``os.stat`` information for the open descriptor."""
        return os.stat(self._fileobj.fileno(), follow_symlinks=True)

    def close(self):
        """Unregister the loop callbacks and close the wrapped file."""
        self._loop.remove_reader(self._fileobj.fileno())
        self._loop.remove_writer(self._fileobj.fileno())
        self._fileobj.close()
于 2015-05-20T03:38:24.307 回答
0

考虑使用aioserial

这是一个例子:

import aioserial
import asyncio


async def read_and_print(aioserial_instance: aioserial.AioSerial):
    """Echo everything read from ``aioserial_instance`` to stdout, forever."""
    while True:
        chunk = await aioserial_instance.read_async()
        # Undecodable bytes are dropped; output is flushed after each chunk
        # and printed without an added newline.
        text = chunk.decode(errors='ignore')
        print(text, end='', flush=True)


# Open COM1 through aioserial and hand it to the echo coroutine.
aioserial_com1: aioserial.AioSerial = aioserial.AioSerial(port='COM1')

# read_and_print() loops forever, so asyncio.run() only returns if it raises.
asyncio.run(read_and_print(aioserial_com1))

对版主,

答案与此类似,不重复。

于 2018-09-25T04:00:01.223 回答
0

这是我在异步串行端口上的尝试。此接口允许您将 serial.Serial 实例包装到 AIOSerial 类中,然后您可以执行 await AIOSerial.readline() 和 await AIOSerial.write(data),而不必使用 asyncio.Protocol() 样式的回调。

import asyncio
import sys

import serial


class AIOSerial:
    """Awaitable line-oriented wrapper around a ``serial.Serial`` instance.

    The port is switched to non-blocking mode and its descriptor is watched
    by the event loop. Incoming bytes are collected in an internal buffer;
    ``read_until`` / ``readline`` hand them out delimiter by delimiter, and
    ``write`` queues outgoing bytes that are flushed when the port becomes
    writable. Neither buffer is bounded.
    """

    def __init__(self, serial, ioloop=None):
        """Wrap ``serial`` and start watching it on ``ioloop``.

        ``ioloop`` defaults to ``asyncio.get_event_loop()``.
        """
        self._serial = serial
        # Asynchronous I/O requires non-blocking devices
        self._serial.timeout = 0
        self._serial.write_timeout = 0

        if ioloop is not None:
            self.loop = ioloop
        else:
            self.loop = asyncio.get_event_loop()

        self._rbuf = b''
        self._rbytes = 0
        self._wbuf = b''
        # Future and delimiter of the read currently in progress, if any.
        self._rfuture = None
        self._delimiter = None

        # Register last so the callback can never see a half-built object.
        self.loop.add_reader(self._serial.fd, self._on_read)

    def _on_read(self):
        """Reader callback: buffer new bytes and try to finish a read."""
        data = self._serial.read(4096)
        self._rbuf += data
        self._rbytes = len(self._rbuf)
        self._check_pending_read()

    def _on_write(self):
        """Writer callback: flush queued bytes, unregister when drained."""
        written = self._serial.write(self._wbuf)
        self._wbuf = self._wbuf[written:]
        if not self._wbuf:
            self.loop.remove_writer(self._serial.fd)

    def _check_pending_read(self):
        """Resolve the pending read if the buffer holds its delimiter.

        Returns the resolved future, or ``None`` when there is no pending
        read or the delimiter has not arrived yet.
        """
        future = self._rfuture
        if future is None:
            return None

        pos = self._rbuf.find(self._delimiter)
        if pos < 0:
            return None

        end = pos + len(self._delimiter)
        ret = self._rbuf[:end]
        self._rbuf = self._rbuf[end:]
        self._rbytes = len(self._rbuf)
        self._delimiter = self._rfuture = None
        future.set_result(ret)
        return future

    async def read_until(self, delimiter=b'\n'):
        """Return buffered data up to and including ``delimiter``.

        Only one read is served at a time; further callers wait for the
        read in progress to finish first.
        """
        while self._rfuture is not None:
            await self._rfuture

        # Bind the future to the loop this wrapper was created for.
        future = self.loop.create_future()
        self._delimiter = delimiter
        self._rfuture = future
        # The delimiter may already be in the buffer (e.g. two lines arrived
        # in one chunk). Without this check the read would stall until more
        # bytes happen to arrive on the port.
        self._check_pending_read()
        return await future

    async def readline(self):
        """Return the next ``\\n``-terminated line."""
        return await self.read_until()

    async def write(self, data):
        """Queue ``data`` for writing and return ``len(data)``.

        The bytes are sent later by the writer callback; this coroutine
        does not wait for them to leave the buffer.
        """
        need_add_writer = not self._wbuf

        self._wbuf = self._wbuf + data
        if need_add_writer:
            self.loop.add_writer(self._serial.fd, self._on_write)
        return len(data)

示例用法:

async def go_serial():
    """Open the port named by ``sys.argv[1]`` and exchange lines with it.

    Sends one test line, prints the reply, then keeps sending ``.`` lines
    and printing each reply with a pause between rounds.
    """
    port = serial.Serial(sys.argv[1], 9600)  # rtscts/dsrdtr left at defaults
    print(port)
    aio_port = AIOSerial(port)

    count = await aio_port.write(b'test 1\n')
    print('written', count)
    reply = await aio_port.readline()
    print('got from readline', reply)

    while True:
        await aio_port.write(b'.\n')
        reply = await aio_port.readline()
        print('GOT!', reply)
        await asyncio.sleep(2.78)

async def main():
    """Print a counter once per second for 120 ticks, then return."""
    n = 0
    while n < 120:
        await asyncio.sleep(1)
        print('n=%d' % n)
        n += 1

if __name__ == "__main__":
    async def _run_example():
        """Run go_serial() in the background for as long as main() runs."""
        # Scheduling the task from inside a running loop avoids relying on
        # asyncio.get_event_loop() / ensure_future() outside of a loop,
        # which newer Python versions deprecate.
        serial_task = asyncio.ensure_future(go_serial())
        try:
            await main()
        finally:
            # go_serial() never finishes by itself; stop it explicitly so no
            # pending task is left behind when the loop shuts down.
            serial_task.cancel()

    asyncio.run(_run_example())

这将设置串行端口和两个异步任务:go_serial 和 main。Main 只运行了 120 秒,然后循环退出。go_serial 写入和读取串行端口,期望对发送的每一行进行回复。

然后使用 await aser.write(b'blah') 和 await aser.readline()(如果您想要不同的分隔符,也可以使用 await aser.read_until(b'\r\n'))完成对串行端口的读取和写入。

请注意,它并没有真正准备好生产,因为人们希望对缓冲区的数量有一些限制。

为了测试这一点,我使用以下脚本模拟了一个串行端口,该脚本会输出所创建的 pty 的名称,这个名称随后作为上面示例的命令行参数。

#!/usr/bin/python3
import fcntl
import time
import os
import errno
import pty


# Create a pty pair; this script talks on the first descriptor while the
# example under test opens the second one by name.
master_fd, slave_fd = pty.openpty()
# make the PTY non-blocking
current_flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
fcntl.fcntl(master_fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

print('Created: %s' % os.ttyname(slave_fd))

# Bytes received so far that do not yet form a complete line.
pending = b''

while True:
    time.sleep(0.1)
    try:
        chunk = os.read(master_fd, 10)
    except OSError as err:
        # "No data right now" is expected on a non-blocking descriptor;
        # anything else is a real error.
        if err.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
            raise
        chunk = None
    if chunk:
        pending += chunk

    # Answer at most one complete line per iteration with a timestamp.
    if b'\n' in pending:
        line, pending = pending.split(b'\n', 1)
        os.write(master_fd, b'%.6f\n' % time.time())
        print(line)
于 2018-06-09T13:52:55.770 回答