李玲玲 发表于 2024-8-4 20:02:17

Python pymodbus类库使用学习总结

实践环境

Python 3.9.13
https://www.python.org/ftp/python/3.9.13/python-3.9.13-amd64.exe
pymodbus-3.6.8-py3-none-any.whl
https://files.pythonhosted.org/packages/35/19/a9d16f74548d6750acf6604fa74c2cd165b5bc955fe021bf5e1fa04acf14/pymodbus-3.6.8-py3-none-any.whl
pyserial-3.5
备注:如果不安装该模块,采用串口通信时(下述代码中 comm = 'serial'时的通信),会报错NameError: name 'serial' is not defined
pip install pyserialVirtual Serial Port Driver 6.9
链接:https://pan.baidu.com/s/15pCeYyOZWSsbEr_rU1e5hg?pwd=vspd
提取码:vspd
代码实践

修改pymodbus\logging.py

路径: PYTHON_HOME\Lib\site-packages\pymodbus\logging.py
修改源码是为了更方便使用类库自带日志打印器
# ...略
class Log:
    """Class to hide logging complexity.

    :meta private:
    """

    # _logger = logging.getLogger(__name__) # commented by shouke
    _logger = logging.getLogger('logger') # added by shouke # 方便在不同模块获取日志打印器

    @classmethod
    def apply_logging_config(cls, level, log_file_name):
      """Apply basic logging configuration."""
      if level == logging.NOTSET:
            level = cls._logger.getEffectiveLevel()
      if isinstance(level, str):
            level = level.upper()
      log_stream_handler = logging.StreamHandler()
      log_formatter = logging.Formatter(
            "%(asctime)s %(levelname)-5s %(module)s:%(lineno)s %(message)s"
      )

      log_stream_handler.setFormatter(log_formatter)
      ##################### commented by shouke #################
      # cls._logger.addHandler(log_stream_handler)
      # if log_file_name:
      #   log_file_handler = logging.FileHandler(log_file_name)
      #   log_file_handler.setFormatter(log_formatter)
      #   cls._logger.addHandler(log_file_handler)
      ##########################################################
      ##################### added by shouke #################
      if not cls._logger.handlers: # 如果不加这个判断,在不同模块都执行pymodbus_apply_logging_config函数时,会导致同一条日志被重复打印
            cls._logger.addHandler(log_stream_handler)
            if log_file_name:
                log_file_handler = logging.FileHandler(log_file_name)
                log_file_handler.setFormatter(log_formatter)
                cls._logger.addHandler(log_file_handler)
      ##########################################################
      cls.setLevel(level)

# ...略
      
pymodbus_apply_logging_config(level=logging.INFO) # 统一日志打印器配置异步服务器和异步客户端实现

异步服务器代码实现

server_async.py
#!/usr/bin/env python3
'''Pymodbus 异步服务器示例

多线程异步服务器的一个示例。
'''

import asyncio

import logging
### 不修改 PYTHON_HOME\Lib\site-packages\pymodbus\logging.py源码的基础上,获取日志打印器相关方法及说明:
# 方法1
# _logger = logging.getLogger(__name__) # 采用该_logger打印的日志看不到
# logging.basicConfig(level=logging.DEBUG) # 不注释上行代码的基础上添加这行代码,日志能打印出来,但是日志所在模块不为当前模块

# 方法2
# _logger = logging.getLogger(__file__)
# logging.basicConfig(level=logging.INFO) #添加这行代码,确保日志能打印出来,但是打印出来的日志所在模块不为当前模块

# 方法3:
# from pymodbus.logging import Log,pymodbus_apply_logging_config
# pymodbus_apply_logging_config(level=logging.INFO) # 如果缺少这行代码,下面async_helper中的日志打印将无法在控制台输出
# _logger = Log._logger

# 修改 PYTHON_HOME\Lib\site-packages\pymodbus\logging.py源码的基础上,获取日志打印器相关方法:
_logger = logging.getLogger('logger')
# _logger.setLevel(logging.INFO)


from pymodbus import __version__ as pymodbus_version
from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext,
    ModbusSparseDataBlock,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.server import (
    StartAsyncSerialServer,
    StartAsyncTcpServer,
    StartAsyncTlsServer,
    StartAsyncUdpServer,
)


# hook函数
def server_request_tracer(request, *_addr):
    """跟踪请求

    所有服务器请求在被处理之前都经过此过滤器
    """
    _logger.info(f'---> REQUEST: {request}') # 输出,类似这样: ---> REQUEST: ReadBitRequest(0,1)

