我还是新手,所以如果我犯了任何错误或我的问题不够具体,我深表歉意,请纠正我!我正在开发一个程序,用于通过串行连接控制激光实验室中的两个 Zaber TLSR300B 直线运动轨道。我使用 pyserial 与他们通信没有问题,我已经能够写入和读取数据了。我的问题更多是关于如何构建我的程序以实现所需的功能(我很少接受正式的编程培训)。
我希望程序做的是提供许多方法,允许用户向轨道发送命令,然后返回轨道响应的内容。但是,我不希望程序在检查响应时挂起,所以我不能只用写命令后跟读命令来编写方法。对于某些命令,轨道会立即响应(返回 ID、返回当前位置等),但对于其他命令,轨道会在执行请求的操作后响应(移动到位置、移动回家等)。例如,如果发送 move_absolute 命令,则轨道将移动到所需位置,然后发送带有新位置的回复。此外,可以使用物理旋钮手动移动轨道,这会使它们在移动时不断发送当前位置(这就是我需要连续读取串行数据的原因)。
我在下面附加了代码,当有数据要读取并将其放入队列时,我实现了一个线程以从串行端口读取数据。然后另一个线程从队列中取出项目并处理它们,修改存储每个轨道当前属性的 ZaberTLSR300B 对象的适当值。底部的方法是我希望用户能够调用的一些方法。他们只需将命令写入串行端口,然后由连续运行的读取线程获取和处理响应。
我遇到的主要问题是这些方法不知道轨道响应什么,因此它们无法返回回复,我想不出解决这个问题的方法。所以我不能写这样的东西:
newPosition = TrackManager.move_absolute(track 1, 5000)
或者
currentPosition = TrackManager.return_current_position(track 2)
这最终是我希望用户能够做的(也是因为我想在未来在此之上实现某种 GUI)。
无论如何,我是否以正确的方式解决这个问题?或者有没有更简洁的方法来实现这种行为?如果需要,我愿意完全重写一切!
谢谢,如果有什么不清楚的地方请告诉我。
Zaber TLSR300B 手册(如果需要):http ://www.zaber.com/wiki/Manuals/T-LSR
代码:
轨道经理课程
import serial
import threading
import struct
import time
from collections import deque
from zaberdevices import ZaberTLSR300B
class TrackManager:
def __init__(self):
self.serial = serial.Serial("COM5",9600,8,'N',timeout=None)
self.track1 = ZaberTLSR300B(1, self.serial)
self.track2 = ZaberTLSR300B(2, self.serial)
self.trackList = [self.track1, self.track2]
self.serialQueue = deque()
self.runThread1 = True
self.thread1 = threading.Thread(target=self.workerThread1)
self.thread1.start()
self.runThread2 = True
self.thread2 = threading.Thread(target=self.workerThread2)
self.thread2.start()
def workerThread1(self):
while self.runThread1 == True:
while self.serial.inWaiting() != 0:
bytes = self.serial.read(6)
self.serialQueue.append(struct.unpack('<BBl', bytes))
def workerThread2(self):
while self.runThread2 == True:
try:
reply = self.serialQueue.popleft()
for track in self.trackList:
if track.trackNumber == reply[0]:
self.handleReply(track, reply)
except:
continue
def handleReply(self, track, reply):
if reply[1] == 10:
track.update_position(reply[2])
elif reply[1] == 16:
track.storedPositions[address] = track.position
elif reply[1] == 20:
track.update_position(reply[2])
elif reply[1] == 21:
track.update_position(reply[2])
elif reply[1] == 60:
track.update_position(reply[2])
def move_absolute(self, trackNumber, position):
packet = struct.pack("<BBl", trackNumber, 20, position)
self.serial.write(packet)
def move_relative(self, trackNumber, distance):
packet = struct.pack("<BBl", trackNumber, 21, distance)
self.serial.write(packet)
def return_current_position(self, trackNumber):
packet = struct.pack("<BBl", trackNumber, 60, 0)
self.serial.write(packet)
def return_stored_position(self, trackNumber, address):
packet = struct.pack("<BBl", trackNumber, 17, address)
self.serial.write(packet)
def store_current_position(self, trackNumber, address):
packet = struct.pack("<BBl", trackNumber, 16, address)
self.serial.write(packet)
zaberdevices.py ZaberTLSR300B 类
class ZaberTLSR300B:
def __init__(self, trackNumber, serial):
self.trackNumber = trackNumber
self.serial = serial
self.position = None
self.storedPositions = []
def update_position(self, position):
self.position = position