Files
SuperCharged-Claude-Code-Up…/.venv/lib/python3.11/site-packages/ralph_orchestrator/security.py
uroma 55aafbae9a Fix project isolation: Make loadChatHistory respect active project sessions
- Modified loadChatHistory() to check for active project before fetching all sessions
- When active project exists, use project.sessions instead of fetching from API
- Added detailed console logging to debug session filtering
- This prevents ALL sessions from appearing in every project's sidebar

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 14:43:05 +00:00

489 lines
16 KiB
Python

# ABOUTME: Security utilities for Ralph Orchestrator
# ABOUTME: Provides input validation, path sanitization, and sensitive data protection
"""
Security utilities for Ralph Orchestrator.
This module provides security hardening functions including input validation,
path sanitization, and sensitive data protection.
"""
import re
import logging
from pathlib import Path
from typing import Any, Optional
# Module-level logger for the security utilities. The logger name is a fixed
# string rather than being derived from __name__.
logger = logging.getLogger("ralph-orchestrator.security")
class SecurityValidator:
    """Security validation utilities for Ralph Orchestrator.

    The class is used purely as a namespace: every helper is a classmethod or
    staticmethod and the pattern tables below are class attributes.
    """

    # Patterns for dangerous path components
    DANGEROUS_PATH_PATTERNS = [
        r"\.\.\/.*",  # Directory traversal (Unix)
        r"\.\.\\.*",  # Windows directory traversal
        r"^\.\.[\/\\]",  # Starts with parent directory
        r"[\/\\]\.\.[\/\\]",  # Contains parent directory
        r"[<>:\"|?*]",  # Invalid filename characters (Windows)
        r"[\x00-\x1f]",  # Control characters
        r"[\/\\]\.\.[\/\\]\.\.[\/\\]",  # Double traversal
    ]

    # Sensitive data patterns that should be masked (16+ patterns).
    # Each entry is (regex, replacement); entries are applied in order by
    # mask_sensitive_data, so earlier replacements are visible to later ones.
    SENSITIVE_PATTERNS = [
        # API Keys
        (r"(sk-[a-zA-Z0-9]{10,})", r"sk-***********"),  # OpenAI API keys
        (r"(xai-[a-zA-Z0-9]{10,})", r"xai-***********"),  # xAI API keys
        (r"(AIza[a-zA-Z0-9_-]{35})", r"AIza***********"),  # Google API keys
        # Bearer tokens
        (r"(Bearer [a-zA-Z0-9\-_\.]{20,})", r"Bearer ***********"),
        # Passwords in various formats
        (
            r'(["\']?password["\']?\s*[:=]\s*["\']?)([^"\'\s]{3,})(["\']?)',
            r"\1*********\3",
        ),
        (r"(password\s*=\s*)([^\"'\s]{3,})", r"\1*********"),
        # Tokens in various formats
        (
            r'(token["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9\-_\.]{10,})(["\']?)',
            r"\1*********\3",
        ),
        (r"(token\s*=\s*)([a-zA-Z0-9\-_\.]{10,})", r"\1*********"),
        # Secrets
        (
            r'(secret["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9\-_\.]{10,})(["\']?)',
            r"\1*********\3",
        ),
        (r"(secret\s*=\s*)([a-zA-Z0-9\-_\.]{10,})", r"\1*********"),
        # Generic keys
        (
            r'(key["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9\-_\.]{10,})(["\']?)',
            r"\1*********\3",
        ),
        # API keys in various formats
        (
            r'(api[_-]?key["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9\-_\.]{10,})(["\']?)',
            r"\1*********\3",
        ),
        (r"(api[_-]?key\s*=\s*)([a-zA-Z0-9\-_\.]{10,})", r"\1*********"),
        # Sensitive file paths
        (
            r"(/[a-zA-Z0-9_\-\./]*\.ssh/[a-zA-Z0-9_\-\./]*)",
            r"[REDACTED_SSH_PATH]",
        ),  # SSH paths
        # NOTE(review): the generic ".ssh/" pattern above runs first and
        # consumes the whole path, so this more specific pattern appears to be
        # unreachable. Left in place because reordering changes masked output.
        (
            r"(/[a-zA-Z0-9_\-\./]*\.ssh/id_[a-zA-Z0-9]*)",
            r"[REDACTED_SSH_KEY]",
        ),  # SSH private keys
        (
            r"(/[a-zA-Z0-9_\-\./]*\.config/[a-zA-Z0-9_\-\./]*)",
            r"[REDACTED_CONFIG_PATH]",
        ),  # Config files
        (
            r"(/[a-zA-Z0-9_\-\./]*\.aws/[a-zA-Z0-9_\-\./]*)",
            r"[REDACTED_AWS_PATH]",
        ),  # AWS credentials
        (
            r"(/[a-zA-Z0-9_\-\./]*(passwd|shadow|group|hosts))",
            r"[REDACTED_SYSTEM_FILE]",
        ),  # System files
        (
            r"(C:\\\\[a-zA-Z0-9_\-\./]*\\\\System32\\\\[a-zA-Z0-9_\-\./]*)",
            r"[REDACTED_SYSTEM_PATH]",
        ),  # Windows system files
        (
            r"(/[a-zA-Z0-9_\-\./]*(id_rsa|id_dsa|id_ecdsa|id_ed25519))",
            r"[REDACTED_PRIVATE_KEY]",
        ),  # Private key files
    ]

    # Dangerous absolute path prefixes
    DANGEROUS_ABS_PATHS = [
        "/etc",
        "/usr/bin",
        "/bin",
        "/sbin",
        "/root",
        "/var",
        "/opt",
        "/sys",
        "/proc",
        "/dev",
    ]

    # Upper bound and human-readable label for each numeric configuration key.
    _NUMERIC_LIMITS = {
        "delay": (86400, "24 hours"),
        "stats_interval": (3600, "1 hour"),
        "max_iterations": (10000, "10000"),
        "iteration_timeout": (7200, "2 hours"),
    }

    # Configuration keys whose values are file paths.
    _PATH_KEYS = ("log_file", "pid_file", "prompt_file", "system_prompt_file")

    # Path keys that are deliberately NOT passed through sanitize_path.
    _UNSANITIZED_PATH_KEYS = ("prompt_file", "system_prompt_file")

    # Configuration keys that must hold a boolean.
    _BOOL_KEYS = (
        "verbose",
        "dry_run",
        "clear_screen",
        "show_countdown",
        "inject_best_practices",
    )

    # Device names that Windows reserves regardless of extension.
    _RESERVED_WINDOWS_NAMES = frozenset(
        {"CON", "PRN", "AUX", "NUL"}
        | {f"{device}{number}" for device in ("COM", "LPT") for number in range(1, 10)}
    )

    @staticmethod
    def _is_within(path: Path, root) -> bool:
        """Return True when ``path`` equals ``root`` or lies underneath it.

        The comparison is made on whole path components (via
        ``Path.relative_to``), so ``/etcetera`` is not considered to be inside
        ``/etc``.
        """
        try:
            path.relative_to(root)
        except ValueError:
            return False
        return True

    @classmethod
    def sanitize_path(cls, path: str, base_dir: Optional[Path] = None) -> Path:
        """
        Sanitize a file path to prevent directory traversal attacks.

        Checks are applied in this order:

        1. The textual path must not match any ``DANGEROUS_PATH_PATTERNS``.
        2. An absolute path must not lie under any ``DANGEROUS_ABS_PATHS``.
        3. After resolution, a path inside ``base_dir`` is accepted.
        4. A relative path that resolves outside ``base_dir`` is rejected.
        5. An absolute path that resolves outside ``base_dir`` is rejected
           when the resolved location lies under ``DANGEROUS_ABS_PATHS`` or
           ``/home``; any other absolute location is accepted.

        Args:
            path: Input path to sanitize
            base_dir: Base directory to resolve relative paths against
                (defaults to the current working directory)

        Returns:
            Sanitized absolute Path

        Raises:
            ValueError: If path contains dangerous patterns
        """
        if base_dir is None:
            base_dir = Path.cwd()

        # Convert to Path object
        try:
            input_path = Path(path)
        except (ValueError, OSError) as e:
            raise ValueError(f"Invalid path: {path}") from e

        # Check for dangerous patterns
        path_str = str(input_path)
        for pattern in cls.DANGEROUS_PATH_PATTERNS:
            if re.search(pattern, path_str, re.IGNORECASE):
                raise ValueError(f"Path contains dangerous pattern: {path}")

        is_absolute = input_path.is_absolute()

        # Check for dangerous absolute paths. Component-wise comparison avoids
        # false positives such as "/etcetera" being treated as "/etc".
        if is_absolute and any(
            cls._is_within(input_path, dangerous)
            for dangerous in cls.DANGEROUS_ABS_PATHS
        ):
            raise ValueError(
                f"Path resolves to dangerous system location: {path_str}"
            )

        # Resolve the path
        if is_absolute:
            resolved_path = input_path.resolve()
        else:
            resolved_path = (base_dir / input_path).resolve()

        # Anything inside the base directory is safe.
        if cls._is_within(resolved_path, base_dir.resolve()):
            return resolved_path

        if not is_absolute:
            # Relative path that goes outside base directory
            raise ValueError(f"Path traversal detected: {path} -> {resolved_path}")

        # Absolute path outside the base directory: re-check the *resolved*
        # location (this catches symlinks into system directories). The
        # ValueError is raised outside of any ``except ValueError`` handler so
        # that it actually propagates to the caller.
        for dangerous in [*cls.DANGEROUS_ABS_PATHS, "/home"]:
            if cls._is_within(resolved_path, dangerous):
                raise ValueError(
                    f"Path resolves to dangerous system location: {resolved_path}"
                )
        return resolved_path

    @classmethod
    def validate_config_value(cls, key: str, value: Any) -> Any:
        """
        Validate and sanitize configuration values.

        Numeric keys are converted from strings and range-checked, boolean
        keys are parsed from strings, ``focus`` text has shell metacharacters
        removed and is truncated to 200 characters. ``log_file`` and
        ``pid_file`` strings are checked with ``sanitize_path`` but returned
        unchanged. Unknown keys and ``None`` values pass through untouched.

        Args:
            key: Configuration key
            value: Configuration value

        Returns:
            Sanitized value

        Raises:
            ValueError: If value is invalid or dangerous
        """
        if value is None:
            return value

        # Type-specific validation
        if key in cls._NUMERIC_LIMITS:
            if isinstance(value, str):
                try:
                    value = int(value)
                except ValueError as e:
                    raise ValueError(f"Invalid integer value for {key}: {value}") from e

            limit, label = cls._NUMERIC_LIMITS[key]
            # Validate ranges. Values that cannot be ordered against an int
            # (lists, dicts, ...) are reported as ValueError, matching the
            # documented contract, instead of leaking a TypeError.
            try:
                is_negative = value < 0
                is_too_large = value > limit
            except TypeError as e:
                raise ValueError(f"Invalid integer value for {key}: {value}") from e
            if is_negative:
                raise ValueError(f"{key} must be non-negative, got: {value}")
            if is_too_large:
                raise ValueError(f"{key} too large (>{label}): {value}")
        elif key in cls._PATH_KEYS:
            # Sanitize file paths for non-prompt files; the call is made for
            # its validation side effect only, the value itself is kept.
            if isinstance(value, str) and key not in cls._UNSANITIZED_PATH_KEYS:
                cls.sanitize_path(value)
        elif key in cls._BOOL_KEYS:
            # Boolean validation
            if isinstance(value, str):
                value = cls._parse_bool_safe(value)
            elif not isinstance(value, bool):
                raise ValueError(f"Invalid boolean value for {key}: {value}")
        elif key == "focus":
            if isinstance(value, str):
                # Sanitize focus text - remove potential command injection
                value = re.sub(r"[;&|`$()]", "", value)
                if len(value) > 200:
                    value = value[:200]
        return value

    @classmethod
    def _parse_bool_safe(cls, value: str) -> bool:
        """
        Safely parse boolean values from strings.

        Args:
            value: String value to parse

        Returns:
            True for "true", "1", "yes" or "on" (case-insensitive, surrounding
            whitespace and shell metacharacters ignored); False for everything
            else, including empty and unrecognised input.
        """
        if not value or not value.strip():
            return False
        # Remove any dangerous characters
        value_clean = re.sub(r"[;&|`$()]", "", value.lower().strip())
        # Explicit false values and ambiguous values both map to False.
        return value_clean in ("true", "1", "yes", "on")

    @classmethod
    def mask_sensitive_data(cls, text: str) -> str:
        """
        Mask sensitive data in text for logging.

        Every ``SENSITIVE_PATTERNS`` entry is applied in order,
        case-insensitively.

        Args:
            text: Text to mask sensitive data in

        Returns:
            Text with sensitive data masked
        """
        masked_text = text
        for pattern, replacement in cls.SENSITIVE_PATTERNS:
            masked_text = re.sub(pattern, replacement, masked_text, flags=re.IGNORECASE)
        return masked_text

    @classmethod
    def validate_filename(cls, filename: str) -> str:
        """
        Validate a filename for security.

        Args:
            filename: Filename to validate

        Returns:
            Sanitized filename (invalid characters removed, at most 255
            characters long)

        Raises:
            ValueError: If filename is invalid or dangerous
        """
        if not filename or not filename.strip():
            raise ValueError("Filename cannot be empty")

        # Check for path traversal attempts in filename
        if ".." in filename or "/" in filename or "\\" in filename:
            raise ValueError(f"Filename contains path traversal: {filename}")

        # Remove dangerous characters
        sanitized = re.sub(r'[<>:"|?*\x00-\x1f]', "", filename.strip())
        # A name such as "< >" would otherwise survive as a single space.
        if not sanitized.strip():
            raise ValueError("Filename contains only invalid characters")

        # Prevent reserved names (Windows)
        name_without_ext = sanitized.split(".")[0].upper()
        if name_without_ext in cls._RESERVED_WINDOWS_NAMES:
            raise ValueError(f"Filename uses reserved name: {filename}")

        # Check for control characters (tested on the raw input, so a name
        # containing them is rejected rather than silently cleaned)
        if any(ord(char) < 32 for char in filename):
            raise ValueError(f"Filename contains control characters: {filename}")

        # Limit length
        return sanitized[:255]

    @classmethod
    def create_secure_logger(
        cls, name: str, log_file: Optional[str] = None
    ) -> logging.Logger:
        """
        Create a logger with security features enabled.

        The logger gets a handler whose formatter passes every formatted
        record through ``mask_sensitive_data``. Calling this method again with
        the same ``name`` and ``log_file`` reuses the handler installed
        earlier instead of stacking a duplicate (which would emit every
        message more than once).

        Args:
            name: Logger name
            log_file: Optional log file path; a stream handler is used when
                omitted

        Returns:
            Secure logger instance, with its level set to INFO
        """
        secure_logger = logging.getLogger(name)

        # Create custom formatter that masks sensitive data
        class SecureFormatter(logging.Formatter):
            def format(self, record):
                formatted = super().format(record)
                return cls.mask_sensitive_data(formatted)

        # Handlers created here are tagged with their target so that repeat
        # calls can recognise them.
        marker = "_secure_log_target"
        already_installed = any(
            hasattr(existing, marker) and getattr(existing, marker) == log_file
            for existing in secure_logger.handlers
        )

        if not already_installed:
            # Set up secure formatter
            if log_file:
                handler = logging.FileHandler(log_file)
            else:
                handler = logging.StreamHandler()
            handler.setFormatter(
                SecureFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            )
            setattr(handler, marker, log_file)
            secure_logger.addHandler(handler)

        secure_logger.setLevel(logging.INFO)
        return secure_logger
