Python Modbus库完全指南:从入门到精通
引言:为什么选择Python进行Modbus开发?
在工业物联网和自动化领域,Python凭借其简洁的语法、丰富的库生态系统和强大的社区支持,已经成为开发者的首选语言之一。对于Modbus协议开发,Python提供了多个成熟的开源库,能够满足从简单设备通信到复杂工业系统集成的各种需求。
本文将全面介绍Python中主流的Modbus库,包括pymodbus、minimalmodbus、uModbus等,通过实际代码示例和最佳实践,帮助您快速掌握Python Modbus开发的核心技能。
一、主流Python Modbus库对比
1.1 pymodbus:功能最全面的选择
特点:
– 支持Modbus TCP、RTU、ASCII等多种协议
– 同时支持客户端(主站)和服务器(从站)模式
– 异步/同步两种编程模式
– 活跃的社区和持续更新
适用场景:
– 复杂的工业控制系统
– 需要同时作为主站和从站的场景
– 高性能要求的应用
1.2 minimalmodbus:轻量级解决方案
特点:
– 代码简洁,易于学习和使用
– 专注于Modbus RTU协议
– 依赖少,部署简单
– 适合嵌入式系统和资源受限环境
适用场景:
– 简单的设备通信
– 嵌入式系统和单板计算机
– 快速原型开发
1.3 uModbus:现代化设计
特点:
– 现代化的API设计
– 支持异步编程
– 良好的类型提示
– 模块化架构
适用场景:
– 需要异步处理的应用
– 大型项目开发
– 需要良好代码维护性的场景
二、pymodbus深度解析
2.1 安装和基础配置
# 安装pymodbus
pip install pymodbus
# 或者安装包含异步支持的版本
pip install pymodbus[asyncio]
2.2 Modbus TCP客户端示例
from pymodbus.client import ModbusTcpClient
# 创建TCP客户端
client = ModbusTcpClient(
host='192.168.1.100', # 设备IP地址
port=502, # Modbus TCP端口
timeout=3, # 超时时间(秒)
retries=3 # 重试次数
)
try:
# 连接设备
if client.connect():
print("成功连接到Modbus设备")
# 读取保持寄存器(地址0,数量10)
result = client.read_holding_registers(address=0, count=10, slave=1)
if not result.isError():
print(f"读取到的寄存器值: {result.registers}")
else:
print(f"读取失败: {result}")
# 写入单个保持寄存器(地址5,值1234)
result = client.write_register(address=5, value=1234, slave=1)
if not result.isError():
print("寄存器写入成功")
# 读取线圈状态(地址0,数量8)
result = client.read_coils(address=0, count=8, slave=1)
if not result.isError():
print(f"线圈状态: {result.bits}")
finally:
# 关闭连接
client.close()
2.3 Modbus RTU客户端示例
from pymodbus.client import ModbusSerialClient
# 创建RTU客户端
client = ModbusSerialClient(
method='rtu', # 协议类型(rtu或ascii)
port='/dev/ttyUSB0', # 串口设备
baudrate=9600, # 波特率
bytesize=8, # 数据位
parity='N', # 校验位(N:无, E:偶, O:奇)
stopbits=1, # 停止位
timeout=1 # 超时时间
)
try:
if client.connect():
print("成功连接到串口设备")
# 批量读取输入寄存器
result = client.read_input_registers(address=0, count=20, slave=1)
if not result.isError():
data = result.registers
# 处理数据:假设前10个是温度值(需要除以10)
temperatures = [value/10.0 for value in data[:10]]
print(f"温度数据: {temperatures}")
finally:
client.close()
2.4 异步客户端示例
import asyncio
from pymodbus.client import AsyncModbusTcpClient
async def read_device_data():
# 创建异步客户端
client = AsyncModbusTcpClient(
host='192.168.1.100',
port=502
)
await client.connect()
try:
# 同时读取多个设备的数据
tasks = []
for slave_id in [1, 2, 3]:
task = client.read_holding_registers(
address=0,
count=10,
slave=slave_id
)
tasks.append(task)
# 并行执行所有读取任务
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
if not result.isError():
print(f"设备{i+1}数据: {result.registers}")
finally:
client.close()
# 运行异步函数
asyncio.run(read_device_data())
三、minimalmodbus快速上手
3.1 基础使用
import minimalmodbus
import serial
# 创建仪器对象
instrument = minimalmodbus.Instrument(
port='/dev/ttyUSB0', # 串口设备
slaveaddress=1, # 从站地址
mode='rtu' # 协议模式
)
# 配置串口参数
instrument.serial.baudrate = 9600
instrument.serial.bytesize = 8
instrument.serial.parity = serial.PARITY_NONE
instrument.serial.stopbits = 1
instrument.serial.timeout = 1.0 # 秒
# 读取保持寄存器
try:
# 读取单个寄存器(地址30001)
temperature = instrument.read_register(
registeraddress=0, # 寄存器地址(0对应30001)
functioncode=3, # 功能码3:读保持寄存器
signed=True # 有符号整数
)
print(f"温度: {temperature}°C")
# 读取多个寄存器
values = instrument.read_registers(
registeraddress=0,
number_of_registers=5,
functioncode=3
)
print(f"多个寄存器值: {values}")
# 写入单个寄存器
instrument.write_register(
registeraddress=10,
value=100,
functioncode=6
)
except Exception as e:
print(f"通信错误: {e}")
3.2 高级功能
# 自定义字节顺序(大端/小端)
instrument.byteorder = minimalmodbus.BYTEORDER_BIG
# 设置调试模式(显示原始通信数据)
instrument.debug = True
# 使用不同的精度和缩放因子
def read_temperature_with_precision():
# 假设温度寄存器存储的是实际值*10
raw_value = instrument.read_register(0, 3)
temperature = raw_value / 10.0
return temperature
# 批量读取优化
def batch_read_registers(addresses):
"""批量读取多个寄存器,优化通信效率"""
results = {}
for addr in addresses:
try:
value = instrument.read_register(addr, 3)
results[addr] = value
except Exception as e:
results[addr] = f"错误: {e}"
return results
四、uModbus现代化开发
4.1 异步服务器示例
import asyncio
from umodbus.server.tcp import RequestHandler, get_server
from umodbus.conf import ModbusDataType
# 定义数据存储
data_store = {
'coils': {0: False, 1: True, 2: False},
'discrete_inputs': {0: True, 1: False},
'holding_registers': {
0: 100,
1: 200,
2: 300
},
'input_registers': {
0: 400,
1: 500
}
}
async def handle_request(slave_id, function_code, address, quantity):
"""处理Modbus请求"""
if function_code == 1: # 读线圈
return [data_store['coils'].get(addr, False)
for addr in range(address, address + quantity)]
elif function_code == 3: # 读保持寄存器
return [data_store['holding_registers'].get(addr, 0)
for addr in range(address, address + quantity)]
elif function_code == 6: # 写单个寄存器
# 在实际应用中,这里会更新数据存储
return True
return None
async def main():
# 创建TCP服务器
server = get_server(
RequestHandler,
data_store=data_store,
request_handler=handle_request
)
# 启动服务器
await server.serve_forever()
# 运行服务器
asyncio.run(main())
五、实际应用案例
5.1 工业温度监控系统
import time
from datetime import datetime
import json
from pymodbus.client import ModbusTcpClient
class TemperatureMonitor:
def __init__(self, device_configs):
"""
初始化温度监控系统
device_configs: 设备配置列表
"""
self.devices = []
for config in device_configs:
client = ModbusTcpClient(
host=config['ip'],
port=config.get('port', 502),
timeout=config.get('timeout', 3)
)
self.devices.append({
'client': client,
'name': config['name'],
'slave_id': config.get('slave_id', 1),
'registers': config['registers']
})
self.data_history = []
def read_all_temperatures(self):
"""读取所有设备的温度数据"""
readings = []
for device in self.devices:
client = device['client']
if not client.connect():
print(f"无法连接到设备: {device['name']}")
continue
try:
device_readings = {}
for reg_name, reg_config in device['registers'].items():
result = client.read_holding_registers(
address=reg_config['address'],
count=reg_config.get('count', 1),
slave=device['slave_id']
)
if not result.isError():
raw_value = result.registers[0]
# 应用转换公式(如除以10得到实际温度)
actual_value = raw_value / reg_config.get('scale', 1.0)
device_readings[reg_name] = {
'raw': raw_value,
'actual': actual_value,
'unit': reg_config.get('unit', '°C')
}
readings.append({
'device': device['name'],
'timestamp': datetime.now().isoformat(),
'readings': device_readings
})
except Exception as e:
print(f"读取设备{device['name']}时出错: {e}")
finally:
client.close()
# 保存到历史记录
self.data_history.extend(readings)
# 只保留最近1000条记录
if len(self.data_history) > 1000:
self.data_history = self.data_history[-1000:]
return readings
def check_alarms(self, readings):
"""检查报警条件"""
alarms = []
for device_reading in readings:
for sensor_name, sensor_data in device_reading['readings'].items():
value = sensor_data['actual']
# 简单报警逻辑
if value > 80: # 高温报警
alarms.append({
'device': device_reading['device'],
'sensor': sensor_name,
'value': value,
'type': 'high_temperature',
'message': f"温度过高: {value}°C",
'timestamp': device_reading['timestamp']
})
elif value < 10: # 低温报警
alarms.append({
'device': device_reading['device'],
'sensor': sensor_name,
'value': value,
'type': 'low_temperature',
'message': f"温度过低: {value}°C",
'timestamp': device_reading['timestamp']
})
return alarms
def export_data(self, filename='temperature_data.json'):
"""导出数据到JSON文件"""
with open(filename, 'w', encoding='utf-8') as f:
json.dump({
'history': self.data_history,
'export_time': datetime.now().isoformat()
}, f, ensure_ascii=False, indent=2)
print(f"数据已导出到: {filename}")
# 使用示例
if __name__ == "__main__":
# 设备配置
device_configs = [
{
'name': '锅炉1',
'ip': '192.168.1.101',
'registers': {
'temperature': {'address': 0, 'scale': 10.0, 'unit': '°C'},
'pressure': {'address': 1, 'scale': 100.0, 'unit': 'kPa'}
}
},
{
'name': '反应釜2',
'ip': '192.168.1.102',
'registers': {
'temperature': {'address': 0, 'scale': 10.0, 'unit': '°C'},
'ph_value': {'address': 1, 'scale': 100.0, 'unit': 'pH'}
}
}
]
# 创建监控器
monitor = TemperatureMonitor(device_configs)
# 监控循环
try:
while True:
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始读取数据...")
# 读取数据
readings = monitor.read_all_temperatures()
# 显示数据
for reading in readings:
print(f"\n设备: {reading['device']}")
for sensor, data in reading['readings'].items():
print(f" {sensor}: {data['actual']} {data['unit']}")
# 检查报警
alarms = monitor.check_alarms(readings)
if alarms:
print("\n⚠️ 报警信息:")
for alarm in alarms:
print(f" {alarm['message']}")
# 每5秒读取一次
time.sleep(5)
except KeyboardInterrupt:
print("\n监控停止")
# 导出数据
monitor.export_data()
5.2 数据可视化界面
“`python
import tkinter as tk
from tkinter import ttk
import threading
import time
from datetime import datetime
from pymodbus.client import ModbusTcpClient
class ModbusMonitorGUI:
def init(self, root):
self.root = root
self.root.title(“Modbus设备监控系统”)
self.root.geometry(“800×600”)
# 设备配置
self.devices = [
{'name': 'PLC_1', 'ip': '192.168.1.100', 'port': 502, 'slave_id': 1},
{'name': 'PLC_2', 'ip': '192.168.1.101', 'port': 502, 'slave_id': 2}
]
# 数据存储
self.data = {}
for device in self.devices:
self.data[device['name']] = {
'registers': {},
'last_update': None,
'status': '离线'
}
# 创建界面
self.setup_ui()
# 启动监控线程
self.monitoring = True
self.monitor_thread = threading.Thread(target=self.monitor_devices)
self.monitor_thread.daemon = True
self.monitor_thread.start()
def setup_ui(self):
"""设置用户界面"""
# 创建主框架
main_frame = ttk.Frame(self.root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 设备状态区域
status_frame = ttk.LabelFrame(main_frame, text="设备状态", padding="10")
status_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
self.status_labels = {}
for i, device in enumerate(self.devices):
label = ttk.Label(status_frame, text=f"{device['name']}: 离线")
label.grid(row=i, column=0, sticky=tk.W, padx=5, pady=2)
self.status_labels[device['name']] = label
# 数据展示区域
data_frame = ttk.LabelFrame(main_frame, text="实时数据", padding="10")
data_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
# 创建Treeview显示数据
columns = ('设备', '寄存器', '地址', '值', '时间')
self.tree = ttk.Treeview(data_frame, columns=columns, show='headings', height=15)
for col in columns:
self.tree.heading(col, text=col)
self.tree.column(col, width=100)
# 添加滚动条
scrollbar = ttk.Scrollbar(data_frame, orient=tk.VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=scrollbar.set)
self.tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
# 控制按钮区域
button_frame = ttk.Frame(main_frame)
button_frame.grid(row=2, column=0, sticky=(tk.W, tk.E))
ttk.Button(button_frame, text="刷新数据", command=self.manual_refresh).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="导出数据", command=self.export_data).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="停止监控", command=self.stop_monitoring).pack(side=tk.LEFT, padx=5)
# 配置网格权重
main_frame.columnconfigure(0, weight=1)
main_frame.rowconfigure(1, weight=1)
data_frame.columnconfigure(0, weight=1)
data_frame.rowconfigure(0, weight=1)
def read_device_data(self, device_config):
"""读取单个设备的数据"""
client = ModbusTcpClient(
host=device_config['ip'],
port=device_config['port'],
timeout=3
)
try:
if client.connect():
# 读取寄存器0-19
result = client.read_holding_registers(
address=0,
count=20,
slave=device_config['slave_id']
)
if not result.isError():
return {
'status': '在线',
'data': result.registers,
'timestamp': datetime.now()
}
else:
return {
'status': '读取错误',
'data': None,
'timestamp': datetime.now()
}
else:
return {
'status': '连接失败',
'data': None,
'timestamp': datetime.now()
}
except Exception as e:
return {
'status': f'异常: {str(e)}',
'data': None,
'timestamp': datetime.now()
}
finally:
client.close()
def monitor_devices(self):
"""监控设备线程"""
while self.monitoring:
for device in self.devices:
result = self.read_device_data(device)
# 更新数据存储
self.data[device['name']].update({
'registers': result['data'] or {},
'last_update': result['timestamp'],
'status': result['status']
})
# 更新UI(需要在主线程中执行)
self.root.after(0, self.update_ui, device['name'], result)
# 每2秒更新一次
time.sleep(2)
def update_ui(self, device_name, result):
"""更新用户界面"""
# 更新状态标签
status_text = f"{device_name}: {result['status']}"
if result['timestamp']:
status_text += f" ({result['timestamp'].strftime('%H:%M:%S')})"
self.status_labels[device_name].config(text=status_text)
# 更新颜色
if result['status'] == '在线':
self.status_labels[device_name].config(foreground='green')
elif '错误' in result['status'] or '失败' in result['status']:
self.status_labels[device_name].config(foreground='red')
else:
self.status_labels[device_name].config(foreground='orange')
# 更新数据表格
if result['data']:
# 清除旧数据
for item in self.tree.get_children():
if self.tree.item(item)['values'][0] == device_name:
self.tree.delete(item)
# 添加新数据
for i, value in enumerate(result['data']):
self.tree.insert('', 'end', values=(
device_name,
f'400{i+1:02d}',
i,
value,
result['timestamp'].strftime('%H:%M:%S') if result['timestamp'] else ''
))
def manual_refresh(self):
"""手动刷新数据"""
for device in self.devices:
result = self.read_device_data(device)
self.update_ui(device['name'], result)
def export_data(self):
"""导出数据到文件"""
import json
from datetime import datetime
export_data = {
'export_time': datetime.now().isoformat(),
'devices': {}
}
for device_name, device_data in self.data.items():
export_data['devices'][device_name] = {
'status': device_data['status'],
'last_update': device_data['last_update'].isoformat() if device_data['last_update'] else None,
'registers': device_data['registers']
}
filename = f"modbus_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(export_data, f, ensure_ascii=False, indent=2)
# 显示导出成功消息
from tkinter import messagebox
messagebox.showinfo("导出成功", f"数据已导出到: {filename}")
def stop_monitoring(self):
"""停止监控"""
self.monitoring = False
self.root.quit()
运行GUI应用
if name == “main“:
root = tk.Tk()
app = ModbusMonitorGUI(root)
root.mainloop()
