Python Modbus库完全指南:从入门到精通

引言:为什么选择Python进行Modbus开发?

工业物联网和自动化领域,Python凭借其简洁的语法、丰富的库生态系统和强大的社区支持,已经成为开发者的首选语言之一。对于Modbus协议开发,Python提供了多个成熟的开源库,能够满足从简单设备通信到复杂工业系统集成的各种需求。

本文将全面介绍Python中主流的Modbus库,包括pymodbus、minimalmodbus、uModbus等,通过实际代码示例和最佳实践,帮助您快速掌握Python Modbus开发的核心技能。

一、主流Python Modbus库对比

1.1 pymodbus:功能最全面的选择

特点:
– 支持Modbus TCP、RTU、ASCII等多种协议
– 同时支持客户端(主站)和服务器(从站)模式
– 异步/同步两种编程模式
– 活跃的社区和持续更新

适用场景:
– 复杂的工业控制系统
– 需要同时作为主站和从站的场景
– 高性能要求的应用

1.2 minimalmodbus:轻量级解决方案

特点:
– 代码简洁,易于学习和使用
– 专注于Modbus RTU协议
– 依赖少,部署简单
– 适合嵌入式系统和资源受限环境

适用场景:
– 简单的设备通信
– 嵌入式系统和单板计算机
– 快速原型开发

1.3 uModbus:现代化设计

特点:
– 现代化的API设计
– 支持异步编程
– 良好的类型提示
– 模块化架构

适用场景:
– 需要异步处理的应用
– 大型项目开发
– 需要良好代码维护性的场景

二、pymodbus深度解析

2.1 安装和基础配置

# 安装pymodbus
pip install pymodbus

# 或者安装包含异步支持的版本
pip install pymodbus[asyncio]

2.2 Modbus TCP客户端示例

from pymodbus.client import ModbusTcpClient

# 创建TCP客户端
client = ModbusTcpClient(
    host='192.168.1.100',  # 设备IP地址
    port=502,              # Modbus TCP端口
    timeout=3,             # 超时时间(秒)
    retries=3              # 重试次数
)

try:
    # 连接设备
    if client.connect():
        print("成功连接到Modbus设备")

        # 读取保持寄存器(地址0,数量10)
        result = client.read_holding_registers(address=0, count=10, slave=1)
        if not result.isError():
            print(f"读取到的寄存器值: {result.registers}")
        else:
            print(f"读取失败: {result}")

        # 写入单个保持寄存器(地址5,值1234)
        result = client.write_register(address=5, value=1234, slave=1)
        if not result.isError():
            print("寄存器写入成功")

        # 读取线圈状态(地址0,数量8)
        result = client.read_coils(address=0, count=8, slave=1)
        if not result.isError():
            print(f"线圈状态: {result.bits}")

finally:
    # 关闭连接
    client.close()

2.3 Modbus RTU客户端示例

from pymodbus.client import ModbusSerialClient

# 创建RTU客户端
client = ModbusSerialClient(
    method='rtu',          # 协议类型(rtu或ascii)
    port='/dev/ttyUSB0',   # 串口设备
    baudrate=9600,         # 波特率
    bytesize=8,            # 数据位
    parity='N',            # 校验位(N:无, E:偶, O:奇)
    stopbits=1,            # 停止位
    timeout=1              # 超时时间
)

try:
    if client.connect():
        print("成功连接到串口设备")

        # 批量读取输入寄存器
        result = client.read_input_registers(address=0, count=20, slave=1)
        if not result.isError():
            data = result.registers
            # 处理数据:假设前10个是温度值(需要除以10)
            temperatures = [value/10.0 for value in data[:10]]
            print(f"温度数据: {temperatures}")

finally:
    client.close()

2.4 异步客户端示例

import asyncio
from pymodbus.client import AsyncModbusTcpClient

async def read_device_data():
    # 创建异步客户端
    client = AsyncModbusTcpClient(
        host='192.168.1.100',
        port=502
    )

    await client.connect()

    try:
        # 同时读取多个设备的数据
        tasks = []
        for slave_id in [1, 2, 3]:
            task = client.read_holding_registers(
                address=0, 
                count=10, 
                slave=slave_id
            )
            tasks.append(task)

        # 并行执行所有读取任务
        results = await asyncio.gather(*tasks)

        for i, result in enumerate(results):
            if not result.isError():
                print(f"设备{i+1}数据: {result.registers}")

    finally:
        client.close()

# 运行异步函数
asyncio.run(read_device_data())

三、minimalmodbus快速上手

3.1 基础使用

import minimalmodbus
import serial

