控制软件参考

完整的Python控制软件,包含详细注释和文档

🐍 Python 3.7+ 📡 串口通信 🛡️ 安全控制 📊 数据记录

软件概述

🎯 核心功能

提供完整的电源切换控制、实时监控、数据记录和安全保护功能,支持精确的时序控制和异常处理。

🔧 技术特性

基于Python开发,支持跨平台运行,采用面向对象设计,包含完整的错误处理和数据记录机制。

📚 文档完整

包含详细的代码注释、API文档、使用示例和故障排除指南,便于理解和维护。

软件架构

graph TB subgraph "主控制模块" PC[PowerSwitchController] PC --> CONFIG[ExperimentConfig] PC --> STATUS[SystemStatus] end subgraph "通信模块" SERIAL[串口通信管理] PROTOCOL[通信协议处理] COMMANDS[命令解析] end subgraph "状态管理" STATE[状态机管理] MONITOR[实时监控] SAFETY[安全检查] end subgraph "数据管理" RECORD[数据记录] STORAGE[数据存储] EXPORT[数据导出] end subgraph "安全模块" PROTECT[安全保护] EMERGENCY[紧急停止] VALIDATE[输入验证] end PC --> SERIAL PC --> STATE PC --> RECORD PC --> PROTECT SERIAL --> PROTOCOL PROTOCOL --> COMMANDS STATE --> MONITOR STATE --> SAFETY RECORD --> STORAGE STORAGE --> EXPORT PROTECT --> EMERGENCY PROTECT --> VALIDATE style PC fill:#e1f5fe style SERIAL fill:#f3e5f5 style STATE fill:#e8f5e8 style RECORD fill:#fff3e0 style PROTECT fill:#fce4ec

主要类和方法

🏗️ PowerSwitchController

  • connect() - 连接串口设备
  • disconnect() - 断开连接
  • start_experiment() - 开始实验
  • emergency_stop() - 紧急停止
  • get_status() - 获取状态

⚙️ ExperimentConfig

  • low_voltage_duration - 低压持续时间
  • high_voltage_duration - 高压持续时间
  • safety_interval - 安全间隔
  • enable_data_logging - 数据记录开关
  • log_interval - 记录间隔

📊 SystemStatus

  • current_state - 当前状态
  • power_state - 电源状态
  • temperature - 温度
  • current - 电流
  • voltage - 电压

🛡️ 安全机制

  • safety_check() - 安全检查
  • _monitor_experiment() - 实验监控
  • _read_sensor_data() - 传感器读取
  • _send_command() - 命令发送
  • _record_data() - 数据记录
📄 control_software.py - 完整源代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
电脑控制双电源切换系统 - 控制软件
=====================================

本软件用于控制17V低压电源和高压电源之间的自动切换,
实现精确的时序控制和数据记录功能。