def server_response_manipulator( response):
    """操纵响应

    所有服务器响应在发送之前都通过此过滤器
    过滤器返回:
    - 响应,原始或修改后的
    - 跳过编码,发出是否对响应进行编码的信号
    """
    _logger.info(f'---> RESPONSE: {response}') # 输出,类似这样:---> RESPONSE: ReadCoilsResponse(1)
    # response.should_respond = False # 如果让该行代码生效,则客户端收不到服务器响应
    return response, False


class Args:
    comm = 'tcp' # 通讯模式,可选值 tcp、udp serial、tls
    comm_defaults = {
      'tcp': ['socket', 5020],
      # 'tcp': ['rtu', 5020], # 如果采用ModbusRTU协议 则使用这个
      'udp': ['socket', 5020],
      # 'serial': ['rtu', '/dev/ptyp0'], # Linux
      'serial': ['rtu', 'COM1']# Windows(本例中COM1和COM2是成对的)
      'tls': ['tls', 5020]
    }

    framer = comm_defaults # 帧类型
    port = comm_defaults # 站点端口
    host = ''
    baudrate = 9600 # 串行设备波特率 即每秒传输的位数

    # 连续的、无间隙顺序存储数据块(寄存器块)
    datablock = lambda : ModbusSequentialDataBlock(0x01, * 100)# pylint: disable=unnecessary-lambda-assignment

    # 连续的,或者可能有间隙的稀疏、不规则存储数据块
    # datablock = lambda : ModbusSparseDataBlock({0x00: 0, 0x05: 1})# pylint: disable=unnecessary-lambda-assignment

    # 工厂模式
    # datablock = lambda : ModbusSequentialDataBlock.create()# pylint: disable=unnecessary-lambda-assignment,unnecessary-lambda

    # 如果从节点数量不为0
    #服务器使用服务器上下文,该上下文允许服务器,以针对不同的从设备ID使用不同的从站上下文(slave context)进行响应。
    # 默认情况下,它将为提供的每个slave ID返回相同的上下文(广播模式)。
    # 但是,可以通过将single标识设置为False并且提供slave ID到上下文映射的字典来覆盖这种行为:
    # 从机上下文也可以按zero_mode初始化,这意味着到地址(0-7)的请求将映射到地址(0-7)。
    # 默认值为False,因此地址(0-7)将映射到(1-8):
    # context = {
    #   0x01: ModbusSlaveContext( # 0x01为从设备、从机地址
    #         di=datablock(), # 输入离散量
    #         co=datablock(), # 输出线圈
    #         hr=datablock(), # 保持寄存器
    #         ir=datablock(), # 输入寄存器
    #   ),
    #   0x02: ModbusSlaveContext(
    #         di=datablock(),
    #         co=datablock(),
    #         hr=datablock(),
    #         ir=datablock(),
    #   ),
    #   0x03: ModbusSlaveContext(
    #         di=datablock(),
    #         co=datablock(),
    #         hr=datablock(),
    #         ir=datablock(),
    #         zero_mode=True,
    #   ),
    #   }
    # single = False

    #如果从节点数量为0
    context = ModbusSlaveContext(
      di=datablock(), co=datablock(), hr=datablock(), ir=datablock()
    )
    single = True

    # 构建数据存储
    context = ModbusServerContext(slaves=context,
                                  single=single
                                  )

    # ----------------------------------------------------------------------- #
    # 初始化服务器信息
    # 不对属性字段做任何设置,则字段值默认为空字符串
    # ----------------------------------------------------------------------- #
    identity = ModbusDeviceIdentification(
      info_name={
            "VendorName": "Pymodbus",
            "ProductCode": "PM",
            "VendorUrl": "https://github.com/pymodbus-dev/pymodbus/",
            "ProductName": "Pymodbus Server",
            "ModelName": "Pymodbus Server",
            "MajorMinorRevision": pymodbus_version,
      }
    )

    # sslctx=sslctx,# 用于TLS的SSLContext (默认为None, 自动创建)
    cert_file_path = './certificates/pymodbus.crt' # 用于TLS 证书文件路径(如果sslctx为None时使用该文件)
    key_file_path = './certificates/pymodbus.key'# 用于TLS 私钥文件路径 (如果sslctx为None时使用该文件)