# 创建仪器对象
instrument = minimalmodbus.Instrument(
    port='/dev/ttyUSB0',   # 串口设备
    slaveaddress=1,        # 从站地址
    mode='rtu'             # 协议模式
)

# 配置串口参数
instrument.serial.baudrate = 9600
instrument.serial.bytesize = 8
instrument.serial.parity = serial.PARITY_NONE
instrument.serial.stopbits = 1
instrument.serial.timeout = 1.0  # 秒

# 读取保持寄存器
try:
    # 读取单个寄存器(地址30001)
    temperature = instrument.read_register(
        registeraddress=0,  # 寄存器地址(0对应30001)
        functioncode=3,     # 功能码3:读保持寄存器
        signed=True         # 有符号整数
    )
    print(f"温度: {temperature}°C")

    # 读取多个寄存器
    values = instrument.read_registers(
        registeraddress=0,
        number_of_registers=5,
        functioncode=3
    )
    print(f"多个寄存器值: {values}")

    # 写入单个寄存器
    instrument.write_register(
        registeraddress=10,
        value=100,
        functioncode=6
    )

except Exception as e:
    print(f"通信错误: {e}")

3.2 高级功能

# 自定义字节顺序(大端/小端)
instrument.byteorder = minimalmodbus.BYTEORDER_BIG

# 设置调试模式(显示原始通信数据)
instrument.debug = True

# 使用不同的精度和缩放因子
def read_temperature_with_precision():
    # 假设温度寄存器存储的是实际值*10
    raw_value = instrument.read_register(0, 3)
    temperature = raw_value / 10.0
    return temperature

# 批量读取优化
def batch_read_registers(addresses):
    """批量读取多个寄存器,优化通信效率"""
    results = {}
    for addr in addresses:
        try:
            value = instrument.read_register(addr, 3)
            results[addr] = value
        except Exception as e:
            results[addr] = f"错误: {e}"
    return results

四、uModbus现代化开发

4.1 异步服务器示例

import asyncio
from umodbus.server.tcp import RequestHandler, get_server
from umodbus.conf import ModbusDataType

# 定义数据存储
data_store = {
    'coils': {0: False, 1: True, 2: False},
    'discrete_inputs': {0: True, 1: False},
    'holding_registers': {
        0: 100,
        1: 200,
        2: 300
    },
    'input_registers': {
        0: 400,
        1: 500
    }
}

async def handle_request(slave_id, function_code, address, quantity):
    """处理Modbus请求"""
    if function_code == 1:  # 读线圈
        return [data_store['coils'].get(addr, False) 
                for addr in range(address, address + quantity)]

    elif function_code == 3:  # 读保持寄存器
        return [data_store['holding_registers'].get(addr, 0)
                for addr in range(address, address + quantity)]

    elif function_code == 6:  # 写单个寄存器
        # 在实际应用中,这里会更新数据存储
        return True

    return None

async def main():
    # 创建TCP服务器
    server = get_server(
        RequestHandler,
        data_store=data_store,
        request_handler=handle_request
    )

    # 启动服务器
    await server.serve_forever()

# 运行服务器
asyncio.run(main())

五、实际应用案例

5.1 工业温度监控系统

import time
from datetime import datetime
import json
from pymodbus.client import ModbusTcpClient

class TemperatureMonitor:
    def __init__(self, device_configs):
        """
        初始化温度监控系统
        device_configs: 设备配置列表
        """
        self.devices = []
        for config in device_configs:
            client = ModbusTcpClient(
                host=config['ip'],
                port=config.get('port', 502),
                timeout=config.get('timeout', 3)
            )
            self.devices.append({
                'client': client,
                'name': config['name'],
                'slave_id': config.get('slave_id', 1),
                'registers': config['registers']
            })

        self.data_history = []

    def read_all_temperatures(self):
        """读取所有设备的温度数据"""
        readings = []

        for device in self.devices:
            client = device['client']

            if not client.connect():
                print(f"无法连接到设备: {device['name']}")
                continue

            try:
                device_readings = {}
                for reg_name, reg_config in device['registers'].items():
                    result = client.read_holding_registers(
                        address=reg_config['address'],
                        count=reg_config.get('count', 1),
                        slave=device['slave_id']
                    )

                    if not result.isError():
                        raw_value = result.registers[0]
                        # 应用转换公式(如除以10得到实际温度)
                        actual_value = raw_value / reg_config.get('scale', 1.0)
                        device_readings[reg_name] = {
                            'raw': raw_value,
                            'actual': actual_value,
                            'unit': reg_config.get('unit', '°C')
                        }

                readings.append({
                    'device': device['name'],
                    'timestamp': datetime.now().isoformat(),
                    'readings': device_readings
                })

            except Exception as e:
                print(f"读取设备{device['name']}时出错: {e}")
            finally:
                client.close()

        # 保存到历史记录
        self.data_history.extend(readings)

        # 只保留最近1000条记录
        if len(self.data_history) > 1000:
            self.data_history = self.data_history[-1000:]

        return readings

    def check_alarms(self, readings):
        """检查报警条件"""
        alarms = []

        for device_reading in readings:
            for sensor_name, sensor_data in device_reading['readings'].items():
                value = sensor_data['actual']

                # 简单报警逻辑
                if value > 80:  # 高温报警
                    alarms.append({
                        'device': device_reading['device'],
                        'sensor': sensor_name,
                        'value': value,
                        'type': 'high_temperature',
                        'message': f"温度过高: {value}°C",
                        'timestamp': device_reading['timestamp']
                    })
                elif value < 10:  # 低温报警
                    alarms.append({
                        'device': device_reading['device'],
                        'sensor': sensor_name,
                        'value': value,
                        'type': 'low_temperature',
                        'message': f"温度过低: {value}°C",
                        'timestamp': device_reading['timestamp']
                    })

        return alarms

    def export_data(self, filename='temperature_data.json'):
        """导出数据到JSON文件"""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump({
                'history': self.data_history,
                'export_time': datetime.now().isoformat()
            }, f, ensure_ascii=False, indent=2)

        print(f"数据已导出到: {filename}")

