# Dec 19, 2024 03:56
#!/usr/bin/env python3
"""
Enhanced Climate-Safe HVAC Controller
-----------------------------------
Optimized for safety, efficiency, and reliability with focus on:
- Predictive maintenance
- Multi-sensor redundancy
- Fail-safe operations
- Energy optimization
- Air quality monitoring
"""
import time
import sys
import os
import json
import math
import logging
from threading import Thread, Event, Lock
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
import yaml
# Enhanced logging with rotating file handler
import logging.handlers
# Module-wide logger: INFO and above are written to a log file that is
# rotated once it reaches 1 MiB, keeping up to five rotated backups.
logger = logging.getLogger('hvac_controller')
logger.setLevel(logging.INFO)

handler = logging.handlers.RotatingFileHandler(
    filename='hvac_controller.log',
    maxBytes=1024 * 1024,
    backupCount=5,
)
handler.setFormatter(
    logging.Formatter(fmt='%(asctime)s [%(levelname)s] %(message)s')
)
logger.addHandler(handler)
class SystemState(Enum):
    """Operating states that the safety monitor can report."""

    # Initial state of a freshly created SafetyMonitor.
    NORMAL = "normal"
    # Set when a CO2 reading exceeds the configured limit.
    WARNING = "warning"
    # Set when the warning counter reaches the configured threshold.
    CRITICAL = "critical"
    # Set when the temperature exceeds the configured critical value.
    EMERGENCY = "emergency"
@dataclass
class EnvironmentData:
    """One snapshot of environmental sensor readings.

    Temperature and humidity are mandatory; the three air-quality fields
    are optional and default to ``None``.
    """

    # Compared against the critical-temperature limit; logged with a °C suffix.
    temperature: float
    humidity: float
    # Compared against the CO2 limit; logged with a ppm suffix.
    co2_level: Optional[float] = None
    voc_level: Optional[float] = None
    pm25_level: Optional[float] = None
class SafetyMonitor:
    """Derive a :class:`SystemState` from successive environment readings.

    The monitor keeps the current state plus a counter of high-CO2
    readings. The counter rises by one on every high reading and decays by
    one (never below zero) on every acceptable reading.

    Configuration keys read via ``config.get``:

    * ``safety.warning_threshold`` (default 3) - counter value at which the
      state becomes ``CRITICAL``.
    * ``safety.critical_temp`` (default 45.0) - temperature above which the
      state becomes ``EMERGENCY``.
    * ``safety.max_co2_ppm`` (default 1500) - CO2 level above which a
      reading counts as a warning.
    """

    def __init__(self, config: dict):
        """Store *config* and start in ``NORMAL`` with a zero warning counter."""
        self.config = config
        self.state = SystemState.NORMAL
        self.consecutive_warnings = 0
        # NOTE(review): keys are looked up as flat dotted strings. If the
        # YAML file nests them under sections (safety: {warning_threshold: ..}),
        # every lookup silently falls back to its default - confirm the layout.
        self.warning_threshold = config.get('safety.warning_threshold', 3)

    def check_readings(self, data: EnvironmentData) -> SystemState:
        """Update and return the monitor state for one set of readings.

        Rules, applied in order:

        1. Temperature above the critical limit -> ``EMERGENCY`` (returned
           immediately; the warning counter is left untouched).
        2. CO2 above the limit -> counter + 1 and state ``WARNING``.
           Otherwise the counter decays by one, and a ``WARNING`` state is
           cleared back to ``NORMAL`` once the counter reaches zero.
        3. Counter at or above the warning threshold -> ``CRITICAL``.

        ``CRITICAL`` and ``EMERGENCY`` are never cleared to ``NORMAL`` here.
        """
        if data.temperature > self.config.get('safety.critical_temp', 45.0):
            self.state = SystemState.EMERGENCY
            logger.critical(f"EMERGENCY: Temperature critical at {data.temperature}°C")
            return self.state

        co2_limit = self.config.get('safety.max_co2_ppm', 1500)
        # A missing CO2 sensor value (None) is treated as an acceptable reading.
        if data.co2_level is not None and data.co2_level > co2_limit:
            self.consecutive_warnings += 1
            self.state = SystemState.WARNING
            logger.warning(f"High CO2 levels detected: {data.co2_level} ppm")
        else:
            self.consecutive_warnings = max(0, self.consecutive_warnings - 1)
            # Without this reset a single transient CO2 spike would leave
            # the monitor reporting WARNING forever.
            if self.consecutive_warnings == 0 and self.state is SystemState.WARNING:
                self.state = SystemState.NORMAL

        if self.consecutive_warnings >= self.warning_threshold:
            self.state = SystemState.CRITICAL
        return self.state