async def run_async_server():
    """Run server."""

    txt = f'### start ASYNC server, listening on {Args.port} - {Args.comm}'
    _logger.info(txt)

    if Args.comm == 'tcp':
      address = (Args.host if Args.host else '', Args.port if Args.port else None)
      server = await StartAsyncTcpServer(
            context=Args.context,# 数据存储
            identity=Args.identity,# 服务器标识
            # TBD host=
            # TBD port=
            address=address,# 监听地址
            # custom_functions=[],# 允许自定义处理函数
            framer=Args.framer,# 使用的帧策略
            # ignore_missing_slaves=True,# 忽略对缺失的slave的请求
            # broadcast_enable=False,# 是否允许广播 将 slave id 0视为广播地址
            # timeout=1# 等待请求完成的时间 # waiting time for request to complete

            # 可选参数,实现hook功能
            request_tracer=server_request_tracer,
            response_manipulator=server_response_manipulator
      )
    elif Args.comm == 'udp':
      address = (
            Args.host if Args.host else "127.0.0.1",
            Args.port if Args.port else None,
      )
      server = await StartAsyncUdpServer(
            context=Args.context,
            identity=Args.identity,
            address=address,
            # custom_functions=[],
            framer=Args.framer,
            # ignore_missing_slaves=True,
            # broadcast_enable=False,
            # timeout=1
      )
    elif Args.comm == 'serial':
      # socat -d -d PTY,link=/tmp/ptyp0,raw,echo=0,ispeed=9600
      #             PTY,link=/tmp/ttyp0,raw,echo=0,ospeed=9600
      server = await StartAsyncSerialServer(
            context=Args.context,
            identity=Args.identity,
            # timeout=1,# waiting time for request to complete
            port=Args.port,# 串口
            # custom_functions=[],
            framer=Args.framer,
            # stopbits=1,# 要使用的停止位数(The number of stop bits to use)
            # bytesize=8,# 序列化消息字节大小(The bytesize of the serial messages)
            # parity="N",# 使用哪种奇偶校验
            baudrate=Args.baudrate,# 用于串行设备的波特率
            # handle_local_echo=False,# 处理USB-to-RS485适配器的本地echo(Handle local echo of the USB-to-RS485 adaptor)
            # ignore_missing_slaves=True,# ignore request to a missing slave
            # broadcast_enable=False,#
            # strict=True,# 使用严格的计时,针对Modbus RTU 为t1.5(use strict timing, t1.5 for Modbus RTU)
      )
    elif Args.comm == 'tls':
      address = (Args.host if Args.host else '', Args.port if Args.port else None)
      server = await StartAsyncTlsServer(
            context=Args.context,# Data storage
            host='localhost',# 定义用于连接的tcp地址
            # port=port,# tcp监听端口
            identity=Args.identity,# server identify
            # custom_functions=[],# allow custom handling
            address=address,
            framer=Args.framer,
            certfile=Args.cert_file_path,# The cert file path for TLS (used if sslctx is None)
            # sslctx=sslctx,# The SSLContext to use for TLS (default None and auto create)
            keyfile=Args.key_file_path,# The key file path for TLS (used if sslctx is None)
            # password="none",# 用于解密私钥文件的密码
            # ignore_missing_slaves=True,
            # broadcast_enable=False,
            # timeout=1
      )
    return server


async def async_helper():
    _logger.info("Starting server...")
    await run_async_server()


if __name__ == "__main__":
    asyncio.run(async_helper(), debug=True)异步客户端代码实现

client_async.py
#!/usr/bin/env python3

'''Pymodbus异步客户端示例
'''
import asyncio
import logging

import pymodbus.client as modbusClient
from pymodbus import ModbusException

_logger = logging.getLogger('logger')

class Args:
    comm = 'tcp' # 通讯模式,可选值 tcp、udp serial、tls
    comm_defaults = {
      'tcp': ['socket', 5020],
      # 'tcp': ['rtu', 5020], # 如果采用ModbusRTU协议 则使用这个
      'udp': ['socket', 5020],
      # 'serial': ['rtu', '/dev/ptyp0'], # Linux
      'serial': ['rtu', 'COM2']# Windows(本例中COM1和COM2是用Virtual Serial Port Driver 6.9软件成对添加的虚拟端口)
      'tls': ['tls', 5020]
    }

    framer = comm_defaults # 帧类型
    port = comm_defaults # 站点端口
    host = '127.0.0.1' # 服务端地址
    baudrate = 9600 # 串行设备波特率

    timeout = 10 # 客户端访问服务器超时时间(该参数仅用于客户端(slave节点)),float型

    # sslctx=sslctx,# 用于TLS的SSLContext (默认为None, 自动创建)
    cert_file_path = './certificates/pymodbus.crt' # 用于TLS 证书文件路径(如果sslctx为None时使用该文件)
    key_file_path = './certificates/pymodbus.key'# 用于TLS 私钥文件路径 (如果sslctx为None时使用该文件)


