Python Modbus 库完全指南:从入门到精通

本文目录
  1. 1. 引言:为什么选择Python进行Modbus开发?
  2. 2. 一、主流Python Modbus库对比
  3. 3. 1.1 pymodbus:功能最全面的选择
  4. 4. 1.2 minimalmodbus:轻量级解决方案
  5. 5. 1.3 uModbus:现代化设计
  6. 6. 二、pymodbus深度解析
  7. 7. 2.1 安装和基础配置
  8. 8. 2.2 Modbus TCP客户端示例
  9. 9. 2.3 Modbus RTU客户端示例
  10. 10. 2.4 异步客户端示例
  11. 11. 三、minimalmodbus快速上手
  12. 12. 3.1 基础使用
  13. 13. 3.2 高级功能
  14. 14. 四、uModbus现代化开发
  15. 15. 4.1 异步服务器示例
  16. 16. 五、实际应用案例
  17. 17. 5.1 工业温度监控系统
  18. 18. 5.2 数据可视化界面

Python Modbus库完全指南:从入门到精通

引言:为什么选择Python进行Modbus开发?

在工业物联网和自动化领域,Python凭借其简洁的语法、丰富的库生态系统和强大的社区支持,已经成为开发者的首选语言之一。对于Modbus协议开发,Python提供了多个成熟的开源库,能够满足从简单设备通信到复杂工业系统集成的各种需求。

本文将全面介绍Python中主流的Modbus库,包括pymodbus、minimalmodbus、uModbus等,通过实际代码示例和最佳实践,帮助您快速掌握Python Modbus开发的核心技能。

一、主流Python Modbus库对比

1.1 pymodbus:功能最全面的选择

特点:
– 支持Modbus TCP、RTU、ASCII等多种协议
– 同时支持客户端(主站)和服务器(从站)模式
– 异步/同步两种编程模式
– 活跃的社区和持续更新

适用场景:
– 复杂的工业控制系统
– 需要同时作为主站和从站的场景
– 高性能要求的应用

1.2 minimalmodbus:轻量级解决方案

特点:
– 代码简洁,易于学习和使用
– 专注于Modbus RTU协议
– 依赖少,部署简单
– 适合嵌入式系统和资源受限环境

适用场景:
– 简单的设备通信
– 嵌入式系统和单板计算机
– 快速原型开发

1.3 uModbus:现代化设计

特点:
– 现代化的API设计
– 支持异步编程
– 良好的类型提示
– 模块化架构

适用场景:
– 需要异步处理的应用
– 大型项目开发
– 需要良好代码维护性的场景

二、pymodbus深度解析

2.1 安装和基础配置

# 安装pymodbus
pip install pymodbus

# 或者安装包含异步支持的版本
pip install pymodbus[asyncio]

2.2 Modbus TCP客户端示例

from pymodbus.client import ModbusTcpClient

# 创建TCP客户端
client = ModbusTcpClient(
    host='192.168.1.100',  # 设备IP地址
    port=502,              # Modbus TCP端口
    timeout=3,             # 超时时间(秒)
    retries=3              # 重试次数
)

try:
    # 连接设备
    if client.connect():
        print("成功连接到Modbus设备")

        # 读取保持寄存器(地址0,数量10)
        result = client.read_holding_registers(address=0, count=10, slave=1)
        if not result.isError():
            print(f"读取到的寄存器值: {result.registers}")
        else:
            print(f"读取失败: {result}")

        # 写入单个保持寄存器(地址5,值1234)
        result = client.write_register(address=5, value=1234, slave=1)
        if not result.isError():
            print("寄存器写入成功")

        # 读取线圈状态(地址0,数量8)
        result = client.read_coils(address=0, count=8, slave=1)
        if not result.isError():
            print(f"线圈状态: {result.bits}")

finally:
    # 关闭连接
    client.close()

2.3 Modbus RTU客户端示例

from pymodbus.client import ModbusSerialClient

# 创建RTU客户端
client = ModbusSerialClient(
    method='rtu',          # 协议类型(rtu或ascii)
    port='/dev/ttyUSB0',   # 串口设备
    baudrate=9600,         # 波特率
    bytesize=8,            # 数据位
    parity='N',            # 校验位(N:无, E:偶, O:奇)
    stopbits=1,            # 停止位
    timeout=1              # 超时时间
)

