|
使用pymodbus库进行modbus tcp通信
使用python解决工业通信问题是一个非常好的选择,python具有丰富的生态,可以轻松解决工业通信的各种问题。
本篇主要介绍使用pymodbus库进行modbus tcp仿真,实现pc端读取plc或工业设备modbus变量。
安装pymodbus:创建modbus tcp server
这里我们先创建一个虚拟的modbus设备,如果你手里有一个plc或者工业设备,可以直接跳过本节。
- '''
- * @Author: liuzhao
- * @Last Modified time: 2022-10-05 09:56:13
- '''
- from pymodbus.server.sync import (
- StartTcpServer,
- )
- from pymodbus.datastore import (
- ModbusSequentialDataBlock,
- ModbusServerContext,
- ModbusSlaveContext,
- )
- from pymodbus.version import version
- datablock = ModbusSequentialDataBlock.create()
- context = ModbusSlaveContext(
- di=datablock,
- co=datablock,
- hr=datablock,
- ir=datablock,
- )
- single = True
- # Build data storage
- store = ModbusServerContext(slaves=context, single=single)
- if __name__ == '__main__':
- address = ("0.0.0.0", 503)
- StartTcpServer(
- context=store, # Data storage
- address=address, # listen address
- allow_reuse_address=True, # allow the reuse of an address
- )
复制代码 直接运行该脚本,就可以在本机的503端口创建一台modbus设备了,具体实现暂不深追,我们学习的重点是客户端对modbus变量的读写。
读写modbus变量
modbus变量类型以及地址
Object typeAccessSizeAddressCoilRead-write1 bit00001 – 09999Discrete inputRead-only1 bit10001 – 19999Input registerRead-only16 bits30001 – 39999Holding registerRead-write16 bits40001 – 49999coil是线圈,Discrete input是数字量输入,Input register是模拟量输入,Holding register是保持寄存器。一般地址范围是0-65535
读取常规变量
读写线圈 | 读取输入变量 | 读写保持寄存器- from pymodbus.client.sync import ModbusTcpClient
- from pymodbus.bit_read_message import ReadCoilsResponse
- from pymodbus.register_read_message import ReadInputRegistersResponse
- from pymodbus.exceptions import ConnectionException # 连接失败,用于异常处理
- host = '127.0.0.1'
- port = 503
- client = ModbusTcpClient(host,port)
- # 写入线圈
- client.write_coil(1, True)
- client.write_coil(2, False)
- client.write_coil(3, True)
- # 读取线圈 注意对于离散量的读取,第二个参数cout是有坑的,必须为8的倍数个
- result:ReadCoilsResponse = client.read_coils(address=1,cout=8) # 从地址1开始读,读取8个线圈,一次读8的倍数个线圈,不设置为8的倍数可能会出现问题
- print(result.isError())
- # 不建议使用
- print(result.getBit(7)) # 这里的参数address不是plc里的地址,而是python列表的address,
- print('read_coils ')
- # 建议使用
- print(result.bits) # 打印读取结果,一共8位
- # 读取其中的位
- print(
- result.bits[0],
- result.bits[1],
- result.bits[2]
- ) # 相当于result.getBit(0)
- # 读取数字输入
- result = client.read_discrete_inputs(address=10001,count=8) # 从10001开始读,读取8位
- print(result.bits)
- # 读取模拟输入寄存器
- input_register_result:ReadInputRegistersResponse = client.read_input_registers(1,count=8)
- # print(f'is_error:{input_register_result.isError()}')
- print('read_input_registers ')
- print(input_register_result.registers)
- print(input_register_result.getRegister(0))
- # 读写保持寄存器
- client.write_register(address=40001,value=100)
- result:ReadInputRegistersResponse = client.read_holding_registers(address=40001,count=1)
- print('read_holding_registers ')
- print(result.registers)
- # 关闭连接
- client.close()
复制代码 读取复杂变量
字符串、浮点数、负数等
这里需要注意modbus设备的存储结构是低位低字节还是低位高字节,也就是设备内存的字节、字的排列顺序。
根据不同的设备,对照下表调整正确的组合方式。
Word OrderByte orderWord1Word2BigBig0x12340x5678BigLittle0x34120x7856LittleBig0x56780x1234LittleLittle0x78560x3412- # 复杂数据类型
- from collections import OrderedDict
- import logging
- from pymodbus.client.sync import ModbusTcpClient as ModbusClient
- from pymodbus.constants import Endian
- from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
- ORDER_DICT = {"<": "LITTLE", ">": "BIG"}
- def run_binary_payload_client(host:str,port:int):
-
- for word_endian, byte_endian in (
- (Endian.Big, Endian.Big),
- (Endian.Big, Endian.Little),
- (Endian.Little, Endian.Big),
- (Endian.Little, Endian.Little),
- ):
- print("-" * 60)
- print(f"Word Order: {ORDER_DICT[word_endian]}")
- print(f"Byte Order: {ORDER_DICT[byte_endian]}")
- print()
-
- builder = BinaryPayloadBuilder(
- wordorder=word_endian,
- byteorder=byte_endian,
- )
-
- # 写入的变量
- my_string = "abcd-efgh123345765432"
- builder.add_string(my_string)
- builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0])
- builder.add_8bit_int(-0x12)
- builder.add_8bit_uint(0x12)
- builder.add_16bit_int(-0x5678)
- builder.add_16bit_uint(0x1234)
- builder.add_32bit_int(-0x1234)
- builder.add_32bit_uint(0x12345678)
- builder.add_16bit_float(12.34)
- builder.add_16bit_float(-12.34)
- builder.add_32bit_float(22.34)
- builder.add_32bit_float(-22.34)
- builder.add_64bit_int(-0xDEADBEEF)
- builder.add_64bit_uint(0x12345678DEADBEEF)
- builder.add_64bit_uint(0x12345678DEADBEEF)
- builder.add_64bit_float(123.45)
- builder.add_64bit_float(-123.45)
- registers = builder.to_registers()
- print("Writing Registers:")
- print(registers)
- print("\n")
- payload = builder.build()
- address = 40001 # 从40001开始写入
- # We can write registers
- client.write_registers(address, registers, unit=1) # 写入
-
- # 读取复杂变量
- print("Reading Registers:")
- address = 40001
- count = len(payload)
- print(f"payload_len {count}")
- result = client.read_holding_registers(address, count, slave=1)
- print(result.registers)
- print("\n")
- decoder = BinaryPayloadDecoder.fromRegisters(
- result.registers, byteorder=byte_endian, wordorder=word_endian
- )
- # Make sure word/byte order is consistent between BinaryPayloadBuilder and BinaryPayloadDecoder
- assert (
- decoder._byteorder == builder._byteorder # pylint: disable=protected-access
- ) # nosec
- assert (
- decoder._wordorder == builder._wordorder # pylint: disable=protected-access
- ) # nosec
- decoded = OrderedDict(
- [
- ("string", decoder.decode_string(len(my_string))),
- ("bits", decoder.decode_bits()),
- ("8int", decoder.decode_8bit_int()),
- ("8uint", decoder.decode_8bit_uint()),
- ("16int", decoder.decode_16bit_int()),
- ("16uint", decoder.decode_16bit_uint()),
- ("32int", decoder.decode_32bit_int()),
- ("32uint", decoder.decode_32bit_uint()),
- ("16float", decoder.decode_16bit_float()),
- ("16float2", decoder.decode_16bit_float()),
- ("32float", decoder.decode_32bit_float()),
- ("32float2", decoder.decode_32bit_float()),
- ("64int", decoder.decode_64bit_int()),
- ("64uint", decoder.decode_64bit_uint()),
- ("ignore", decoder.skip_bytes(8)),
- ("64float", decoder.decode_64bit_float()),
- ("64float2", decoder.decode_64bit_float()),
- ]
- )
- print("Decoded Data")
- for name, value in iter(decoded.items()):
- print(
- "%s\t" % name, # pylint: disable=consider-using-f-string
- hex(value) if isinstance(value, int) else value,
- )
- print("\n")
- # 关闭连接
- client.close()
- if __name__ == "__main__":
- run_binary_payload_client("127.0.0.1", 503)
复制代码 总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
来源:https://www.jb51.net/python/323478l1j.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
|