class EnhancedPIDController:
    """PID controller with a clamped output and integral anti-windup.

    The integral term is frozen while the previous output was pinned at a
    limit and the current error would push it further past that limit.
    """

    def __init__(self, kp: float, ki: float, kd: float, min_output: float = 0.0, max_output: float = 1.0):
        """Create a controller.

        Args:
            kp: Proportional gain.
            ki: Integral gain.
            kd: Derivative gain.
            min_output: Lower clamp applied to every returned output.
            max_output: Upper clamp applied to every returned output.
        """
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.min_output = min_output
        self.max_output = max_output
        self.integral = 0.0
        self.last_error = None   # error from the previous update, if any
        self.last_output = None  # clamped output from the previous update, if any

    def update(self, error: float, dt: float) -> float:
        """Advance the controller by one step and return the clamped output.

        Args:
            error: Current control error.
            dt: Time elapsed since the previous update. A non-positive
                value is treated as "no time elapsed": the integral is not
                advanced and the derivative term is zero, instead of
                dividing by zero.

        Returns:
            ``kp*error + ki*integral + kd*derivative`` limited to
            ``[min_output, max_output]``.
        """
        has_history = self.last_output is not None
        saturated_high = has_history and self.last_output >= self.max_output
        saturated_low = has_history and self.last_output <= self.min_output

        # Anti-windup: while pinned at a limit with the error pushing further
        # into it, stop accumulating and only keep integral of the matching sign.
        if saturated_high and error > 0:
            self.integral = max(0.0, self.integral)
        elif saturated_low and error < 0:
            self.integral = min(0.0, self.integral)
        elif dt > 0:
            # Also reached on the very first update, so the first sample
            # contributes to the integral like every later one.
            self.integral += error * dt

        if self.last_error is None or dt <= 0:
            derivative = 0.0
        else:
            derivative = (error - self.last_error) / dt

        output = (self.kp * error +
                  self.ki * self.integral +
                  self.kd * derivative)
        output = max(self.min_output, min(self.max_output, output))

        self.last_error = error
        self.last_output = output
        return output
class EnergyOptimizer:
    """Choose the setpoint to apply given current, target and outside temperature.

    Reads ``energy.mode`` (default ``'balanced'``) and ``comfort.temp_range``
    (default 2.0) from the configuration via ``config.get``.
    """

    def __init__(self, config: dict):
        """Keep *config* and cache the energy mode and comfort range."""
        self.config = config
        self.energy_mode = config.get('energy.mode', 'balanced')
        self.comfort_range = config.get('comfort.temp_range', 2.0)

    def optimize_setpoint(self, current_temp: float, target_temp: float,
                          outside_temp: float) -> float:
        """Return the setpoint for this cycle.

        In ``'eco'`` mode, when the current temperature is within the
        comfort range of the target, the current temperature itself is
        returned. Otherwise, when it is colder outside than the target
        (heating), the result is the current temperature plus 0.5 but no
        higher than the target; in the remaining case (cooling) it is the
        current temperature minus 0.5 but no lower than the target.
        """
        eco_enabled = self.energy_mode == 'eco'
        # Eco mode tolerates a wider swing around the target to save energy.
        if eco_enabled and abs(current_temp - target_temp) < self.comfort_range:
            return current_temp

        # The outside temperature decides between heating and cooling.
        heating = outside_temp < target_temp
        if heating:
            return min(target_temp, current_temp + 0.5)
        return max(target_temp, current_temp - 0.5)