try:
    if client.connect():
        print("成功连接到串口设备")

        # 批量读取输入寄存器
        result = client.read_input_registers(address=0, count=20, slave=1)
        if not result.isError():
            data = result.registers
            # 处理数据:假设前10个是温度值(需要除以10)
            temperatures = [value/10.0 for value in data[:10]]
            print(f"温度数据: {temperatures}")

finally:
    client.close()

2.4 异步客户端示例

import asyncio
from pymodbus.client import AsyncModbusTcpClient

async def read_device_data():
    # 创建异步客户端
    client = AsyncModbusTcpClient(
        host='192.168.1.100',
        port=502
    )

    await client.connect()

    try:
        # 同时读取多个设备的数据
        tasks = []
        for slave_id in [1, 2, 3]:
            task = client.read_holding_registers(
                address=0, 
                count=10, 
                slave=slave_id
            )
            tasks.append(task)

        # 并行执行所有读取任务
        results = await asyncio.gather(*tasks)

        for i, result in enumerate(results):
            if not result.isError():
                print(f"设备{i+1}数据: {result.registers}")

    finally:
        client.close()

# 运行异步函数
asyncio.run(read_device_data())

三、minimalmodbus快速上手

3.1 基础使用

import minimalmodbus
import serial

# 创建仪器对象
instrument = minimalmodbus.Instrument(
    port='/dev/ttyUSB0',   # 串口设备
    slaveaddress=1,        # 从站地址
    mode='rtu'             # 协议模式
)

# 配置串口参数
instrument.serial.baudrate = 9600
instrument.serial.bytesize = 8
instrument.serial.parity = serial.PARITY_NONE
instrument.serial.stopbits = 1
instrument.serial.timeout = 1.0  # 秒

# 读取保持寄存器
try:
    # 读取单个寄存器(地址30001)
    temperature = instrument.read_register(
        registeraddress=0,  # 寄存器地址(0对应30001)
        functioncode=3,     # 功能码3:读保持寄存器
        signed=True         # 有符号整数
    )
    print(f"温度: {temperature}°C")

    # 读取多个寄存器
    values = instrument.read_registers(
        registeraddress=0,
        number_of_registers=5,
        functioncode=3
    )
    print(f"多个寄存器值: {values}")

    # 写入单个寄存器
    instrument.write_register(
        registeraddress=10,
        value=100,
        functioncode=6
    )

except Exception as e:
    print(f"通信错误: {e}")

3.2 高级功能

# 自定义字节顺序(大端/小端)
instrument.byteorder = minimalmodbus.BYTEORDER_BIG

# 设置调试模式(显示原始通信数据)
instrument.debug = True

# 使用不同的精度和缩放因子
def read_temperature_with_precision():
    # 假设温度寄存器存储的是实际值*10
    raw_value = instrument.read_register(0, 3)
    temperature = raw_value / 10.0
    return temperature

# 批量读取优化
def batch_read_registers(addresses):
    """批量读取多个寄存器,优化通信效率"""
    results = {}
    for addr in addresses:
        try:
            value = instrument.read_register(addr, 3)
            results[addr] = value
        except Exception as e:
            results[addr] = f"错误: {e}"
    return results

四、uModbus现代化开发

4.1 异步服务器示例

import asyncio
from umodbus.server.tcp import RequestHandler, get_server
from umodbus.conf import ModbusDataType

# 定义数据存储
data_store = {
    'coils': {0: False, 1: True, 2: False},
    'discrete_inputs': {0: True, 1: False},
    'holding_registers': {
        0: 100,
        1: 200,
        2: 300
    },
    'input_registers': {
        0: 400,
        1: 500
    }
}

async def handle_request(slave_id, function_code, address, quantity):
    """处理Modbus请求"""
    if function_code == 1:  # 读线圈
        return [data_store['coils'].get(addr, False) 
                for addr in range(address, address + quantity)]

    elif function_code == 3:  # 读保持寄存器
        return [data_store['holding_registers'].get(addr, 0)
                for addr in range(address, address + quantity)]

    elif function_code == 6:  # 写单个寄存器
        # 在实际应用中,这里会更新数据存储
        return True

    return None

async def main():
    # 创建TCP服务器
    server = get_server(
        RequestHandler,
        data_store=data_store,
        request_handler=handle_request
    )

    # 启动服务器
    await server.serve_forever()