作者: 电源切换系统开发团队
版本: 1.0.0
日期: 2024-01-15
"""

import serial
import time
import json
import logging
import threading
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('power_switch.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class PowerState(Enum):
    """电源状态枚举"""
    DISCONNECTED = 0  # 断开
    LOW_VOLTAGE = 1   # 低压(17V)
    HIGH_VOLTAGE = 2  # 高压
    ERROR = 3         # 错误状态

class SystemState(Enum):
    """系统状态枚举"""
    IDLE = 0          # 待机
    INITIALIZING = 1  # 初始化
    SAFETY_CHECK = 2  # 安全检查
    READY = 3         # 就绪
    EXPERIMENT_RUNNING = 4  # 实验运行
    LOW_VOLTAGE_PHASE = 5   # 低压阶段
    SAFETY_INTERVAL_1 = 6   # 安全间隔1
    HIGH_VOLTAGE_PHASE = 7  # 高压阶段
    SAFETY_INTERVAL_2 = 8   # 安全间隔2
    EXPERIMENT_COMPLETE = 9 # 实验完成
    DATA_RECORDING = 10     # 数据记录
    ERROR_STATE = 11        # 错误状态
    EMERGENCY_STOP = 12     # 紧急停止

@dataclass
class ExperimentConfig:
    """实验配置数据类"""
    low_voltage_duration: float = 5.0      # 低压持续时间(秒)
    high_voltage_duration: float = 0.12     # 高压持续时间(秒)
    safety_interval: float = 0.1           # 安全间隔(秒)
    max_retries: int = 3                   # 最大重试次数
    enable_data_logging: bool = True       # 启用数据记录
    log_interval: float = 0.01             # 日志记录间隔(秒)

@dataclass
class SystemStatus:
    """系统状态数据类"""
    current_state: SystemState
    power_state: PowerState
    low_voltage_relay: bool
    high_voltage_relay: bool
    temperature: float
    current: float
    voltage: float
    timestamp: datetime
    error_message: Optional[str] = None

class PowerSwitchController:
    """电源切换控制器主类"""
    
    def __init__(self, port: str = 'COM3', baudrate: int = 9600):
        """
        初始化控制器
        
        Args:
            port: 串口端口号
            baudrate: 波特率
        """
        self.port = port
        self.baudrate = baudrate
        self.serial_connection: Optional[serial.Serial] = None
        self.config = ExperimentConfig()
        self.status = SystemStatus(
            current_state=SystemState.IDLE,
            power_state=PowerState.DISCONNECTED,
            low_voltage_relay=False,
            high_voltage_relay=False,
            temperature=0.0,
            current=0.0,
            voltage=0.0,
            timestamp=datetime.now()
        )
        self.experiment_data: List[Dict] = []
        self.is_running = False
        self.monitor_thread: Optional[threading.Thread] = None
        
        logger.info("电源切换控制器初始化完成")
    
    def connect(self) -> bool:
        """
        连接到串口设备
        
        Returns:
            bool: 连接是否成功
        """
        try:
            self.serial_connection = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=1,
                write_timeout=1
            )
            time.sleep(2)  # 等待设备初始化
            
            # 发送测试命令
            if self._send_command("TEST"):
                logger.info(f"成功连接到串口 {self.port}")
                self.status.current_state = SystemState.INITIALIZING
                return True
            else:
                logger.error("串口连接测试失败")
                return False
                
        except Exception as e:
            logger.error(f"串口连接失败: {e}")
            self.status.current_state = SystemState.ERROR_STATE
            self.status.error_message = str(e)
            return False
    
    def disconnect(self):
        """断开串口连接"""
        if self.serial_connection and self.serial_connection.is_open:
            self.serial_connection.close()
            logger.info("串口连接已断开")
    
    def _send_command(self, command: str) -> bool:
        """
        发送命令到微控制器
        
        Args:
            command: 要发送的命令
            
        Returns:
            bool: 命令发送是否成功
        """
        if not self.serial_connection or not self.serial_connection.is_open:
            logger.error("串口未连接")
            return False
        
        try:
            command_bytes = f"{command}\n".encode('utf-8')
            self.serial_connection.write(command_bytes)
            self.serial_connection.flush()
            
            # 等待响应
            response = self.serial_connection.readline().decode('utf-8').strip()
            if response == "OK":
                logger.debug(f"命令 '{command}' 执行成功")
                return True
            else:
                logger.warning(f"命令 '{command}' 响应异常: {response}")
                return False
                
        except Exception as e:
            logger.error(f"发送命令失败: {e}")
            return False
    
    def _read_sensor_data(self) -> Tuple[float, float, float]:
        """
        读取传感器数据
        
        Returns:
            Tuple[float, float, float]: (温度, 电流, 电压)
        """
        try:
            if self._send_command("READ_SENSORS"):
                # 读取传感器数据
                data_line = self.serial_connection.readline().decode('utf-8').strip()
                if data_line.startswith("SENSOR_DATA:"):
                    data = data_line.split(":")[1].split(",")
                    if len(data) >= 3:
                        temperature = float(data[0])
                        current = float(data[1])
                        voltage = float(data[2])
                        return temperature, current, voltage
        except Exception as e:
            logger.error(f"读取传感器数据失败: {e}")
        
        return 0.0, 0.0, 0.0
    
    def safety_check(self) -> bool:
        """
        执行安全检查
        
        Returns:
            bool: 安全检查是否通过
        """
        logger.info("开始执行安全检查...")
        self.status.current_state = SystemState.SAFETY_CHECK
        
        try:
            # 检查串口连接
            if not self.serial_connection or not self.serial_connection.is_open:
                logger.error("安全检查失败: 串口未连接")
                return False
            
            # 确保所有继电器处于断开状态
            if not self._send_command("DISCONNECT_ALL"):
                logger.error("安全检查失败: 无法断开所有继电器")
                return False
            
            # 读取传感器数据检查系统状态
            temperature, current, voltage = self._read_sensor_data()
            
            # 检查温度是否在安全范围内
            if temperature > 80.0:  # 温度阈值
                logger.error(f"安全检查失败: 温度过高 {temperature}°C")
                return False
            
            # 检查是否有异常电流
            if current > 10.0:  # 电流阈值
                logger.error(f"安全检查失败: 电流异常 {current}A")
                return False
            
            logger.info("安全检查通过")
            self.status.current_state = SystemState.READY
            return True
            
        except Exception as e:
            logger.error(f"安全检查异常: {e}")
            self.status.current_state = SystemState.ERROR_STATE
            self.status.error_message = str(e)
            return False
    
    def start_experiment(self) -> bool:
        """
        开始实验
        
        Returns:
            bool: 实验启动是否成功
        """
        if self.status.current_state != SystemState.READY:
            logger.error("系统未就绪,无法开始实验")
            return False
        
        logger.info("开始实验...")
        self.status.current_state = SystemState.EXPERIMENT_RUNNING
        self.experiment_data.clear()
        self.is_running = True
        
        # 启动监控线程
        self.monitor_thread = threading.Thread(target=self._monitor_experiment)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
        try:
            # 阶段1: 低压供电
            if not self._low_voltage_phase():
                return False
            
            # 阶段2: 安全间隔1
            if not self._safety_interval_1():
                return False
            
            # 阶段3: 高压供电
            if not self._high_voltage_phase():
                return False
            
            # 阶段4: 安全间隔2
            if not self._safety_interval_2():
                return False
            
            # 阶段5: 恢复低压供电
            if not self._restore_low_voltage():
                return False
            
            # 实验完成
            self.status.current_state = SystemState.EXPERIMENT_COMPLETE
            logger.info("实验完成")
            
            # 数据记录
            if self.config.enable_data_logging:
                self._save_experiment_data()
            
            return True
            
        except Exception as e:
            logger.error(f"实验执行异常: {e}")
            self.emergency_stop()
            return False
        finally:
            self.is_running = False
    
    def _low_voltage_phase(self) -> bool:
        """低压供电阶段"""
        logger.info(f"开始低压供电阶段,持续时间: {self.config.low_voltage_duration}秒")
        self.status.current_state = SystemState.LOW_VOLTAGE_PHASE
        
        # 接通低压继电器
        if not self._send_command("CONNECT_LOW_VOLTAGE"):
            logger.error("低压继电器接通失败")
            return False
        
        self.status.low_voltage_relay = True
        self.status.power_state = PowerState.LOW_VOLTAGE
        
        # 等待指定时间
        start_time = time.time()
        while time.time() - start_time < self.config.low_voltage_duration:
            if not self.is_running:
                return False
            
            # 记录数据
            self._record_data("LOW_VOLTAGE")
            time.sleep(self.config.log_interval)
        
        logger.info("低压供电阶段完成")
        return True
    
    def _safety_interval_1(self) -> bool:
        """安全间隔1"""
        logger.info(f"开始安全间隔1,持续时间: {self.config.safety_interval}秒")
        self.status.current_state = SystemState.SAFETY_INTERVAL_1
        
        # 断开低压继电器
        if not self._send_command("DISCONNECT_LOW_VOLTAGE"):
            logger.error("低压继电器断开失败")
            return False
        
        self.status.low_voltage_relay = False
        self.status.power_state = PowerState.DISCONNECTED
        
        # 等待安全间隔
        time.sleep(self.config.safety_interval)
        
        logger.info("安全间隔1完成")
        return True
    
    def _high_voltage_phase(self) -> bool:
        """高压供电阶段"""
        logger.info(f"开始高压供电阶段,持续时间: {self.config.high_voltage_duration}秒")
        self.status.current_state = SystemState.HIGH_VOLTAGE_PHASE
        
        # 接通高压继电器
        if not self._send_command("CONNECT_HIGH_VOLTAGE"):
            logger.error("高压继电器接通失败")
            return False
        
        self.status.high_voltage_relay = True
        self.status.power_state = PowerState.HIGH_VOLTAGE
        
        # 等待指定时间
        start_time = time.time()
        while time.time() - start_time < self.config.high_voltage_duration:
            if not self.is_running:
                return False
            
            # 记录数据
            self._record_data("HIGH_VOLTAGE")
            time.sleep(self.config.log_interval)
        
        logger.info("高压供电阶段完成")
        return True
    
    def _safety_interval_2(self) -> bool:
        """安全间隔2"""
        logger.info(f"开始安全间隔2,持续时间: {self.config.safety_interval}秒")
        self.status.current_state = SystemState.SAFETY_INTERVAL_2
        
        # 断开高压继电器
        if not self._send_command("DISCONNECT_HIGH_VOLTAGE"):
            logger.error("高压继电器断开失败")
            return False
        
        self.status.high_voltage_relay = False
        self.status.power_state = PowerState.DISCONNECTED
        
        # 等待安全间隔
        time.sleep(self.config.safety_interval)
        
        logger.info("安全间隔2完成")
        return True
    
    def _restore_low_voltage(self) -> bool:
        """恢复低压供电"""
        logger.info("恢复低压供电")
        
        # 重新接通低压继电器
        if not self._send_command("CONNECT_LOW_VOLTAGE"):
            logger.error("恢复低压供电失败")
            return False
        
        self.status.low_voltage_relay = True
        self.status.power_state = PowerState.LOW_VOLTAGE
        
        logger.info("低压供电恢复完成")
        return True
    
    def _record_data(self, phase: str):
        """记录实验数据"""
        temperature, current, voltage = self._read_sensor_data()
        
        data_point = {
            "timestamp": datetime.now().isoformat(),
            "phase": phase,
            "temperature": temperature,
            "current": current,
            "voltage": voltage,
            "low_voltage_relay": self.status.low_voltage_relay,
            "high_voltage_relay": self.status.high_voltage_relay,
            "system_state": self.status.current_state.name
        }
        
        self.experiment_data.append(data_point)
    
    def _save_experiment_data(self):
        """保存实验数据到文件"""
        if not self.experiment_data:
            return
        
        filename = f"experiment_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.experiment_data, f, indent=2, ensure_ascii=False)
            
            logger.info(f"实验数据已保存到: {filename}")
            
        except Exception as e:
            logger.error(f"保存实验数据失败: {e}")
    
    def _monitor_experiment(self):
        """监控实验过程"""
        while self.is_running:
            try:
                # 检查系统状态
                temperature, current, voltage = self._read_sensor_data()
                
                # 更新状态
                self.status.temperature = temperature
                self.status.current = current
                self.status.voltage = voltage
                self.status.timestamp = datetime.now()
                
                # 安全检查
                if temperature > 85.0 or current > 12.0:
                    logger.warning("检测到异常参数,执行紧急停止")
                    self.emergency_stop()
                    break
                
                time.sleep(0.1)  # 监控间隔
                
            except Exception as e:
                logger.error(f"监控异常: {e}")
                break
    
    def emergency_stop(self):
        """紧急停止"""
        logger.warning("执行紧急停止")
        self.is_running = False
        self.status.current_state = SystemState.EMERGENCY_STOP
        
        # 断开所有继电器
        self._send_command("DISCONNECT_ALL")
        self.status.low_voltage_relay = False
        self.status.high_voltage_relay = False
        self.status.power_state = PowerState.DISCONNECTED
        
        # 保存当前数据
        if self.config.enable_data_logging and self.experiment_data:
            self._save_experiment_data()
    
    def get_status(self) -> Dict:
        """获取系统状态"""
        return {
            "current_state": self.status.current_state.name,
            "power_state": self.status.power_state.name,
            "low_voltage_relay": self.status.low_voltage_relay,
            "high_voltage_relay": self.status.high_voltage_relay,
            "temperature": self.status.temperature,
            "current": self.status.current,
            "voltage": self.status.voltage,
            "timestamp": self.status.timestamp.isoformat(),
            "error_message": self.status.error_message,
            "is_running": self.is_running
        }
    
    def update_config(self, **kwargs):
        """更新配置"""
        for key, value in kwargs.items():
            if hasattr(self.config, key):
                setattr(self.config, key, value)
                logger.info(f"配置更新: {key} = {value}")

def main():
    """主函数 - 示例用法"""
    # 创建控制器实例
    controller = PowerSwitchController(port='COM3', baudrate=9600)
    
    try:
        # 连接设备
        if not controller.connect():
            logger.error("设备连接失败")
            return
        
        # 执行安全检查
        if not controller.safety_check():
            logger.error("安全检查失败")
            return
        
        # 更新实验配置
        controller.update_config(
            low_voltage_duration=5.0,
            high_voltage_duration=0.12,
            safety_interval=0.1
        )
        
        # 开始实验
        if controller.start_experiment():
            logger.info("实验成功完成")
        else:
            logger.error("实验执行失败")
        
        # 显示最终状态
        status = controller.get_status()
        logger.info(f"最终状态: {status}")
        
    except KeyboardInterrupt:
        logger.info("用户中断")
        controller.emergency_stop()
    except Exception as e:
        logger.error(f"程序异常: {e}")
        controller.emergency_stop()
    finally:
        controller.disconnect()

if __name__ == "__main__":
    main()

🚀 快速开始

💡 使用示例
# 基本使用示例
from control_software import PowerSwitchController

# 创建控制器实例
controller = PowerSwitchController(port='COM3', baudrate=9600)

# 连接设备
if controller.connect():
    # 执行安全检查
    if controller.safety_check():
        # 配置实验参数
        controller.update_config(
            low_voltage_duration=5.0,    # 低压5秒
            high_voltage_duration=0.12,  # 高压0.12秒
            safety_interval=0.1          # 安全间隔0.1秒
        )
        
        # 开始实验
        if controller.start_experiment():
            print("实验成功完成!")
        
        # 获取系统状态
        status = controller.get_status()
        print(f"系统状态: {status}")
    
    # 断开连接
    controller.disconnect()

🔗 相关链接