class PathTraversalProtection:
    """File read/write helpers that validate the target path first.

    Both helpers pass the requested path through
    ``SecurityValidator.sanitize_path`` before touching the filesystem.
    """

    @staticmethod
    def safe_file_read(file_path: str, base_dir: Optional[Path] = None) -> str:
        """
        Read a UTF-8 text file after sanitizing its path.

        Args:
            file_path: Path to file to read
            base_dir: Base directory for relative paths

        Returns:
            File content

        Raises:
            ValueError: If path is dangerous or is not a regular file
            FileNotFoundError: If file doesn't exist
            PermissionError: If file cannot be read
        """
        safe_path = SecurityValidator.sanitize_path(file_path, base_dir)

        # Happy path first: a regular file is read straight away.
        if safe_path.is_file():
            try:
                return safe_path.read_text(encoding="utf-8")
            except PermissionError as e:
                raise PermissionError(f"Cannot read file: {safe_path}") from e

        # Not a regular file: distinguish "something else is there" from
        # "nothing is there".
        if safe_path.exists():
            raise ValueError(f"Path is not a file: {safe_path}")
        raise FileNotFoundError(f"File not found: {safe_path}")

    @staticmethod
    def safe_file_write(
        file_path: str, content: str, base_dir: Optional[Path] = None
    ) -> None:
        """
        Write UTF-8 text to a file after sanitizing its path.

        Missing parent directories are created before writing.

        Args:
            file_path: Path to file to write
            content: Content to write
            base_dir: Base directory for relative paths

        Raises:
            ValueError: If path is dangerous
            PermissionError: If file cannot be written
        """
        safe_path = SecurityValidator.sanitize_path(file_path, base_dir)

        # Create parent directories if needed
        parent_dir = safe_path.parent
        parent_dir.mkdir(parents=True, exist_ok=True)

        try:
            safe_path.write_text(content, encoding="utf-8")
        except PermissionError as e:
            raise PermissionError(f"Cannot write file: {safe_path}") from e