# 运行服务器
asyncio.run(main())

五、实际应用案例

5.1 工业温度监控系统

import time
from datetime import datetime
import json
from pymodbus.client import ModbusTcpClient

class TemperatureMonitor:
    def __init__(self, device_configs):
        """
        初始化温度监控系统
        device_configs: 设备配置列表
        """
        self.devices = []
        for config in device_configs:
            client = ModbusTcpClient(
                host=config['ip'],
                port=config.get('port', 502),
                timeout=config.get('timeout', 3)
            )
            self.devices.append({
                'client': client,
                'name': config['name'],
                'slave_id': config.get('slave_id', 1),
                'registers': config['registers']
            })

        self.data_history = []

    def read_all_temperatures(self):
        """读取所有设备的温度数据"""
        readings = []

        for device in self.devices:
            client = device['client']

            if not client.connect():
                print(f"无法连接到设备: {device['name']}")
                continue

            try:
                device_readings = {}
                for reg_name, reg_config in device['registers'].items():
                    result = client.read_holding_registers(
                        address=reg_config['address'],
                        count=reg_config.get('count', 1),
                        slave=device['slave_id']
                    )

                    if not result.isError():
                        raw_value = result.registers[0]
                        # 应用转换公式(如除以10得到实际温度)
                        actual_value = raw_value / reg_config.get('scale', 1.0)
                        device_readings[reg_name] = {
                            'raw': raw_value,
                            'actual': actual_value,
                            'unit': reg_config.get('unit', '°C')
                        }

                readings.append({
                    'device': device['name'],
                    'timestamp': datetime.now().isoformat(),
                    'readings': device_readings
                })

            except Exception as e:
                print(f"读取设备{device['name']}时出错: {e}")
            finally:
                client.close()

        # 保存到历史记录
        self.data_history.extend(readings)

        # 只保留最近1000条记录
        if len(self.data_history) > 1000:
            self.data_history = self.data_history[-1000:]

        return readings

    def check_alarms(self, readings):
        """检查报警条件"""
        alarms = []

        for device_reading in readings:
            for sensor_name, sensor_data in device_reading['readings'].items():
                value = sensor_data['actual']

                # 简单报警逻辑
                if value > 80:  # 高温报警
                    alarms.append({
                        'device': device_reading['device'],
                        'sensor': sensor_name,
                        'value': value,
                        'type': 'high_temperature',
                        'message': f"温度过高: {value}°C",
                        'timestamp': device_reading['timestamp']
                    })
                elif value < 10:  # 低温报警
                    alarms.append({
                        'device': device_reading['device'],
                        'sensor': sensor_name,
                        'value': value,
                        'type': 'low_temperature',
                        'message': f"温度过低: {value}°C",
                        'timestamp': device_reading['timestamp']
                    })

        return alarms

    def export_data(self, filename='temperature_data.json'):
        """导出数据到JSON文件"""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump({
                'history': self.data_history,
                'export_time': datetime.now().isoformat()
            }, f, ensure_ascii=False, indent=2)

        print(f"数据已导出到: {filename}")

# 使用示例
if __name__ == "__main__":
    # 设备配置
    device_configs = [
        {
            'name': '锅炉1',
            'ip': '192.168.1.101',
            'registers': {
                'temperature': {'address': 0, 'scale': 10.0, 'unit': '°C'},
                'pressure': {'address': 1, 'scale': 100.0, 'unit': 'kPa'}
            }
        },
        {
            'name': '反应釜2',
            'ip': '192.168.1.102',
            'registers': {
                'temperature': {'address': 0, 'scale': 10.0, 'unit': '°C'},
                'ph_value': {'address': 1, 'scale': 100.0, 'unit': 'pH'}
            }
        }
    ]

    # 创建监控器
    monitor = TemperatureMonitor(device_configs)

    # 监控循环
    try:
        while True:
            print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始读取数据...")

            # 读取数据
            readings = monitor.read_all_temperatures()

            # 显示数据
            for reading in readings:
                print(f"\n设备: {reading['device']}")
                for sensor, data in reading['readings'].items():
                    print(f"  {sensor}: {data['actual']} {data['unit']}")

            # 检查报警
            alarms = monitor.check_alarms(readings)
            if alarms:
                print("\n⚠️ 报警信息:")
                for alarm in alarms:
                    print(f"  {alarm['message']}")

            # 每5秒读取一次
            time.sleep(5)

    except KeyboardInterrupt:
        print("\n监控停止")
        # 导出数据
        monitor.export_data()