async def run_async_client(modbus_calls=None):
    """Run sync client."""

    _logger.info("### Create client object")
    if Args.comm == "tcp":
      client = modbusClient.AsyncModbusTcpClient(
            Args.host,
            port=Args.port,# on which port
            # Common optional parameters:
            framer=Args.framer,
            timeout=Args.timeout,
            retries=3,
            reconnect_delay=1,
            reconnect_delay_max=10,
            #    retry_on_empty=False,
            # TCP setup parameters
            #    source_address=("localhost", 0),
      )
    elif Args.comm == "udp":
      client = modbusClient.AsyncModbusUdpClient(
            Args.host,
            port=Args.port,
            # Common optional parameters:
            framer=Args.framer,
            timeout=Args.timeout,
            #    retries=3,
            #    retry_on_empty=False,
            # UDP setup parameters
            #    source_address=None,
      )
    elif Args.comm == "serial":
      client = modbusClient.AsyncModbusSerialClient(
            Args.port,
            # Common optional parameters:
            #    framer=ModbusRtuFramer,
            timeout=Args.timeout,
            #    retries=3,
            #    retry_on_empty=False,
            # Serial setup parameters
            baudrate=Args.baudrate,
            #    bytesize=8,
            #    parity="N",
            #    stopbits=1,
            #    handle_local_echo=False,
            #    strict=True,
      )
    elif Args.comm == "tls":
      client = modbusClient.AsyncModbusTlsClient(
            Args.host,
            port=Args.port,
            # Common optional parameters:
            framer=Args.framer,
            timeout=Args.timeout,
            #    retries=3,
            #    retry_on_empty=False,
            # TLS setup parameters
            sslctx=modbusClient.AsyncModbusTlsClient.generate_ssl(
                certfile=Args.cert_file_path,
                keyfile=Args.key_file_path,
            #    password="none",
            ),
            server_hostname="localhost",
      )

    _logger.info("### Client starting")
    await client.connect()
    assert client.connected
    if modbus_calls:
      await modbus_calls(client)
    client.close()
    _logger.info("### End of Program")


async def run_a_few_calls(client):
   
    try:
      # 读取线圈状态
      rr = await client.read_coils(0, 1, slave=0) # 从 0x00 地址开始读取1个线圈
      print(rr.bits) # 输出:
      # assert len(rr.bits) == 8
      print(rr) # 输出:ReadCoilsResponse(8)

      # 读输入离散量
      rr = await client.read_holding_registers(0, 1, slave=0) # 从 0x00 地址开始读取1个线圈
      print(rr.registers) # 输出:

      rr = await client.read_holding_registers(4, 2, slave=0) # 从0x04 地址开始读取2个线圈
      # assert rr.registers == 17
      # assert rr.registers == 17
      print(rr.registers) # 输出:

      # 读保持寄存器
      rr = await client.read_holding_registers(5, 4) # 从 0x05 地址开始读取4个线圈
      print(rr.registers) # 输出:

      # 读输入寄存器
      rr = await client.read_input_registers(0x0F, 3, slave=0) # 从 0x0F 地址开始读取4个线圈
      print(rr.registers) # 输出:
      rr = await client.read_input_registers(15, 3, slave=0) # 从 0x0F 地址开始读取4个线圈
      print(rr) # 输出:ReadInputRegistersResponse (3)
      print(rr.registers) # 输出:

      # 写单个线圈
      rr = await client.write_coil(9, False, slave=0) # 将布尔值False写入 0x09 地址
      print(rr) # 输出:WriteCoilResponse(9) => False
      rr = await client.read_coils(9, 1, slave=0)
      print(rr.bits) # 输出:

      # 写多个线圈
      await client.write_coils(10, False, slave=0) # 将布尔值False写入 0x0A 地址
      rr = await client.read_coils(10, 1, slave=0)
      print(rr.bits) # 输出:
      rr = await client.read_coils(11, 1, slave=0)
      print(rr.bits) # 输出:

      await client.write_coils(10, , slave=0) # 将布尔值False写入 0x0A 0x0B 地址
      rr = await client.read_coils(11, 1, slave=0)
      print(rr.bits) # 输出:

      # 写单个保持寄存器
      await client.write_register(12, 0x0F, slave=0) # 将0x0F写入 0x0C 地址
      rr = await client.read_input_registers(12, 3, slave=0)
      print(rr.registers) # 输出:
      rr = await client.read_holding_registers(12, 4)
      print(rr.registers) # 输出:

      # 写多个保持寄存器
      await client.write_registers(13, 0x0F, slave=0) # 将0x0F写入 0x0D 地址
      rr = await client.read_holding_registers(13, 2)
      print(rr.registers) # 输出:

      await client.write_registers(13, , slave=0) # 将0x0F写入 0x0D,0x0E 地址
      rr = await client.read_holding_registers(13, 2)
      print(rr.registers) # 输出:
    except ModbusException:
      pass


