2

我是新手。我现在的项目是当当前端决定启动modbus服务时,我会为modbus服务创建一个进程。然后在父进程中获取值,通过ZeroMQPUB/SUB传值,我现在想在modbus服务进程中更新modbus寄存器的值。

我尝试了 update_server.py 提供的 pymodbus 提到的方法twisted.internet.task.LoopingCall()更新了寄存器的值,但这将使我无法通过客户端连接到我的服务器。我不知道为什么?

用于LoopingCall()建立服务器,客户端连接时的日志。

/home/xiaohe/图片/Selection_050.png

然后我尝试将uploading和startTCPserver都放在async循环中,但是启动后第一次才进入update,然后就没有进入。

目前,我正在使用LoopingCall()来处理更新,但我认为这不是一个好方法。

这是我初始化 PUB 和所有可以读取标签的标签的代码。

from loop import cycle
import asyncio
from multiprocessing import Process
from persistence import models as pmodels
from persistence import service as pservice
from persistence import basic as pbasic
import zmq
from zmq.asyncio import Context
from common import logging
from server.modbustcp import i3ot_tcp as sertcp
import common.config as cfg
import communication.admin as ca
import json
import os
import signal
from datetime import datetime
from server.opcuaserver import i3ot_opc as seropc

async def main():
    future = []
    task = []
    global readers, readers_old, task_flag
    logger.debug("connecting to database and create table.")
    pmodels.connect_create()
    logger.debug("init read all address to create loop task.")
    cycle.init_readers(readers)
    ctx = Context()
    publisher = ctx.socket(zmq.PUB)
    logger.debug("init publish [%s].", addrs)
    publisher.bind(addrs)
    readers_old = readers.copy()
    for reader in readers:
        task.append(asyncio.ensure_future(
            cycle.run_readers(readers[reader], publisher)))
    if not len(task):
        task_flag = True
    logger.debug("task length [%s - %s].", len(task), task)
    opcua_server = LocalServer(seropc.opc_server, "opcua")
    future = [
        start_get_all_address(),
        start_api(),
        create_address_loop(publisher, task),
        modbus_server(),
        opcua_server.run()
    ]
    logger.debug("run loop...")
    await asyncio.gather(*future)

asyncio.run(main(), debug=False)

这是为了获取设备标签值并发布它。

async def run_readers(reader, publisher):
    while True:
        await reader.run(publisher)


class DataReader:
    def __init__(self, freq, clients):
        self._addresses = []
        self._frequency = freq
        self._stop_signal = False
        self._clients = clients
        self.signature = sign_data_reader(self._addresses)

    async def run(self, publisher):
        while not self._stop_signal:
            for addr in self._addresses:
                await addr.read()
                data = {
                    "type": "value",
                    "data": addr._final_value
                }
                publisher.send_pyobj(data)
                if addr._status:
                    if addr.alarm_log:
                        return_alarm_log = pbasic.get_log_by_time(addr.alarm_log['date'])
                        if return_alarm_log:
                            data = {
                                "type": "alarm",
                                "data": return_alarm_log
                            }
                            publisher.send_pyobj(data)
                    self.data_send(addr)
                    logger.debug("run send data")
            await asyncio.sleep(int(self._frequency))

    def stop(self):
        self._stop_signal = True

modbus 服务器导入

from common import logging
from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from persistence import service as pservice
from persistence import basic as pbasic
import zmq
import common.config as cfg
import struct
import os
import signal
from datetime import datetime
from twisted.internet.task import LoopingCall
def updating_writer(a):
    logger.info("in updates of modbus tcp server.")
    context = a[0]
    # while True:
    if check_pid(os.getppid()) is False:
        os.kill(os.getpid(), signal.SIGKILL)
    url = ("ipc://{}" .format(cfg.get('ipc', 'pubsub')))
    logger.debug("connecting to [%s].", url)
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect(url)
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    slave_id = 0x00
    msg = subscriber.recv_pyobj()
    logger.debug("updates.")
    if msg['data']['data_type'] in modbus_server_type and msg['type'] == 'value':
        addr = pservice.get_mbaddress_to_write_value(msg['data']['id'])
        if addr:
            logger.debug(
                "local address and length [%s - %s].",
                addr['local_address'], addr['length'])
            values = get_value_by_type(msg['data']['data_type'], msg['data']['final'])
            logger.debug("modbus server updates values [%s].", values)
            register = get_register(addr['type'])
            logger.debug(
                "register [%d] local address [%d] and value [%s].",
                register, addr['local_address'], values)
            context[slave_id].setValues(register, addr['local_address'], values)
        # time.sleep(1)
