0

基本上我现在应该测试 modbus 功能,无论出于何种原因,modbus 无法在第二次调用时读取寄存器,例如:函数:

# Modbus functions
def read_system_uptime(c, ssh):
    try:
        # Get uptime with ubus
        ssh_result = str(exec_ssh("ubus call system info", ssh))
        ssh_obj = json.loads(ssh_result)
        uptime = str(ssh_obj['uptime'])

        regs = c.read_holding_registers(1, 2)
        if (regs != None and regs[0] == 0):
            result = check_match(uptime, regs[1])
            logger("System uptime", uptime, regs[1], result)
        elif (regs != None and regs[0] != 0):
            #Check results
            temp = regs[0] * 65536 + regs[1]
            result = check_match(uptime, temp)
            logger("System uptime", uptime, temp, result)
        else:
            error_logger("System hostname", "Could not read register value")
    except Exception as error:
        error_logger("system uptime", error)
def read_mobile_strength(c, ssh):
    try:
        regs = c.read_holding_registers(3, 2)
        if regs != None:
            bits = decimal_To_Binary(regs[1])
            a = str(binary_To_Complement(bits)).strip()

            #Check results
            gsmctl = str(exec_ssh("gsmctl -q", ssh)).strip()
            result = check_match(gsmctl, a)
            logger("Mobile signal strength", gsmctl, a, result)
        else:
            error_logger("Mobile signal strength", "Could not read register value")
    except Exception as error:
        error_logger("Mobile signal strength", error)
def read_system_hostname(c, ssh):
    try:
        regs = c.read_holding_registers(7, 16)
        if regs != None:
            a = str(dec_to_text(regs)).strip()

            #Check results
            name = str(exec_ssh("uci get system.system.hostname", ssh)).strip()
            result = check_match(name, a)
            logger("System hostname", name, a, result)
        else:
            error_logger("System hostname", "Could not read register value")

    except Exception as error:
        error_logger("System hostname", error)

主要的:

# Main fuction
def main():
    #router_ip, router_port, router_username, router_password = run_args(sys.argv[1:])
    ssh = paramiko.SSHClient()
    if ssh_connect(ssh, router_ip, router_port, router_username, router_password) != False:
        modbus = modbus_start(router_ip, modbus_port, modbus_id)
        if modbus != False:
            read_system_hostname(modbus, ssh)
            read_system_uptime(modbus, ssh)
            read_mobile_strength(modbus, ssh)


# Run main()
if __name__ == '__main__':
    main()

如果我像这样运行代码,我将无法获得系统正常运行时间的价值。

        if modbus != False:
            read_system_hostname(modbus, ssh)
            read_mobile_strength(modbus, ssh)
            read_system_uptime(modbus, ssh)

现在我不会得到移动强度值,每隔一个函数就会发生这种情况,我可以移动这些函数,比如将它们移动到第一个位置,它会起作用,所以这不是函数错误,我猜它是 pyModbusTCP 甚至可能是我正在使用故障的设备,但我不确定。有人知道如何处理吗?因为我以前从未使用过这个库。我是否必须清除我读取的数据或类似的东西?目前我有一个愚蠢的解决方案,只是放置一个“假”函数来读取寄存器并且什么都不做并将它放在其他之间,所以重要的会起作用,但如果可能的话我想避免这种情况。

编辑: Modbus 启动功能:

def modbus_start(ip, port, device):
    try:
        client = ModbusClient(host=ip, port=port, unit_id=device, auto_open=True)
        print("Modbus client start: Success")
        logging.info("Started Modbus client")
        return client
    except Exception as error:
        print("ERROR: Failed to start Modbus client: ")
        print(error)
        logging.error("Failed to start Modbus client: %s", error)
        return False
4

0 回答 0