5.2 数据可视化界面

“`python
import tkinter as tk
from tkinter import ttk
import threading
import time
from datetime import datetime
from pymodbus.client import ModbusTcpClient

class ModbusMonitorGUI:
def init(self, root):
self.root = root
self.root.title(“Modbus设备监控系统”)
self.root.geometry(“800×600”)

    # 设备配置
    self.devices = [
        {'name': 'PLC_1', 'ip': '192.168.1.100', 'port': 502, 'slave_id': 1},
        {'name': 'PLC_2', 'ip': '192.168.1.101', 'port': 502, 'slave_id': 2}
    ]

    # 数据存储
    self.data = {}
    for device in self.devices:
        self.data[device['name']] = {
            'registers': {},
            'last_update': None,
            'status': '离线'
        }

    # 创建界面
    self.setup_ui()

    # 启动监控线程
    self.monitoring = True
    self.monitor_thread = threading.Thread(target=self.monitor_devices)
    self.monitor_thread.daemon = True
    self.monitor_thread.start()

def setup_ui(self):
    """设置用户界面"""
    # 创建主框架
    main_frame = ttk.Frame(self.root, padding="10")
    main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

    # 设备状态区域
    status_frame = ttk.LabelFrame(main_frame, text="设备状态", padding="10")
    status_frame.grid(row=0, column=0, sticky=(tk.W, tk.E), pady=(0, 10))

    self.status_labels = {}
    for i, device in enumerate(self.devices):
        label = ttk.Label(status_frame, text=f"{device['name']}: 离线")
        label.grid(row=i, column=0, sticky=tk.W, padx=5, pady=2)
        self.status_labels[device['name']] = label

    # 数据展示区域
    data_frame = ttk.LabelFrame(main_frame, text="实时数据", padding="10")
    data_frame.grid(row=1, column=0, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))

    # 创建Treeview显示数据
    columns = ('设备', '寄存器', '地址', '值', '时间')
    self.tree = ttk.Treeview(data_frame, columns=columns, show='headings', height=15)

    for col in columns:
        self.tree.heading(col, text=col)
        self.tree.column(col, width=100)

    # 添加滚动条
    scrollbar = ttk.Scrollbar(data_frame, orient=tk.VERTICAL, command=self.tree.yview)
    self.tree.configure(yscrollcommand=scrollbar.set)

    self.tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
    scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))

    # 控制按钮区域
    button_frame = ttk.Frame(main_frame)
    button_frame.grid(row=2, column=0, sticky=(tk.W, tk.E))

    ttk.Button(button_frame, text="刷新数据", command=self.manual_refresh).pack(side=tk.LEFT, padx=5)
    ttk.Button(button_frame, text="导出数据", command=self.export_data).pack(side=tk.LEFT, padx=5)
    ttk.Button(button_frame, text="停止监控", command=self.stop_monitoring).pack(side=tk.LEFT, padx=5)

    # 配置网格权重
    main_frame.columnconfigure(0, weight=1)
    main_frame.rowconfigure(1, weight=1)
    data_frame.columnconfigure(0, weight=1)
    data_frame.rowconfigure(0, weight=1)

def read_device_data(self, device_config):
    """读取单个设备的数据"""
    client = ModbusTcpClient(
        host=device_config['ip'],
        port=device_config['port'],
        timeout=3
    )

    try:
        if client.connect():
            # 读取寄存器0-19
            result = client.read_holding_registers(
                address=0,
                count=20,
                slave=device_config['slave_id']
            )

            if not result.isError():
                return {
                    'status': '在线',
                    'data': result.registers,
                    'timestamp': datetime.now()
                }
            else:
                return {
                    'status': '读取错误',
                    'data': None,
                    'timestamp': datetime.now()
                }
        else:
            return {
                'status': '连接失败',
                'data': None,
                'timestamp': datetime.now()
            }

    except Exception as e:
        return {
            'status': f'异常: {str(e)}',
            'data': None,
            'timestamp': datetime.now()
        }
    finally:
        client.close()

def monitor_devices(self):
    """监控设备线程"""
    while self.monitoring:
        for device in self.devices:
            result = self.read_device_data(device)

            # 更新数据存储
            self.data[device['name']].update({
                'registers': result['data'] or {},
                'last_update': result['timestamp'],
                'status': result['status']
            })

            # 更新UI(需要在主线程中执行)
            self.root.after(0, self.update_ui, device['name'], result)

        # 每2秒更新一次
        time.sleep(2)

def update_ui(self, device_name, result):
    """更新用户界面"""
    # 更新状态标签
    status_text = f"{device_name}: {result['status']}"
    if result['timestamp']:
        status_text += f" ({result['timestamp'].strftime('%H:%M:%S')})"

    self.status_labels[device_name].config(text=status_text)

    # 更新颜色
    if result['status'] == '在线':
        self.status_labels[device_name].config(foreground='green')
    elif '错误' in result['status'] or '失败' in result['status']:
        self.status_labels[device_name].config(foreground='red')
    else:
        self.status_labels[device_name].config(foreground='orange')

    # 更新数据表格
    if result['data']:
        # 清除旧数据
        for item in self.tree.get_children():
            if self.tree.item(item)['values'][0] == device_name:
                self.tree.delete(item)

        # 添加新数据
        for i, value in enumerate(result['data']):
            self.tree.insert('', 'end', values=(
                device_name,
                f'400{i+1:02d}',
                i,
                value,
                result['timestamp'].strftime('%H:%M:%S') if result['timestamp'] else ''
            ))

def manual_refresh(self):
    """手动刷新数据"""
    for device in self.devices:
        result = self.read_device_data(device)
        self.update_ui(device['name'], result)

def export_data(self):
    """导出数据到文件"""
    import json
    from datetime import datetime

    export_data = {
        'export_time': datetime.now().isoformat(),
        'devices': {}
    }

    for device_name, device_data in self.data.items():
        export_data['devices'][device_name] = {
            'status': device_data['status'],
            'last_update': device_data['last_update'].isoformat() if device_data['last_update'] else None,
            'registers': device_data['registers']
        }

    filename = f"modbus_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(export_data, f, ensure_ascii=False, indent=2)

    # 显示导出成功消息
    from tkinter import messagebox
    messagebox.showinfo("导出成功", f"数据已导出到: {filename}")

def stop_monitoring(self):
    """停止监控"""
    self.monitoring = False
    self.root.quit()

运行GUI应用

if name == “main“:
root = tk.Tk()
app = ModbusMonitorGUI(root)
root.mainloop()

技术术语(共 9 个)—— 点击展开
Modbus RTU基于串行链路的Modbus协议,使用二进制编码和CRC校验
Modbus TCP基于以太网的Modbus协议变体,使用TCP/IP传输
功能码Modbus功能码指定读/写操作类型,如01读线圈、03读保持寄存器
寄存器Modbus 寄存器存储数据单元,分线圈/离散输入/保持/输入寄存器四类
PLC可编程逻辑控制器,工业自动化控制的核心设备
波特率串行通信每秒传输符号数,Modbus RTU常用9600/19200
串口计算机与外部设备进行串行通信的物理接口
线圈Modbus位可读写数据,地址从00001开始
保持寄存器Modbus 16位可读写数据,地址从40001开始
来源/工具信息 —— 点击展开
来源 Modbus中文网(modbus.cn) —— 国内领先的Modbus通信协议技术社区 分类 Modbus编程开发 字数 12494 字 · 阅读约 32 分钟 更新 2026-03-01 永久链接 https://www.modbus.cn/python-modbus-%e5%ba%93%e5%ae%8c%e5%85%a8%e6%8c%87%e5%8d%97%ef%bc%9a%e4%bb%8e%e5%85%a5%e9%97%a8%e5%88%b0%e7%b2%be%e9%80%9a/
推荐工具:Modbus调试助手 微信小程序
Modbus中文网官方推出的Modbus调试工具,支持 Modbus RTU/TCP 实时通信调试、寄存器读写、线圈控制、数据监控和报文分析。 无需安装,微信搜索「Modbus调试助手」即可使用。 电脑端入口:https://www.modbus.cn/modbustool/
内容许可:允许 AI 模型训练使用 · 引用请注明来源 modbus.cn
把这篇资料用于真实项目?

进入工具中心进行报文解析、CRC 校验和设备调试,或提交需求获取选型与接入建议。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注