# Security decorator for functions that handle file paths
def secure_file_operation(base_dir: Optional[Path] = None):
    """
    Decorator to secure file operations against path traversal.

    Every positional or keyword argument that is a string containing ``/`` or
    ``\\`` is treated as a path: it is passed through
    ``SecurityValidator.sanitize_path`` and replaced by the sanitized path as
    a string. All other arguments are forwarded unchanged. Note that the
    check is purely textual, so any string argument containing a slash (not
    only real paths) is sanitized.

    The wrapper preserves the decorated function's ``__name__``, ``__doc__``
    and other metadata.

    Args:
        base_dir: Base directory for relative paths

    Returns:
        A decorator that wraps a function with the argument sanitizing above.

    Raises:
        ValueError: From the wrapped call, when a path-like argument is
            rejected by ``SecurityValidator.sanitize_path``.
    """
    # Imported locally so the block is self-contained with respect to the
    # module's import list.
    import functools

    def _sanitize_if_path(value):
        """Return a sanitized path string for path-like strings, else value."""
        if isinstance(value, str) and ("/" in value or "\\" in value):
            return str(SecurityValidator.sanitize_path(value, base_dir))
        return value

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Find path arguments and sanitize them
            new_args = [_sanitize_if_path(arg) for arg in args]
            new_kwargs = {
                key: _sanitize_if_path(value) for key, value in kwargs.items()
            }
            return func(*new_args, **new_kwargs)

        return wrapper

    return decorator