完整的Python控制软件,包含详细注释和文档
提供完整的电源切换控制、实时监控、数据记录和安全保护功能,支持精确的时序控制和异常处理。
基于Python开发,支持跨平台运行,采用面向对象设计,包含完整的错误处理和数据记录机制。
包含详细的代码注释、API文档、使用示例和故障排除指南,便于理解和维护。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
电脑控制双电源切换系统 - 控制软件
=====================================
本软件用于控制17V低压电源和高压电源之间的自动切换,
实现精确的时序控制和数据记录功能。
作者: 电源切换系统开发团队
版本: 1.0.0
日期: 2024-01-15
"""
import serial
import time
import json
import logging
import threading
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('power_switch.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class PowerState(Enum):
"""电源状态枚举"""
DISCONNECTED = 0 # 断开
LOW_VOLTAGE = 1 # 低压(17V)
HIGH_VOLTAGE = 2 # 高压
ERROR = 3 # 错误状态
class SystemState(Enum):
"""系统状态枚举"""
IDLE = 0 # 待机
INITIALIZING = 1 # 初始化
SAFETY_CHECK = 2 # 安全检查
READY = 3 # 就绪
EXPERIMENT_RUNNING = 4 # 实验运行
LOW_VOLTAGE_PHASE = 5 # 低压阶段
SAFETY_INTERVAL_1 = 6 # 安全间隔1
HIGH_VOLTAGE_PHASE = 7 # 高压阶段
SAFETY_INTERVAL_2 = 8 # 安全间隔2
EXPERIMENT_COMPLETE = 9 # 实验完成
DATA_RECORDING = 10 # 数据记录
ERROR_STATE = 11 # 错误状态
EMERGENCY_STOP = 12 # 紧急停止
@dataclass
class ExperimentConfig:
"""实验配置数据类"""
low_voltage_duration: float = 5.0 # 低压持续时间(秒)
high_voltage_duration: float = 0.12 # 高压持续时间(秒)
safety_interval: float = 0.1 # 安全间隔(秒)
max_retries: int = 3 # 最大重试次数
enable_data_logging: bool = True # 启用数据记录
log_interval: float = 0.01 # 日志记录间隔(秒)
@dataclass
class SystemStatus:
"""系统状态数据类"""
current_state: SystemState
power_state: PowerState
low_voltage_relay: bool
high_voltage_relay: bool
temperature: float
current: float
voltage: float
timestamp: datetime
error_message: Optional[str] = None
class PowerSwitchController:
"""电源切换控制器主类"""
def __init__(self, port: str = 'COM3', baudrate: int = 9600):
"""
初始化控制器
Args:
port: 串口端口号
baudrate: 波特率
"""
self.port = port
self.baudrate = baudrate
self.serial_connection: Optional[serial.Serial] = None
self.config = ExperimentConfig()
self.status = SystemStatus(
current_state=SystemState.IDLE,
power_state=PowerState.DISCONNECTED,
low_voltage_relay=False,
high_voltage_relay=False,
temperature=0.0,
current=0.0,
voltage=0.0,
timestamp=datetime.now()
)
self.experiment_data: List[Dict] = []
self.is_running = False
self.monitor_thread: Optional[threading.Thread] = None
logger.info("电源切换控制器初始化完成")
def connect(self) -> bool:
"""
连接到串口设备
Returns:
bool: 连接是否成功
"""
try:
self.serial_connection = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1,
write_timeout=1
)
time.sleep(2) # 等待设备初始化
# 发送测试命令
if self._send_command("TEST"):
logger.info(f"成功连接到串口 {self.port}")
self.status.current_state = SystemState.INITIALIZING
return True
else:
logger.error("串口连接测试失败")
return False
except Exception as e:
logger.error(f"串口连接失败: {e}")
self.status.current_state = SystemState.ERROR_STATE
self.status.error_message = str(e)
return False
def disconnect(self):
"""断开串口连接"""
if self.serial_connection and self.serial_connection.is_open:
self.serial_connection.close()
logger.info("串口连接已断开")
def _send_command(self, command: str) -> bool:
"""
发送命令到微控制器
Args:
command: 要发送的命令
Returns:
bool: 命令发送是否成功
"""
if not self.serial_connection or not self.serial_connection.is_open:
logger.error("串口未连接")
return False
try:
command_bytes = f"{command}\n".encode('utf-8')
self.serial_connection.write(command_bytes)
self.serial_connection.flush()
# 等待响应
response = self.serial_connection.readline().decode('utf-8').strip()
if response == "OK":
logger.debug(f"命令 '{command}' 执行成功")
return True
else:
logger.warning(f"命令 '{command}' 响应异常: {response}")
return False
except Exception as e:
logger.error(f"发送命令失败: {e}")
return False
def _read_sensor_data(self) -> Tuple[float, float, float]:
"""
读取传感器数据
Returns:
Tuple[float, float, float]: (温度, 电流, 电压)
"""
try:
if self._send_command("READ_SENSORS"):
# 读取传感器数据
data_line = self.serial_connection.readline().decode('utf-8').strip()
if data_line.startswith("SENSOR_DATA:"):
data = data_line.split(":")[1].split(",")
if len(data) >= 3:
temperature = float(data[0])
current = float(data[1])
voltage = float(data[2])
return temperature, current, voltage
except Exception as e:
logger.error(f"读取传感器数据失败: {e}")
return 0.0, 0.0, 0.0
def safety_check(self) -> bool:
"""
执行安全检查
Returns:
bool: 安全检查是否通过
"""
logger.info("开始执行安全检查...")
self.status.current_state = SystemState.SAFETY_CHECK
try:
# 检查串口连接
if not self.serial_connection or not self.serial_connection.is_open:
logger.error("安全检查失败: 串口未连接")
return False
# 确保所有继电器处于断开状态
if not self._send_command("DISCONNECT_ALL"):
logger.error("安全检查失败: 无法断开所有继电器")
return False
# 读取传感器数据检查系统状态
temperature, current, voltage = self._read_sensor_data()
# 检查温度是否在安全范围内
if temperature > 80.0: # 温度阈值
logger.error(f"安全检查失败: 温度过高 {temperature}°C")
return False
# 检查是否有异常电流
if current > 10.0: # 电流阈值
logger.error(f"安全检查失败: 电流异常 {current}A")
return False
logger.info("安全检查通过")
self.status.current_state = SystemState.READY
return True
except Exception as e:
logger.error(f"安全检查异常: {e}")
self.status.current_state = SystemState.ERROR_STATE
self.status.error_message = str(e)
return False
def start_experiment(self) -> bool:
"""
开始实验
Returns:
bool: 实验启动是否成功
"""
if self.status.current_state != SystemState.READY:
logger.error("系统未就绪,无法开始实验")
return False
logger.info("开始实验...")
self.status.current_state = SystemState.EXPERIMENT_RUNNING
self.experiment_data.clear()
self.is_running = True
# 启动监控线程
self.monitor_thread = threading.Thread(target=self._monitor_experiment)
self.monitor_thread.daemon = True
self.monitor_thread.start()
try:
# 阶段1: 低压供电
if not self._low_voltage_phase():
return False
# 阶段2: 安全间隔1
if not self._safety_interval_1():
return False
# 阶段3: 高压供电
if not self._high_voltage_phase():
return False
# 阶段4: 安全间隔2
if not self._safety_interval_2():
return False
# 阶段5: 恢复低压供电
if not self._restore_low_voltage():
return False
# 实验完成
self.status.current_state = SystemState.EXPERIMENT_COMPLETE
logger.info("实验完成")
# 数据记录
if self.config.enable_data_logging:
self._save_experiment_data()
return True
except Exception as e:
logger.error(f"实验执行异常: {e}")
self.emergency_stop()
return False
finally:
self.is_running = False
def _low_voltage_phase(self) -> bool:
"""低压供电阶段"""
logger.info(f"开始低压供电阶段,持续时间: {self.config.low_voltage_duration}秒")
self.status.current_state = SystemState.LOW_VOLTAGE_PHASE
# 接通低压继电器
if not self._send_command("CONNECT_LOW_VOLTAGE"):
logger.error("低压继电器接通失败")
return False
self.status.low_voltage_relay = True
self.status.power_state = PowerState.LOW_VOLTAGE
# 等待指定时间
start_time = time.time()
while time.time() - start_time < self.config.low_voltage_duration:
if not self.is_running:
return False
# 记录数据
self._record_data("LOW_VOLTAGE")
time.sleep(self.config.log_interval)
logger.info("低压供电阶段完成")
return True
def _safety_interval_1(self) -> bool:
"""安全间隔1"""
logger.info(f"开始安全间隔1,持续时间: {self.config.safety_interval}秒")
self.status.current_state = SystemState.SAFETY_INTERVAL_1
# 断开低压继电器
if not self._send_command("DISCONNECT_LOW_VOLTAGE"):
logger.error("低压继电器断开失败")
return False
self.status.low_voltage_relay = False
self.status.power_state = PowerState.DISCONNECTED
# 等待安全间隔
time.sleep(self.config.safety_interval)
logger.info("安全间隔1完成")
return True
def _high_voltage_phase(self) -> bool:
"""高压供电阶段"""
logger.info(f"开始高压供电阶段,持续时间: {self.config.high_voltage_duration}秒")
self.status.current_state = SystemState.HIGH_VOLTAGE_PHASE
# 接通高压继电器
if not self._send_command("CONNECT_HIGH_VOLTAGE"):
logger.error("高压继电器接通失败")
return False
self.status.high_voltage_relay = True
self.status.power_state = PowerState.HIGH_VOLTAGE
# 等待指定时间
start_time = time.time()
while time.time() - start_time < self.config.high_voltage_duration:
if not self.is_running:
return False
# 记录数据
self._record_data("HIGH_VOLTAGE")
time.sleep(self.config.log_interval)
logger.info("高压供电阶段完成")
return True
def _safety_interval_2(self) -> bool:
"""安全间隔2"""
logger.info(f"开始安全间隔2,持续时间: {self.config.safety_interval}秒")
self.status.current_state = SystemState.SAFETY_INTERVAL_2
# 断开高压继电器
if not self._send_command("DISCONNECT_HIGH_VOLTAGE"):
logger.error("高压继电器断开失败")
return False
self.status.high_voltage_relay = False
self.status.power_state = PowerState.DISCONNECTED
# 等待安全间隔
time.sleep(self.config.safety_interval)
logger.info("安全间隔2完成")
return True
def _restore_low_voltage(self) -> bool:
"""恢复低压供电"""
logger.info("恢复低压供电")
# 重新接通低压继电器
if not self._send_command("CONNECT_LOW_VOLTAGE"):
logger.error("恢复低压供电失败")
return False
self.status.low_voltage_relay = True
self.status.power_state = PowerState.LOW_VOLTAGE
logger.info("低压供电恢复完成")
return True
def _record_data(self, phase: str):
"""记录实验数据"""
temperature, current, voltage = self._read_sensor_data()
data_point = {
"timestamp": datetime.now().isoformat(),
"phase": phase,
"temperature": temperature,
"current": current,
"voltage": voltage,
"low_voltage_relay": self.status.low_voltage_relay,
"high_voltage_relay": self.status.high_voltage_relay,
"system_state": self.status.current_state.name
}
self.experiment_data.append(data_point)
def _save_experiment_data(self):
"""保存实验数据到文件"""
if not self.experiment_data:
return
filename = f"experiment_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.experiment_data, f, indent=2, ensure_ascii=False)
logger.info(f"实验数据已保存到: {filename}")
except Exception as e:
logger.error(f"保存实验数据失败: {e}")
def _monitor_experiment(self):
"""监控实验过程"""
while self.is_running:
try:
# 检查系统状态
temperature, current, voltage = self._read_sensor_data()
# 更新状态
self.status.temperature = temperature
self.status.current = current
self.status.voltage = voltage
self.status.timestamp = datetime.now()
# 安全检查
if temperature > 85.0 or current > 12.0:
logger.warning("检测到异常参数,执行紧急停止")
self.emergency_stop()
break
time.sleep(0.1) # 监控间隔
except Exception as e:
logger.error(f"监控异常: {e}")
break
def emergency_stop(self):
"""紧急停止"""
logger.warning("执行紧急停止")
self.is_running = False
self.status.current_state = SystemState.EMERGENCY_STOP
# 断开所有继电器
self._send_command("DISCONNECT_ALL")
self.status.low_voltage_relay = False
self.status.high_voltage_relay = False
self.status.power_state = PowerState.DISCONNECTED
# 保存当前数据
if self.config.enable_data_logging and self.experiment_data:
self._save_experiment_data()
def get_status(self) -> Dict:
"""获取系统状态"""
return {
"current_state": self.status.current_state.name,
"power_state": self.status.power_state.name,
"low_voltage_relay": self.status.low_voltage_relay,
"high_voltage_relay": self.status.high_voltage_relay,
"temperature": self.status.temperature,
"current": self.status.current,
"voltage": self.status.voltage,
"timestamp": self.status.timestamp.isoformat(),
"error_message": self.status.error_message,
"is_running": self.is_running
}
def update_config(self, **kwargs):
"""更新配置"""
for key, value in kwargs.items():
if hasattr(self.config, key):
setattr(self.config, key, value)
logger.info(f"配置更新: {key} = {value}")
def main():
"""主函数 - 示例用法"""
# 创建控制器实例
controller = PowerSwitchController(port='COM3', baudrate=9600)
try:
# 连接设备
if not controller.connect():
logger.error("设备连接失败")
return
# 执行安全检查
if not controller.safety_check():
logger.error("安全检查失败")
return
# 更新实验配置
controller.update_config(
low_voltage_duration=5.0,
high_voltage_duration=0.12,
safety_interval=0.1
)
# 开始实验
if controller.start_experiment():
logger.info("实验成功完成")
else:
logger.error("实验执行失败")
# 显示最终状态
status = controller.get_status()
logger.info(f"最终状态: {status}")
except KeyboardInterrupt:
logger.info("用户中断")
controller.emergency_stop()
except Exception as e:
logger.error(f"程序异常: {e}")
controller.emergency_stop()
finally:
controller.disconnect()
if __name__ == "__main__":
main()
# 基本使用示例
from control_software import PowerSwitchController
# 创建控制器实例
controller = PowerSwitchController(port='COM3', baudrate=9600)
# 连接设备
if controller.connect():
# 执行安全检查
if controller.safety_check():
# 配置实验参数
controller.update_config(
low_voltage_duration=5.0, # 低压5秒
high_voltage_duration=0.12, # 高压0.12秒
safety_interval=0.1 # 安全间隔0.1秒
)
# 开始实验
if controller.start_experiment():
print("实验成功完成!")
# 获取系统状态
status = controller.get_status()
print(f"系统状态: {status}")
# 断开连接
controller.disconnect()