def tcp_server(pid):
    logger.info("Get server configure and device's tags.")
    st = datetime.now()
    data = get_servie_and_all_tags()
    if data:
        logger.debug("register address space.")
        register_address_space(data)
    else:
        logger.debug("no data to create address space.")

    length = register_number()
    store = ModbusSlaveContext(
        di=ModbusSequentialDataBlock(0, [0] * length),
        co=ModbusSequentialDataBlock(0, [0] * length),
        hr=ModbusSequentialDataBlock(0, [0] * length),
        ir=ModbusSequentialDataBlock(0, [0] * length)
    )
    context = ModbusServerContext(slaves=store, single=True)

    identity = ModbusDeviceIdentification()
    identity.VendorName = 'pymodbus'
    identity.ProductCode = 'PM'
    identity.VendorUrl = 'http://github.com/bashwork/pymodbus/'
    identity.ProductName = 'pymodbus Server'
    identity.ModelName = 'pymodbus Server'
    identity.MajorMinorRevision = '2.2.0'

    # ----------------------------------------------------------------------- #
    # set loop call and run server
    # ----------------------------------------------------------------------- #

    try:
        logger.debug("thread start.")
        loop = LoopingCall(updating_writer, (context, ))
        loop.start(1, now=False)

        # process = Process(target=updating_writer, args=(context, os.getpid(),))
        # process.start()

        address = (data['tcp_ip'], int(data['tcp_port']))
        nt = datetime.now() - st
        logger.info("modbus tcp server begin has used [%s] s.", nt.seconds)
        pservice.write_server_status_by_type('modbus', 'running')
        StartTcpServer(context, identity=identity, address=address)
    except Exception as e:
        logger.debug("modbus server start error [%s].", e)
        pservice.write_server_status_by_type('modbus', 'closed')

这是我为 modbus 进程创建的代码。

def process_stop(p_to_stop):
    global ptcp_flag
    pid = p_to_stop.pid
    os.kill(pid, signal.SIGKILL)
    logger.debug("process has closed.")
    ptcp_flag = False


def ptcp_create():
    global ptcp_flag
    pid = os.getpid()
    logger.debug("sentry pid [%s].", pid)
    ptcp = Process(target=sertcp.tcp_server, args=(pid,))
    ptcp_flag = True
    return ptcp


async def modbus_server():
    logger.debug("get mosbuc server's status.")
    global ptcp_flag
    name = 'modbus'
    while True:
        ser = pservice.get_server_status_by_name(name)
        if ser['enabled']:
            if ser['tcp_status'] == 'closed' or ser['tcp_status'] == 'running':
                tags = pbasic.get_tag_by_name(name)
                if len(tags):
                    if ptcp_flag is False:
                        logger.debug("[%s] status [%s].", ser['tcp_name'], ptcp_flag)
                        ptcp = ptcp_create()
                        ptcp.start()
                    else:
                        logger.debug("modbus server is running ...")
                else:
                    logger.debug("no address to create [%s] server.", ser['tcp_name'])
                    pservice.write_server_status_by_type(name, "closed")
            else:
                logger.debug("[%s] server is running ...", name)
        else:
            if ptcp_flag:
                process_stop(ptcp)
                logger.debug("[%s] has been closed.", ser['tcp_name'])
                pservice.write_server_status_by_type(name, "closed")
            logger.debug("[%s] server not allowed to running.", name)
        await asyncio.sleep(5)

这是 Docker 运行的命令。

/usr/bin/docker run --privileged --network host --name scout-sentry -v /etc/scout.cfg:/etc/scout.cfg -v /var/run:/var/run -v /sys:/sys -v /dev/mem:/dev/mem -v /var/lib/scout:/data --rm shulian/scout-sentry

这是 Docker 配置文件/etc/scout.cfg

[scout]
mode=product

[logging]
level=DEBUG

[db]
path=/data