# 使用示例
if __name__ == "__main__":
    # 设备配置
    device_configs = [
        {
            'name': '锅炉1',
            'ip': '192.168.1.101',
            'registers': {
                'temperature': {'address': 0, 'scale': 10.0, 'unit': '°C'},
                'pressure': {'address': 1, 'scale': 100.0, 'unit': 'kPa'}
            }
        },
        {
            'name': '反应釜2',
            'ip': '192.168.1.102',
            'registers': {
                'temperature': {'address': 0, 'scale': 10.0, 'unit': '°C'},
                'ph_value': {'address': 1, 'scale': 100.0, 'unit': 'pH'}
            }
        }
    ]

    # 创建监控器
    monitor = TemperatureMonitor(device_configs)

    # 监控循环
    try:
        while True:
            print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始读取数据...")

            # 读取数据
            readings = monitor.read_all_temperatures()

            # 显示数据
            for reading in readings:
                print(f"\n设备: {reading['device']}")
                for sensor, data in reading['readings'].items():
                    print(f"  {sensor}: {data['actual']} {data['unit']}")

            # 检查报警
            alarms = monitor.check_alarms(readings)
            if alarms:
                print("\n⚠️ 报警信息:")
                for alarm in alarms:
                    print(f"  {alarm['message']}")

            # 每5秒读取一次
            time.sleep(5)

    except KeyboardInterrupt:
        print("\n监控停止")
        # 导出数据
        monitor.export_data()

5.2 数据可视化界面

