import RPi.GPIO as GPIO
import numpy as np
import asyncio, aiohttp, logging, json, pickle
from datetime import datetime
from typing import Dict, Any, Tuple, Optional
from collections import deque
from tensorflow.lite.interpreter import Interpreter
from cryptography.fernet import Fernet
from logging.handlers import RotatingFileHandler
from scipy.signal import savgol_filter
from sklearn.preprocessing import RobustScaler
logging.basicConfig(level=logging.INFO, handlers=[RotatingFileHandler('secure_system.log', maxBytes=10485760, backupCount=5)])
class SecureLogger:
def __init__(self, key: bytes):
self.fernet = Fernet(key)
self.logger = logging.getLogger('SecureLogger')
def log(self, level: int, msg: str):
self.logger.log(level, self.fernet.encrypt(msg.encode()).decode())
class SignalProcessor:
def __init__(self):
self.scaler = RobustScaler()
def process(self, signal: np.ndarray) -> Tuple[np.ndarray, Dict[str, float]]:
smoothed = savgol_filter(signal, 25, 3)
scaled = self.scaler.fit_transform(smoothed.reshape(-1, 1)).ravel()
features = {
'mean': np.mean(scaled), 'std': np.std(scaled),
'energy': np.sum(scaled ** 2),
'spectral_entropy': -np.sum(np.abs(np.fft.fft(scaled)) * np.log2(np.abs(np.fft.fft(scaled)) + 1e-10))
}
return scaled, features
class GestureClassifier:
def __init__(self, model_path: str):
self.interpreter = Interpreter(model_path=model_path)
self.interpreter.allocate_tensors()
self.confidence_threshold = 0.85
def classify(self, signal: np.ndarray) -> Tuple[str, float]:
self.interpreter.set_tensor(self.interpreter.get_input_details()[0]['index'], np.expand_dims(signal, axis=0).astype(np.float32))
self.interpreter.invoke()
predictions = self.interpreter.get_tensor(self.interpreter.get_output_details()[0]['index'])[0]
gesture, confidence = ["tap", "double_tap", "hold", "swipe_up", "swipe_down"][np.argmax(predictions)], float(np.max(predictions))
return (gesture, confidence) if confidence >= self.confidence_threshold else (None, 0.0)
class AdvancedSkinResponseSystem:
def __init__(self):
self.logger = SecureLogger(Fernet.generate_key())
GPIO.setmode(GPIO.BOARD)
GPIO.setup(16, GPIO.IN)
self.http_client = None
self.command_queue = asyncio.Queue()
self.signal_processor = SignalProcessor()
self.classifier = GestureClassifier("model.tflite")
async def read_sensor(self) -> Optional[np.ndarray]:
try:
readings = np.array([GPIO.input(16) for _ in range(10)], dtype=np.float32)
processed_signal, features = self.signal_processor.process(readings)
confidence = min(1.0, 0.2 * (1 / (1 + np.abs(features['mean']))))
return processed_signal if confidence > 0.5 else None
except Exception as e:
self.logger.log(logging.ERROR, f"Sensor error: {e}")
return None
async def process_gesture(self):
signal = await self.read_sensor()
if signal is None: return
gesture, confidence = self.classifier.classify(signal)
if gesture:
await self.handle_gesture(gesture, confidence)
async def handle_gesture(self, gesture: str, confidence: float):
action_map = {
"tap": {"action": "toggle"}, "double_tap": {"action": "special_function"},
"hold": {"action": "continuous_mode"}, "swipe_up": {"action": "increase"},
"swipe_down": {"action": "decrease"}
}
data = {**action_map[gesture], "confidence": confidence, "timestamp": datetime.now().isoformat()}
if not self.http_client: self.http_client = aiohttp.ClientSession()
try:
async with self.http_client.post("
http://localhost:8080/command", json=data) as response:
if response.status != 200:
raise Exception(f"Failed with status {response.status}")
except Exception as e:
self.logger.log(logging.ERROR, f"Command send error: {e}")
async def shutdown(self):
GPIO.cleanup()
if self.http_client: await self.http_client.close()
self.logger.log(logging.INFO, "Shutdown complete.")
async def run(self):
self.logger.log(logging.INFO, "Starting system.")
try:
while True:
await self.process_gesture()
await asyncio.sleep(0.1)
except Exception as e:
self.logger.log(logging.ERROR, f"Runtime error: {e}")
finally:
await self.shutdown()
if __name__ == "__main__":
asyncio.run(AdvancedSkinResponseSystem().run())
### Documentation for Advanced Skin Response System
---
## Overview
This code is designed to function as a skin response detection system. It reads data from GPIO sensors, processes the signal to detect gestures, and sends commands to an external server based on recognized gestures. This system includes secure logging, asynchronous handling, encrypted communications, and graceful shutdowns. It is suitable for real-time, mission-critical applications.
---
## Dependencies
- **RPi.GPIO**: For handling GPIO inputs on Raspberry Pi hardware.
- **numpy**: For numerical computations.
- **asyncio** and **aiohttp**: For asynchronous operations and HTTP communication.
- **tensorflow.lite.Interpreter**: For gesture classification using a pre-trained TensorFlow Lite model.
- **cryptography.fernet**: For encryption of log messages.
- **logging**: For logging system events.
- **scipy.signal.savgol_filter**: For signal smoothing.
- **sklearn.preprocessing.RobustScaler**: For scaling the signal data.
---
## Classes and Methods
### 1. **SecureLogger**
Handles secure, encrypted logging of system events.
- **Attributes**:
- `fernet`: Encryption key for secure logging.
- `logger`: Configured `logging.Logger` instance with encryption.
- **Methods**:
- `log(level: int, msg: str)`: Logs an encrypted message at a specified logging level.
### 2. **SignalProcessor**
Processes the raw skin sensor data to prepare it for classification.
- **Attributes**:
- `scaler`: Instance of `RobustScaler` for scaling the signal data.
- **Methods**:
- `process(signal: np.ndarray) -> Tuple[np.ndarray, Dict[str, float]]`:
- Applies smoothing and scaling.
- Extracts features including mean, standard deviation, energy, and spectral entropy.
- Returns the processed signal and feature dictionary.
### 3. **GestureClassifier**
Classifies gestures from the processed signal using a TensorFlow Lite model.
- **Attributes**:
- `interpreter`: TensorFlow Lite interpreter for running inference.
- `confidence_threshold`: Minimum confidence level required for a gesture to be recognized.
- **Methods**:
- `classify(signal: np.ndarray) -> Tuple[str, float]`:
- Classifies the input signal.
- Returns the recognized gesture and confidence if above the threshold, or `(None, 0.0)` if not.
### 4. **AdvancedSkinResponseSystem**
The main class that orchestrates reading the sensor, processing the signal, classifying gestures, and sending commands based on gestures.
- **Attributes**:
- `logger`: SecureLogger instance for logging.
- `signal_processor`: SignalProcessor instance for processing sensor data.
- `classifier`: GestureClassifier instance for detecting gestures.
- `http_client`: `aiohttp.ClientSession` for sending commands.
- `command_queue`: Queue for managing asynchronous command handling.
- **Methods**:
- `read_sensor() -> Optional[np.ndarray]`:
- Reads data from the GPIO sensor.
- Processes the signal and returns it if confidence is above 0.5; otherwise, returns `None`.
- `process_gesture()`:
- Reads the sensor data, processes it, and classifies gestures.
- Calls `handle_gesture` if a gesture is recognized.
- `handle_gesture(gesture: str, confidence: float)`:
- Maps gestures to actions and sends the appropriate command to the server.
- Logs any command-sending errors.
- `shutdown()`:
- Cleans up GPIO resources and closes the HTTP client.
- Logs the shutdown status.
- `run()`:
- Main operational loop for reading and processing gestures.
- Handles any runtime errors and ensures a safe shutdown.
---
## Workflow
1. **Initialization**:
- The system sets up GPIO for reading input from a specified pin.
- The SecureLogger, SignalProcessor, and GestureClassifier instances are initialized for secure logging, signal processing, and gesture detection.
2. **Reading and Processing Sensor Data**:
- `read_sensor`: Collects and processes skin sensor data.
- The processed signal is only used if it meets a minimum confidence threshold.
3. **Gesture Classification**:
- `process_gesture`: Calls `read_sensor`, classifies any recognized gestures, and forwards them to `handle_gesture` if detected.
4. **Command Handling**:
- `handle_gesture`: Sends mapped actions to an external server endpoint via HTTP POST requests.
- Logs errors if command sending fails.
5. **Shutdown**:
- `shutdown`: Cleans up GPIO resources and closes the HTTP session.
- Ensures safe exit for system reliability.
---
## Example Usage
```python
if __name__ == "__main__":
asyncio.run(AdvancedSkinResponseSystem().run())
```
- This entry point initializes and runs the `AdvancedSkinResponseSystem`.
- The system runs indefinitely, processing sensor data and handling gestures until interrupted.
- Ensures secure shutdown on completion.
---
## Configuration and Setup
1. **Environment**:
- Run on a Raspberry Pi with GPIO pin support.
- Ensure `model.tflite` (the TensorFlow Lite model) is accessible in the same directory or specify the correct path.
2. **Endpoint**:
- Modify `
http://localhost:8080/command` if using a different server endpoint.
3. **Permissions**:
- Run with sudo if required to access GPIO resources (`sudo python3