class EnhancedController:
    """Top-level control loop tying together safety, energy and PID logic.

    Loads a YAML configuration, builds a :class:`SafetyMonitor`, an
    :class:`EnergyOptimizer` and a temperature PID controller, and exposes
    :meth:`run`, which loops until ``stop_event`` is set.
    """

    def __init__(self, config_path: str):
        """Load configuration from *config_path* and build the sub-components.

        Args:
            config_path: Path to a YAML configuration file.

        Raises:
            OSError: If the file cannot be opened.
        """
        with open(config_path, encoding="utf-8") as f:
            # safe_load returns None for an empty document; fall back to an
            # empty mapping so the .get() lookups below use their defaults.
            self.config = yaml.safe_load(f) or {}
        self.safety = SafetyMonitor(self.config)
        self.energy = EnergyOptimizer(self.config)
        self.temp_pid = EnhancedPIDController(
            kp=self.config.get('control.temp.kp', 1.0),
            ki=self.config.get('control.temp.ki', 0.1),
            kd=self.config.get('control.temp.kd', 0.05)
        )
        self.stop_event = Event()
        self.lock = Lock()

    def run(self):
        """Run the control loop until ``stop_event`` is set.

        Each cycle reads the environment, runs the safety check and, unless
        the state is CRITICAL or EMERGENCY, updates the control loop - all
        while holding ``self.lock``. Exceptions are logged; those accepted by
        :meth:`_should_emergency_stop` trigger an emergency shutdown.

        NOTE(review): ``_get_environment_data`` is still a placeholder that
        returns None, so ``check_readings`` raises AttributeError, which is
        logged but is not an emergency-stop error; the loop then repeats
        every interval until the placeholder is implemented.
        """
        while not self.stop_event.is_set():
            try:
                with self.lock:
                    env_data = self._get_environment_data()
                    state = self.safety.check_readings(env_data)
                    if state in (SystemState.CRITICAL, SystemState.EMERGENCY):
                        self._emergency_shutdown(f"Safety critical: {state}")
                        break
                    self._update_control_loop(env_data)
            except Exception as e:
                logger.error(f"Control loop error: {e}")
                if self._should_emergency_stop(e):
                    self._emergency_shutdown(str(e))
                    break
            # Wait on the event rather than sleeping so that setting
            # stop_event from another thread ends the pause immediately.
            self.stop_event.wait(self.config.get('control.interval', 1.0))

    def _get_environment_data(self) -> EnvironmentData:
        """Read the sensors. Placeholder: currently returns None."""
        # TODO: implement sensor reading with redundancy.
        pass

    def _update_control_loop(self, env_data: EnvironmentData):
        """Apply control outputs for *env_data*. Placeholder: does nothing."""
        # TODO: implement control logic with energy optimization.
        pass

    def _emergency_shutdown(self, reason: str):
        """Log *reason* at CRITICAL level and signal the loop to stop."""
        logger.critical(f"EMERGENCY SHUTDOWN: {reason}")
        # TODO: implement the safe shutdown sequence before signalling stop.
        self.stop_event.set()

    def _should_emergency_stop(self, error: Exception) -> bool:
        """Return True when *error* is a type that must halt the controller."""
        return isinstance(error, (
            IOError,       # Hardware communication errors
            ValueError,    # Invalid sensor readings
            RuntimeError   # General runtime errors
        ))
def _main() -> None:
    """Run the controller from ``config.yaml`` until it stops or Ctrl-C is pressed."""
    controller = EnhancedController("config.yaml")
    try:
        controller.run()
    except KeyboardInterrupt:
        # Ctrl-C: flag the loop as stopped and record the shutdown.
        controller.stop_event.set()
        logger.info("Graceful shutdown initiated")


if __name__ == "__main__":
    _main()