“`python
import tkinter as tk
from tkinter import ttk
import threading
import time
from datetime import datetime
from pymodbus.client import ModbusTcpClient

class ModbusMonitorGUI:
def init(self, root):
self.root = root
self.root.title(“Modbus设备监控系统”)
self.root.geometry(“800×600”)

    # 设备配置
    self.devices = [
        {'name': 'PLC_1', 'ip': '192.168.1.100', 'port': 502, 'slave_id': 1},
        {'name': 'PLC_2', 'ip': '192.168.1.101', 'port': 502, 'slave_id': 2}
    ]

    # 数据存储
    self.data = {}
    for device in self.devices:
        self.data[device['name']] = {
            'registers': {},
            'last_update': None,
            'status': '离线'
        }

    # 创建界面
    self.setup_ui()

    # 启动监控线程
    self.monitoring = True
    self.monitor_thread = threading.Thread(target=self.monitor_devices)
    self.monitor_thread.daemon = True
    self.monitor_thread.start()

def setup_ui(self):
    """设置用户界面"""
    # 创建主框架
    main_frame = ttk.Frame(self.root, padding="10")
    main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

    # 设备状态区域
    status_frame = ttk.LabelFrame(main_frame, text="设备状态", padding="10")
    status_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 10))

    self.status_labels = {}
    for i, device in enumerate(self.devices):
        label = ttk.Label(status_frame, text=f"{device['name']}: 离线")
        label.grid(row=i, column=0, sticky=tk.W, padx=5, pady=2)
        self.status_labels[device['name']] = label

    # 数据展示区域
    data_frame = ttk.LabelFrame(main_frame, text="实时数据", padding="10")
    data_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))

    # 创建Treeview显示数据
    columns = ('设备', '寄存器', '地址', '值', '时间')
    self.tree = ttk.Treeview(data_frame, columns=columns, show='headings', height=15)

    for col in columns:
        self.tree.heading(col, text=col)
        self.tree.column(col, width=100)

    # 添加滚动条
    scrollbar = ttk.Scrollbar(data_frame, orient=tk.VERTICAL, command=self.tree.yview)
    self.tree.configure(yscrollcommand=scrollbar.set)

    self.tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
    scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))

    # 控制按钮区域
    button_frame = ttk.Frame(main_frame)
    button_frame.grid(row=2, column=0, sticky=(tk.W, tk.E))

    ttk.Button(button_frame, text="刷新数据", command=self.manual_refresh).pack(side=tk.LEFT, padx=5)
    ttk.Button(button_frame, text="导出数据", command=self.export_data).pack(side=tk.LEFT, padx=5)
    ttk.Button(button_frame, text="停止监控", command=self.stop_monitoring).pack(side=tk.LEFT, padx=5)

    # 配置网格权重
    main_frame.columnconfigure(0, weight=1)
    main_frame.rowconfigure(1, weight=1)
    data_frame.columnconfigure(0, weight=1)
    data_frame.rowconfigure(0, weight=1)

def read_device_data(self, device_config):
    """读取单个设备的数据"""
    client = ModbusTcpClient(
        host=device_config['ip'],
        port=device_config['port'],
        timeout=3
    )

    try:
        if client.connect():
            # 读取寄存器0-19
            result = client.read_holding_registers(
                address=0,
                count=20,
                slave=device_config['slave_id']
            )

            if not result.isError():
                return {
                    'status': '在线',
                    'data': result.registers,
                    'timestamp': datetime.now()
                }
            else:
                return {
                    'status': '读取错误',
                    'data': None,
                    'timestamp': datetime.now()
                }
        else:
            return {
                'status': '连接失败',
                'data': None,
                'timestamp': datetime.now()
            }

    except Exception as e:
        return {
            'status': f'异常: {str(e)}',
            'data': None,
            'timestamp': datetime.now()
        }
    finally:
        client.close()

def monitor_devices(self):
    """监控设备线程"""
    while self.monitoring:
        for device in self.devices:
            result = self.read_device_data(device)

            # 更新数据存储
            self.data[device['name']].update({
                'registers': result['data'] or {},
                'last_update': result['timestamp'],
                'status': result['status']
            })

            # 更新UI(需要在主线程中执行)
            self.root.after(0, self.update_ui, device['name'], result)

        # 每2秒更新一次
        time.sleep(2)

def update_ui(self, device_name, result):
    """更新用户界面"""
    # 更新状态标签
    status_text = f"{device_name}: {result['status']}"
    if result['timestamp']:
        status_text += f" ({result['timestamp'].strftime('%H:%M:%S')})"

    self.status_labels[device_name].config(text=status_text)

    # 更新颜色
    if result['status'] == '在线':
        self.status_labels[device_name].config(foreground='green')
    elif '错误' in result['status'] or '失败' in result['status']:
        self.status_labels[device_name].config(foreground='red')
    else:
        self.status_labels[device_name].config(foreground='orange')

    # 更新数据表格
    if result['data']:
        # 清除旧数据
        for item in self.tree.get_children():
            if self.tree.item(item)['values'][0] == device_name:
                self.tree.delete(item)

        # 添加新数据
        for i, value in enumerate(result['data']):
            self.tree.insert('', 'end', values=(
                device_name,
                f'400{i+1:02d}',
                i,
                value,
                result['timestamp'].strftime('%H:%M:%S') if result['timestamp'] else ''
            ))

def manual_refresh(self):
    """手动刷新数据"""
    for device in self.devices:
        result = self.read_device_data(device)
        self.update_ui(device['name'], result)

def export_data(self):
    """导出数据到文件"""
    import json
    from datetime import datetime

    export_data = {
        'export_time': datetime.now().isoformat(),
        'devices': {}
    }

    for device_name, device_data in self.data.items():
        export_data['devices'][device_name] = {
            'status': device_data['status'],
            'last_update': device_data['last_update'].isoformat() if device_data['last_update'] else None,
            'registers': device_data['registers']
        }

    filename = f"modbus_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(export_data, f, ensure_ascii=False, indent=2)

    # 显示导出成功消息
    from tkinter import messagebox
    messagebox.showinfo("导出成功", f"数据已导出到: {filename}")

def stop_monitoring(self):
    """停止监控"""
    self.monitoring = False
    self.root.quit()

运行GUI应用

if name == “main“:
root = tk.Tk()
app = ModbusMonitorGUI(root)
root.mainloop()

相关新闻

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

cloud@modbus.cn

QQ
微信