我正在尝试在mininet中编写一些自动化测试,其目的是:
- 创建简单拓扑
- 在服务器和客户端之间运行一些通信
- 收集一些数据并在一段时间后停止
我可以通过运行 miniedit 配置并在连接到主机的两个终端上手动启动服务器和客户端脚本来运行这些测试,这种情况可以正常工作。
但是,我正在尝试使用 Python API 执行此操作,但在交换了几个数据包后,我在主机上启动的脚本停止正常工作。正如我之前所说,如果我在两个终端上手动启动它们,这两个脚本可以正常工作,所以我认为可能还有另一个问题。我最初认为这可能是我用来实现服务器和客户端脚本(pyModbus和pyModbusTCP)的模块上的一个错误,但事实上它们在手动启动时没有问题,这让我认为我可能做错了什么我通过 Python API 启动脚本的方式。
这是启动模拟的脚本:
#!/usr/bin/python
from mininet.topo import Topo
from mininet.net import Mininet
from mininet.util import dumpNodeConnections
from mininet.log import setLogLevel
# The first argument is the project path
clientPath = sys.argv[1] + 'test/integration/test_training/client.py'
serverPath = sys.argv[1] + 'test/integration/test_training/server.py'
class SingleSwitchTopo(Topo):
def build(self, n=2):
switch = self.addSwitch('s1')
for h in range(n):
host = self.addHost('h%s' % (h + 1))
self.addLink(host, switch)
def test():
topo = SingleSwitchTopo(n=2)
net = Mininet(topo)
net.start()
hosts = net.hosts
server = hosts[0]
client = hosts[1]
# Start the server process in the background
server.sendCmd('python3 ' + serverPath)
# Start the client and print the output
client.cmdPrint('python3 ' + clientPath)
# Here there would be the rest of the test
if __name__ == '__main__':
setLogLevel('info')
test()
启动此脚本时,日志显示客户端在一段时间内正确接收了响应。但是,在某个时刻,服务器总是会开始发送空响应,当我在 miniedit 配置上手动运行脚本时,这种情况永远不会发生。
这些是服务器和客户端脚本(这些是作为默认示例给出的简单 pyModbus 和 pyModbusTCP 脚本):
服务器
该服务器基本上会定期更新一些值并侦听查询。
#!/usr/bin/python3
from pymodbus.server.async import StartTcpServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from twisted.internet.task import LoopingCall
server_ip = '10.0.0.1'
server_port = 502
start_address = 1
final_address = [1]*100
#LOG
import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
init_val = 0
# updating server function
def updating_server(a):
log.debug("updating PLC registers..")
context = a[0]
register = 4
slave_id = 0x00
address = 0
values = context[slave_id].getValues(register, address, count=5)
values = [v + 1 for v in values]
log.debug("new values: " + str(values))
context[slave_id].setValues(register, address, values)
store = ModbusSlaveContext(
di = ModbusSequentialDataBlock(0, [1]*100),
co = ModbusSequentialDataBlock(0, [2]*100),
hr = ModbusSequentialDataBlock(0, [3]*100),
ir = ModbusSequentialDataBlock(0, [init_val]*100))
context = ModbusServerContext(slaves=store, single=True)
identity = ModbusDeviceIdentification()
identity.VendorName = 'Pymodbus'
identity.ProductCode = 'PM'
identity.VendorUrl = 'http://github.com/bashwork/pymodbus/'
identity.ProductName = 'Pymodbus Server'
identity.ModelName = 'Pymodbus Server'
identity.MajorMinorRevision = '1.0'
#server run and update
time = 0.5
loop = LoopingCall(f=updating_server, a=(context,))
loop.start(time, now=False)
StartTcpServer(context, identity=identity, address=(server_ip, server_port))
客户
此客户端连接到服务器并定期发送查询。
#!/usr/bin/python3
from pyModbusTCP.client import ModbusClient
import time
server_ip = '10.0.0.1'
server_port = 502
c = ModbusClient(host=server_ip, port=server_port, auto_open=True)
c.debug(True)
while True:
if c.is_open():
# read 5 input registers starting at address 0
values = c.read_input_registers(0, 5)
print(values)
time.sleep(0.1)
else:
c.open()
time.sleep(0.5)
我已经在这个问题上敲了一段时间,但找不到解决方案。我错过了什么吗?