async def main():

    await run_async_client(modbus_calls=run_a_few_calls)


if __name__ == "__main__":
    asyncio.run(main(), debug=True)相关说明


[*]pymodbus.datastore.ModbusSequentialDataBlock
在 Modbus 协议中,数据通常被组织成多个数据块,而每个数据块包含一定数量的数据寄存器、者线圈或者离散量。该类是一个用于创建顺序排列的Modbus顺序数据存储数据块的类。
例如:
ModbusSequentialDataBlock(0x00, * 100) # 创建了一个从地址 0x00 开始,包含 100(即包含100个地址) 个初始值为 17 的数据块实践时发现,此时通过read_coils读取线圈,读取线圈起始地址不能超过99,否则服务端会报错Exception Response(129, 1, IllegalAddress),
client.read_coils(98, 1, slave=0) # 可正常读取
client.read_coils(99, 1, slave=0) # 报错修改下服务端数据块起始地址
ModbusSequentialDataBlock(0x01, * 100) # 创建了一个从地址 0x01 开始实践时发现,此时通过read_coils读取线圈,读取线圈起始地址不能超过100,否则服务端会报错Exception Response(129, 1, IllegalAddress),
client.read_coils(99, 1, slave=0) # 可正常读取
client.read_coils(100, 1, slave=0) # 报错
[*]pymodbus.datastore.ModbusSparseDataBlock
用于创建稀疏数据块的类。该类允许创建包含不连续地址的数据块(可随机访问)。具体来说,可以在数据块中指定特定地址的数据,而无需为数据块的每个地址都分配内存。这种方式可以有效地节省内存空间,尤其是在处理大量数据时。
例如:
sparse = ModbusSparseDataBlock({10: , 30: 1, 40: *20})创建一个拥有3个地址的数据块。
一个地址从0x10开始,长度为4(即包含4个地址),初始值分别为3,5,6,8,一个地址从0x30开始,长度为1,初始值为10,一个地址从0x40开始,长度为20,初始为0
sparse = ModbusSparseDataBlock(*100)创建从地址0x00开始,长度为100,初始值为10的数据块
sparse = ModbusSparseDataBlock() # 创建空的数据块
sparse.setValues(0, *10)# 添加从地址0x00开始,长度为10,值为10的数据块
sparse.setValues(30, *5) # 添加从地址0x30开始,长度为5, 值为20的数据块
# 注意,除非执行类__init__初始化函数时,将 mutable 属性设置为True(默认值),否则无法使用 setValues 函数来添加新的数据块
[*]pymodbus.datastore.ModbusSlaveContext
用于创建每种数据访问都存储在一个块中的一个modbus数据模型。该类可用来模拟 Modbus 从设备上下文。可以在这个上下文中添加多个不同类型的数据块,模拟一个完整的 Modbus 从设备。
例子:
ModbusSlaveContext(
                di=datablock(), # 输入离散量(Discrete Inputs)
                co=datablock(), # 输出线圈 (Coils)
                hr=datablock(), # 保持寄存器(Holding Register)
                ir=datablock(), # 输入寄存器(Input Register)
            )