[ipc]
cs=/var/run/scout-cs.sock
pubsub=/var/run/pubsub.sock

我希望能够在有来自 ZeroMQ 的消息时触发 modbus 值更新功能,并且它将被正确更新。

4

1 回答 1

0

让我们从内而外开始。

...这将使我无法使用客户端连接到我的服务器。我不知道为什么?

ZeroMQ 是一个智能的无代理消息/信令中间件,或者更好的智能消息平台。如果您对 ZeroMQ 架构中的“Zen-of-Zero”艺术不那么熟悉,您可能希望在不到 5 秒的时间内从 ZeroMQ 原则开始,然后再深入了解更多细节。


依据:

从 ZeroMQ 借来的 Scalable Formal Communication ArchetypePUB/SUB并不是零成本的。

这意味着每个基础设施设置(在PUB侧和SUB侧)都需要一些相当长的时间,并且没有人可以确定 AccessNode 配置何时导致 RTO 状态。所以SUB-side (如上所述)应该是一个永久实体,或者用户不应期望在 atwisted.internet.task.LoopingCall()恢复后在零时间内使其成为 RTO。

首选方式:实例化您的(半)持久性zmq.Context(),对其进行配置以便根据<aContextInstance>.socket( zmq.PUB )需要提供服务,最低限度的保护设置是<aSocketInstance>.setsockopt( zmq.LINGER, 0 )所有传输/排队/安全处理细节,外系统向您的代码公开(白名单和安全)规模和资源保护是最有可能的候选者——但细节与您的应用程序域以及您愿意面对准备处理它们的风险有关)。

ZeroMQ 强烈反对共享(零共享)<aContextInstance>.socket()-instances,但zmq.Context()-instance 可以共享/重用(参考ZeroMQ 原则...)/传递给多个线程(如果需要)。

所有<aSocketInstance>{.bind()|.connect()}-方法都很昂贵,因此在尝试使用它们的中介通信服务之前,请尝试设置基础设施 AccessPoint(s) 及其应有的错误处理方式。

每个<aSocketInstance>.setsockopt( zmq.SUBSCRIBE, ... )都是昂贵的,因为它可能需要(取决于(本地/远程)版本)一种非本地的分布式行为形式 - 本地端“设置”订阅,但远程端必须“被告知”这样状态更改并根据实际(传播的)状态“实现”操作。而在早期版本中,所有消息都是从PUB-side 发送的,并且所有SUB-side(s) 都被此类数据淹没并留待“过滤”,这些数据将被移动到本地端内部队列中,较新的版本“在 -side 上实现“主题过滤器” PUB,这进一步增加了设置新操作方式的延迟。


接下来是作案手法:如何<aSocketInstance>.recv()得到结果:

在它们的默认 API 状态下,.recv()-methods 是阻塞的,如果没有消息到达,可能会无限阻塞。

解决方案:<aSocket>.recv()通过始终使用其-modes 来避免调用 ZeroMQ -methods 的阻塞形式,zmq.NOBLOCK或者更确切地说,使用可用的 -methods 测试是否存在任何预期消息<aSocket>.poll( zmq.POLLIN, <timeout> ),使用零超时或受控超时。这使您成为决定代码执行流程的主人。不这样做,您会故意让您的代码依赖于事件的外部序列(或不存在),并且您的架构在处理无限阻塞状态(或潜在的无法挽救的多代理的分布式行为活锁或死锁)时容易出现可怕的问题)

避免事件循环的不受控制的交叉繁殖——比如将 ZeroMQ 驱动的循环传递到外部“回调”类似的处理程序或async装饰的代码块中,其中(非)阻塞逻辑的堆栈可能会破坏最初的想法通过使系统进入无法解决的状态,其中事件错过了预期的事件序列,并且活锁是无法挽救的,或者只是第一次通过。

Stacking -code asynciowith twisted- LoopingCall()-s and async/await-decorated code + ZeroMQ blocking.recv() -s 要么是 Filligrane-Precise-Art-of-Truly-a-Zen-Master 的作品,要么是通往地狱的通行证 - 在所有方面真正的禅宗艺术大师:o)

所以,是的,需要复杂的思考——欢迎来到分布式计算领域!

于 2019-08-29T02:36:36.697 回答