[*]pymodbus.datastore.ModbusServerContext
这表示从上下文的主集合,用于创建一个服务器上下文,并将从站上下文添加到服务器上下文中。
如果初始化时,属性single被设置为True,它将被视为单个上下文(所有的从设备共享相同的 Modbus 地址空间,没有独立的地址范围),因此每个slave ID都返回相同的上下文。通过分析源码可知,当single被设置为True时,会创建一个从设备上下文,设备地址默认为 0,
如果single设置为False,它将被解释为从站上下文的集合从属上下文(每个从设备都有独立的 Modbus 地址空间,它们的地址范围是相互独立的)
[*]pymodbus.client.mixin.ModbusClientMixin

[*]def read_coils(self, address: int, count: int = 1, slave: int = 0, **kwargs: Any)
读线圈(功能码 0x01)

[*]address要读取数据的起始地址
[*]count可选参数,要读取的线圈数量(针对read_coils,发现count设置大于1的数和设置为1是一样的效果)
[*]slave 可选参数,Modbus从机ID(实践发现,服务端构建服务器实例时,如果single设置为True时, 这里的slave只要不超出合法值范围,可以随便填,但是如果single设置为False,则必须填写正确的从机ID)
[*]kwargs可选参数,实验性参数
异常抛出ModbusException,下同,不再赘述

[*]def read_discrete_inputs(self, address: int, count: int = 1, slave: int = 0, **kwargs: Any)
读输入离散量(对应功能码 0x02)
参数说明参考read_coils
[*]def read_holding_registers(self, address: int, count: int = 1, slave: int = 0, **kwargs: Any)
读保持寄存器(对应功能码 0x03)
参数说明参考read_coils
[*]def read_input_registers(self, address: int, count: int = 1, slave: int = 0, **kwargs: Any)
读输入寄存器(对应功能码 0x04)
参数说明参考read_coils
[*]def write_coil(self, address: int, value: bool, slave: int = 0, **kwargs: Any)
写单个线圈(对应功能码 0x05)

[*]address要写入数据的起始地址
[*]value要写入的布尔值
[*]slave 可选参数,Modbus从机ID
[*]kwargs可选参数,实验性参数

[*]defwrite_coils( self, address: int, values: list | bool, slave: int = 0, **kwargs: Any)
写多个线圈(对应功能码 0x0F)

[*]address要写入数据的起始地址
[*]values要写入的布尔值列表、或者单个布尔值
[*]slave 可选参数,Modbus从机ID
[*]kwargs可选参数,实验性参数

[*]def write_register(self, address: int, value: int, slave: int = 0, **kwargs: Any)
写单个寄存器(功能码 0x06)

[*]address要写入数据的起始地址
[*]value要写入的整数
[*]slave 可选参数,Modbus从机ID
[*]kwargs可选参数,实验性参数

[*]def write_registers( self, address: int, values: list | int, slave: int = 0, **kwargs: Any)
写多个寄存器(功能码 0x10)

[*]address要写入数据的起始地址
[*]values要写入的整数列表、或者单个整数
[*]slave 可选参数,Modbus从机ID
[*]kwargs可选参数,实验性参数

[*]

[*]
为服务器设置初始化 payload实现

server_payload.py
#!/usr/bin/env python3
'''Pymodbus服务器Payload示例。
此示例展示如何使用builder初始化具复杂的内存layout的服务器

'''

import asyncio

import logging
_logger = logging.getLogger('logger')

from pymodbus.constants import Endian
from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext
)
from pymodbus.server import StartAsyncTcpServer

from pymodbus.payload import BinaryPayloadBuilder

async def run_async_server():
    """Run server."""

    builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE, wordorder=Endian.LITTLE)
    builder.add_string("abcdefgh")
    builder.add_bits()
    builder.add_8bit_int(-0x12)
    builder.add_8bit_uint(0x12)
    builder.add_16bit_int(-0x5678)
    builder.add_16bit_uint(0x1234)
    builder.add_32bit_int(-0x1234)
    builder.add_32bit_uint(0x12345678)
    builder.add_16bit_float(12.34)
    builder.add_16bit_float(-12.34)
    builder.add_32bit_float(22.34)
    builder.add_32bit_float(-22.34)
    builder.add_64bit_int(-0xDEADBEEF)
    builder.add_64bit_uint(0x12345678DEADBEEF)
    builder.add_64bit_uint(0xDEADBEEFDEADBEED)
    builder.add_64bit_float(123.45)
    builder.add_64bit_float(-123.45)

    datablock = ModbusSequentialDataBlock(1, builder.to_registers())

    context = ModbusSlaveContext(
      di=datablock, co=datablock, hr=datablock, ir=datablock # 注意,datablock不能加括号()
    )
    single = True

    # 构建数据存储
    context = ModbusServerContext(slaves=context, single=single)

    txt = f'### start ASYNC server, listening on 5020 - tcp'
    _logger.info(txt)

    address = ('', 5020)
    server = await StartAsyncTcpServer(
      context=context,# 数据存储
      # TBD host=
      # TBD port=
      address=address,# 监听地址
      # custom_functions=[],# 允许自定义处理函数
      framer='socket',# 使用的帧策略
      # ignore_missing_slaves=True,# 忽略对缺失的slave的请求
      # broadcast_enable=False,# 是否允许广播 将 slave id 0视为广播地址
      # timeout=1# 等待请求完成的时间 # waiting time for request to complete
    )
    return server


async def async_helper():

    _logger.info("Starting server...")
    await run_async_server()


if __name__ == "__main__":
    asyncio.run(async_helper(), debug=True)带有更新任务的服务器代码实现

server_updating.py
#!/usr/bin/env python3
'''带有更新任务的Pymodbus异步服务器示例。

异步服务器以及随服务器一起连续运行并更新值的任务示例
'''

import asyncio

import logging
_logger = logging.getLogger('logger')

from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext
)
from pymodbus.server import StartAsyncTcpServer


async def updating_task(context):
    '''更新服务器中的数据值
    此任务伴随服务器持续运行,它将每两秒增加一些值

    需要注意的是,getValues和setValues不是并发安全的
    '''

    # fc_as_hex = 3 # 功能码,例如3、0x03 表示读保持寄存器
    fc_as_hex = 4
    slave_id = 0x00 # 从节点ID
    address = 0x00# 数据读取起始地址
    count = 6 # 需要获取的值的数量

    values = context.getValues(fc_as_hex, address, count=count)

    # set values to zero
    values =
    context.setValues(fc_as_hex, address, values)

    txt = f'updating_task: started: initialised values: {values!s} at address {address!s}'
    print(txt)
    _logger.debug(txt)

    # 循环递增
    while True:
      await asyncio.sleep(2)

      values = context.getValues(fc_as_hex, address, count=count)
      values =
      context.setValues(fc_as_hex, address, values)

      txt = f'updating_task: incremented values: {values!s} at address {address!s}'
      print(txt)
      _logger.debug(txt)


async def run_async_server(context):
    """Run server."""

    txt = f'### start ASYNC server, listening on 5020 - tcp'
    _logger.info(txt)

    address = ('', 5020)
    server = await StartAsyncTcpServer(
      context=context,# 数据存储
      # TBD host=
      # TBD port=
      address=address,# 监听地址
      # custom_functions=[],# 允许自定义处理函数
      framer='socket',# 使用的帧策略
      # ignore_missing_slaves=True,# 忽略对缺失的slave的请求
      # broadcast_enable=False,# 是否允许广播 将 slave id 0视为广播地址
      # timeout=1# 等待请求完成的时间 # waiting time for request to complete
    )
    return server


async def async_helper():

    datablock = lambda : ModbusSequentialDataBlock(0x01, * 100)# pylint: disable=unnecessary-lambda-assignment

    context = ModbusSlaveContext(
      di=datablock(), co=datablock(), hr=datablock(), ir=datablock() # 注意,datablock不能加括号()
    )
    single = True

    # 构建数据存储
    context = ModbusServerContext(slaves=context, single=single)

    task = asyncio.create_task(updating_task(context))
    task.set_name("example updating task")

    _logger.info("Starting server...")
    await run_async_server(context)

    task.cancel()


if __name__ == "__main__":
    asyncio.run(async_helper(), debug=True)客户端访问验证
#!/usr/bin/env python3

import asyncio
import logging

import pymodbus.client as modbusClient
from pymodbus import ModbusException

_logger = logging.getLogger('logger')

async def run_async_client(modbus_calls=None):
    """Run sync client."""

    _logger.info("### Create client object")
    client = modbusClient.AsyncModbusTcpClient(
      '127.0.0.1',
      port=5020,# on which port
      # Common optional parameters:
      framer='socket', # 客户端访问服务器超时时间(该参数仅用于客户端(slave节点)),float型
      timeout=10,
      retries=3,
      reconnect_delay=1,
      reconnect_delay_max=10,
      #    retry_on_empty=False,
      # TCP setup parameters
      #    source_address=("localhost", 0),
    )

    _logger.info("### Client starting")
    await client.connect()
    assert client.connected
    if modbus_calls:
      await modbus_calls(client)
    client.close()
    _logger.info("### End of Program")


async def run_a_few_calls(client):

    try:
      # 读保持寄存器
      # rr = await client.read_holding_registers(0, 4)
      # print(rr.registers)

      # 读输入寄存器
      rr = await client.read_input_registers(0, 7, slave=0)
      print(rr.registers)
    except ModbusException:
      pass


async def main(cmdline=None):
    """Combine setup and run."""

    await run_async_client(modbus_calls=run_a_few_calls)


if __name__ == "__main__":
    asyncio.run(main(), debug=True)class ModbusSlaveContext(ModbusBaseSlaveContext):

    def getValues(self, fc_as_hex, address, count=1):
      """Get `count` values from datastore.

      :param fc_as_hex: The function we are working with
      :param address: The starting address
      :param count: The number of values to retrieve
      :returns: The requested values from a:a+c
      """
      if not self.zero_mode:
            address += 1
      Log.debug("getValues: fc-[{}] address-{}: count-{}", fc_as_hex, address, count)
      return self.store.getValues(address, count)
      

    def setValues(self, fc_as_hex, address, values):
      """Set the datastore with the supplied values.

      :param fc_as_hex: The function we are working with
      :param address: The starting address
      :param values: The new values to be set
      """
      if not self.zero_mode:
            address += 1
      Log.debug("setValues[{}] address-{}: count-{}", fc_as_hex, address, len(values))
      self.store.setValues(address, values)同步服务器和异步客户端实现

同步服务器代码实现

server_sync.py
#!/usr/bin/env python3
'''Pymodbus 同步服务器示例
'''

import logging

from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext
)

from pymodbus.server import StartTcpServer

_logger = logging.getLogger('logger')


def run_sync_server():

    # 连续的、无间隙顺序存储数据块(寄存器块)
    datablock = lambda : ModbusSequentialDataBlock(0x01, * 100)
    context = ModbusSlaveContext(
      di=datablock(), co=datablock(), hr=datablock(), ir=datablock()
    )
    single = True

    # 构建数据存储
    context = ModbusServerContext(slaves=context,
                                  single=single
                                  )

    txt = f'### start SYNC server'
    _logger.info(txt)

    address = ('', 5020)
    server = StartTcpServer(
      context=context,# Data storage
      # identity=identity,# server identify
      # TBD host=
      # TBD port=
      address=address,# listen address
      # custom_functions=[],# allow custom handling
      framer='socket',# The framer strategy to use
      # ignore_missing_slaves=True,# ignore request to a missing slave
      # broadcast_enable=False,# treat slave_id 0 as broadcast address,
      # timeout=1,# waiting time for request to complete
    )
    return server


def sync_helper():

    server = run_sync_server()
    server.shutdown()


if __name__ == "__main__":
    sync_helper()同步客户端代码实现

client_sync.py
#!/usr/bin/env python3

'''Pymodbus同步客户端示例
'''

import logging

import pymodbus.client as modbusClient
from pymodbus import ModbusException

_logger = logging.getLogger('logger')

def run_sync_client(modbus_calls=None):
    """Run sync client."""

    _logger.info("### Create client object")
    client = modbusClient.ModbusTcpClient(
      '127.0.0.1',
      port=5020,# on which port
      # Common optional parameters:
      framer='socket', # 客户端访问服务器超时时间(该参数仅用于客户端(slave节点)),float型
      timeout=10,
      retries=3,
      reconnect_delay=1,
      reconnect_delay_max=10,
      #    retry_on_empty=False,
      # TCP setup parameters
      #    source_address=("localhost", 0),
    )

    _logger.info("### Client starting")
    client.connect()
    assert client.connected
    if modbus_calls:
      modbus_calls(client)
    client.close()
    _logger.info("### End of Program")


def run_a_few_calls(client):

    try:
      # 读取线圈状态
      rr = client.read_coils(0, 1, slave=1) # 从 0x00 地址开始读取1个线圈
      print(rr.bits) # 输出:
      # assert len(rr.bits) == 8
    except ModbusException:
      pass


def main():

    run_sync_client(modbus_calls=run_a_few_calls)

if __name__ == "__main__":
    main()参考链接

https://pypi.org/project/pymodbus/
https://pymodbus.readthedocs.io/en/dev/index.html

来源:https://www.cnblogs.com/shouke/p/18342169
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Python pymodbus类库使用学习总结