- Modified loadChatHistory() to check for active project before fetching all sessions - When active project exists, use project.sessions instead of fetching from API - Added detailed console logging to debug session filtering - This prevents ALL sessions from appearing in every project's sidebar Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
489 lines
16 KiB
Python
489 lines
16 KiB
Python
# ABOUTME: Security utilities for Ralph Orchestrator
|
|
# ABOUTME: Provides input validation, path sanitization, and sensitive data protection
|
|
|
|
"""
|
|
Security utilities for Ralph Orchestrator.
|
|
|
|
This module provides security hardening functions including input validation,
|
|
path sanitization, and sensitive data protection.
|
|
"""
|
|
|
|
import re
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
logger = logging.getLogger("ralph-orchestrator.security")
|
|
|
|
|
|
class SecurityValidator:
|
|
"""Security validation utilities for Ralph Orchestrator."""
|
|
|
|
# Patterns for dangerous path components
|
|
DANGEROUS_PATH_PATTERNS = [
|
|
r"\.\.\/.*", # Directory traversal (Unix)
|
|
r"\.\.\\.*", # Windows directory traversal
|
|
r"^\.\.[\/\\]", # Starts with parent directory
|
|
r"[\/\\]\.\.[\/\\]", # Contains parent directory
|
|
r"[<>:\"|?*]", # Invalid filename characters (Windows)
|
|
r"[\x00-\x1f]", # Control characters
|
|
r"[\/\\]\.\.[\/\\]\.\.[\/\\]", # Double traversal
|
|
]
|
|
|
|
# Sensitive data patterns that should be masked (16+ patterns)
|
|
SENSITIVE_PATTERNS = [
|
|
# API Keys
|
|
(r"(sk-[a-zA-Z0-9]{10,})", r"sk-***********"), # OpenAI API keys
|
|
(r"(xai-[a-zA-Z0-9]{10,})", r"xai-***********"), # xAI API keys
|
|
(r"(AIza[a-zA-Z0-9_-]{35})", r"AIza***********"), # Google API keys
|
|
# Bearer tokens
|
|
(r"(Bearer [a-zA-Z0-9\-_\.]{20,})", r"Bearer ***********"),
|
|
# Passwords in various formats
|
|
(
|
|
r'(["\']?password["\']?\s*[:=]\s*["\']?)([^"\'\s]{3,})(["\']?)',
|
|
r"\1*********\3",
|
|
),
|
|
(r"(password\s*=\s*)([^\"'\s]{3,})", r"\1*********"),
|
|
# Tokens in various formats
|
|
(
|
|
r'(token["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9\-_\.]{10,})(["\']?)',
|
|
r"\1*********\3",
|
|
),
|
|
(r"(token\s*=\s*)([a-zA-Z0-9\-_\.]{10,})", r"\1*********"),
|
|
# Secrets
|
|
(
|
|
r'(secret["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9\-_\.]{10,})(["\']?)',
|
|
r"\1*********\3",
|
|
),
|
|
(r"(secret\s*=\s*)([a-zA-Z0-9\-_\.]{10,})", r"\1*********"),
|
|
# Generic keys
|
|
(
|
|
r'(key["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9\-_\.]{10,})(["\']?)',
|
|
r"\1*********\3",
|
|
),
|
|
# API keys in various formats
|
|
(
|
|
r'(api[_-]?key["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9\-_\.]{10,})(["\']?)',
|
|
r"\1*********\3",
|
|
),
|
|
(r"(api[_-]?key\s*=\s*)([a-zA-Z0-9\-_\.]{10,})", r"\1*********"),
|
|
# Sensitive file paths
|
|
(
|
|
r"(/[a-zA-Z0-9_\-\./]*\.ssh/[a-zA-Z0-9_\-\./]*)",
|
|
r"[REDACTED_SSH_PATH]",
|
|
), # SSH paths
|
|
(
|
|
r"(/[a-zA-Z0-9_\-\./]*\.ssh/id_[a-zA-Z0-9]*)",
|
|
r"[REDACTED_SSH_KEY]",
|
|
), # SSH private keys
|
|
(
|
|
r"(/[a-zA-Z0-9_\-\./]*\.config/[a-zA-Z0-9_\-\./]*)",
|
|
r"[REDACTED_CONFIG_PATH]",
|
|
), # Config files
|
|
(
|
|
r"(/[a-zA-Z0-9_\-\./]*\.aws/[a-zA-Z0-9_\-\./]*)",
|
|
r"[REDACTED_AWS_PATH]",
|
|
), # AWS credentials
|
|
(
|
|
r"(/[a-zA-Z0-9_\-\./]*(passwd|shadow|group|hosts))",
|
|
r"[REDACTED_SYSTEM_FILE]",
|
|
), # System files
|
|
(
|
|
r"(C:\\\\[a-zA-Z0-9_\-\./]*\\\\System32\\\\[a-zA-Z0-9_\-\./]*)",
|
|
r"[REDACTED_SYSTEM_PATH]",
|
|
), # Windows system files
|
|
(
|
|
r"(/[a-zA-Z0-9_\-\./]*(id_rsa|id_dsa|id_ecdsa|id_ed25519))",
|
|
r"[REDACTED_PRIVATE_KEY]",
|
|
), # Private key files
|
|
]
|
|
|
|
# Dangerous absolute path prefixes
|
|
DANGEROUS_ABS_PATHS = [
|
|
"/etc",
|
|
"/usr/bin",
|
|
"/bin",
|
|
"/sbin",
|
|
"/root",
|
|
"/var",
|
|
"/opt",
|
|
"/sys",
|
|
"/proc",
|
|
"/dev",
|
|
]
|
|
|
|
@classmethod
|
|
def sanitize_path(cls, path: str, base_dir: Optional[Path] = None) -> Path:
|
|
"""
|
|
Sanitize a file path to prevent directory traversal attacks.
|
|
|
|
Args:
|
|
path: Input path to sanitize
|
|
base_dir: Base directory to resolve relative paths against
|
|
|
|
Returns:
|
|
Sanitized absolute Path
|
|
|
|
Raises:
|
|
ValueError: If path contains dangerous patterns
|
|
"""
|
|
if base_dir is None:
|
|
base_dir = Path.cwd()
|
|
|
|
# Convert to Path object
|
|
try:
|
|
input_path = Path(path)
|
|
except (ValueError, OSError) as e:
|
|
raise ValueError(f"Invalid path: {path}") from e
|
|
|
|
# Check for dangerous patterns
|
|
path_str = str(input_path)
|
|
for pattern in cls.DANGEROUS_PATH_PATTERNS:
|
|
if re.search(pattern, path_str, re.IGNORECASE):
|
|
raise ValueError(f"Path contains dangerous pattern: {path}")
|
|
|
|
# Check for dangerous absolute paths
|
|
if input_path.is_absolute():
|
|
for dangerous in cls.DANGEROUS_ABS_PATHS:
|
|
if path_str.startswith(dangerous):
|
|
raise ValueError(
|
|
f"Path resolves to dangerous system location: {path_str}"
|
|
)
|
|
|
|
# Resolve the path
|
|
if input_path.is_absolute():
|
|
resolved_path = input_path.resolve()
|
|
else:
|
|
resolved_path = (base_dir / input_path).resolve()
|
|
|
|
# Ensure resolved path is within base directory or a safe location
|
|
try:
|
|
resolved_path.relative_to(base_dir.resolve())
|
|
except ValueError:
|
|
# Check if this is an absolute path that might be dangerous
|
|
if input_path.is_absolute():
|
|
# Check dangerous absolute paths
|
|
dangerous_paths = cls.DANGEROUS_ABS_PATHS + ["/home"]
|
|
for dangerous in dangerous_paths:
|
|
try:
|
|
resolved_path.relative_to(dangerous)
|
|
raise ValueError(
|
|
f"Path resolves to dangerous system location: {resolved_path}"
|
|
)
|
|
except ValueError:
|
|
continue
|
|
else:
|
|
# Relative path that goes outside base directory
|
|
raise ValueError(
|
|
f"Path traversal detected: {path} -> {resolved_path}"
|
|
) from None
|
|
|
|
return resolved_path
|
|
|
|
@classmethod
|
|
def validate_config_value(cls, key: str, value: Any) -> Any:
|
|
"""
|
|
Validate and sanitize configuration values.
|
|
|
|
Args:
|
|
key: Configuration key
|
|
value: Configuration value
|
|
|
|
Returns:
|
|
Sanitized value
|
|
|
|
Raises:
|
|
ValueError: If value is invalid or dangerous
|
|
"""
|
|
if value is None:
|
|
return value
|
|
|
|
# Type-specific validation
|
|
if key in ["delay", "stats_interval", "max_iterations", "iteration_timeout"]:
|
|
if isinstance(value, str):
|
|
try:
|
|
value = int(value)
|
|
except ValueError as e:
|
|
raise ValueError(f"Invalid integer value for {key}: {value}") from e
|
|
|
|
# Validate ranges
|
|
if value < 0:
|
|
raise ValueError(f"{key} must be non-negative, got: {value}")
|
|
if key == "delay" and value > 86400: # 24 hours
|
|
raise ValueError(f"{key} too large (>24 hours): {value}")
|
|
if key == "max_iterations" and value > 10000:
|
|
raise ValueError(f"{key} too large (>10000): {value}")
|
|
if key == "stats_interval" and value > 3600: # 1 hour
|
|
raise ValueError(f"{key} too large (>1 hour): {value}")
|
|
if key == "iteration_timeout" and value > 7200: # 2 hours
|
|
raise ValueError(f"{key} too large (>2 hours): {value}")
|
|
|
|
elif key in ["log_file", "pid_file", "prompt_file", "system_prompt_file"]:
|
|
if isinstance(value, str):
|
|
# Sanitize file paths for non-prompt files
|
|
if key not in ["prompt_file", "system_prompt_file"]:
|
|
cls.sanitize_path(value)
|
|
|
|
elif key in [
|
|
"verbose",
|
|
"dry_run",
|
|
"clear_screen",
|
|
"show_countdown",
|
|
"inject_best_practices",
|
|
]:
|
|
# Boolean validation
|
|
if isinstance(value, str):
|
|
value = cls._parse_bool_safe(value)
|
|
elif not isinstance(value, bool):
|
|
raise ValueError(f"Invalid boolean value for {key}: {value}")
|
|
|
|
elif key == "focus":
|
|
if isinstance(value, str):
|
|
# Sanitize focus text - remove potential command injection
|
|
value = re.sub(r"[;&|`$()]", "", value)
|
|
if len(value) > 200:
|
|
value = value[:200]
|
|
|
|
return value
|
|
|
|
@classmethod
|
|
def _parse_bool_safe(cls, value: str) -> bool:
|
|
"""
|
|
Safely parse boolean values from strings.
|
|
|
|
Args:
|
|
value: String value to parse
|
|
|
|
Returns:
|
|
Boolean value
|
|
"""
|
|
if not value or not value.strip():
|
|
return False
|
|
|
|
value_lower = value.lower().strip()
|
|
|
|
# Remove any dangerous characters
|
|
value_clean = re.sub(r"[;&|`$()]", "", value_lower)
|
|
|
|
true_values = ("true", "1", "yes", "on")
|
|
false_values = ("false", "0", "no", "off")
|
|
|
|
if value_clean in true_values:
|
|
return True
|
|
elif value_clean in false_values:
|
|
return False
|
|
else:
|
|
# Default to False for ambiguous values
|
|
return False
|
|
|
|
@classmethod
|
|
def mask_sensitive_data(cls, text: str) -> str:
|
|
"""
|
|
Mask sensitive data in text for logging.
|
|
|
|
Args:
|
|
text: Text to mask sensitive data in
|
|
|
|
Returns:
|
|
Text with sensitive data masked
|
|
"""
|
|
masked_text = text
|
|
for pattern, replacement in cls.SENSITIVE_PATTERNS:
|
|
masked_text = re.sub(pattern, replacement, masked_text, flags=re.IGNORECASE)
|
|
return masked_text
|
|
|
|
@classmethod
|
|
def validate_filename(cls, filename: str) -> str:
|
|
"""
|
|
Validate a filename for security.
|
|
|
|
Args:
|
|
filename: Filename to validate
|
|
|
|
Returns:
|
|
Sanitized filename
|
|
|
|
Raises:
|
|
ValueError: If filename is invalid or dangerous
|
|
"""
|
|
if not filename or not filename.strip():
|
|
raise ValueError("Filename cannot be empty")
|
|
|
|
# Check for path traversal attempts in filename
|
|
if ".." in filename or "/" in filename or "\\" in filename:
|
|
raise ValueError(f"Filename contains path traversal: {filename}")
|
|
|
|
# Remove dangerous characters
|
|
sanitized = re.sub(r'[<>:"|?*\x00-\x1f]', "", filename.strip())
|
|
|
|
if not sanitized:
|
|
raise ValueError("Filename contains only invalid characters")
|
|
|
|
# Prevent reserved names (Windows)
|
|
reserved_names = {
|
|
"CON",
|
|
"PRN",
|
|
"AUX",
|
|
"NUL",
|
|
"COM1",
|
|
"COM2",
|
|
"COM3",
|
|
"COM4",
|
|
"COM5",
|
|
"COM6",
|
|
"COM7",
|
|
"COM8",
|
|
"COM9",
|
|
"LPT1",
|
|
"LPT2",
|
|
"LPT3",
|
|
"LPT4",
|
|
"LPT5",
|
|
"LPT6",
|
|
"LPT7",
|
|
"LPT8",
|
|
"LPT9",
|
|
}
|
|
|
|
name_without_ext = sanitized.split(".")[0].upper()
|
|
if name_without_ext in reserved_names:
|
|
raise ValueError(f"Filename uses reserved name: {filename}")
|
|
|
|
# Check for control characters
|
|
if any(ord(char) < 32 for char in filename):
|
|
raise ValueError(f"Filename contains control characters: {filename}")
|
|
|
|
# Limit length
|
|
if len(sanitized) > 255:
|
|
sanitized = sanitized[:255]
|
|
|
|
return sanitized
|
|
|
|
@classmethod
|
|
def create_secure_logger(
|
|
cls, name: str, log_file: Optional[str] = None
|
|
) -> logging.Logger:
|
|
"""
|
|
Create a logger with security features enabled.
|
|
|
|
Args:
|
|
name: Logger name
|
|
log_file: Optional log file path
|
|
|
|
Returns:
|
|
Secure logger instance
|
|
"""
|
|
secure_logger = logging.getLogger(name)
|
|
|
|
# Create custom formatter that masks sensitive data
|
|
class SecureFormatter(logging.Formatter):
|
|
def format(self, record):
|
|
formatted = super().format(record)
|
|
return cls.mask_sensitive_data(formatted)
|
|
|
|
# Set up secure formatter
|
|
if log_file:
|
|
handler = logging.FileHandler(log_file)
|
|
else:
|
|
handler = logging.StreamHandler()
|
|
|
|
handler.setFormatter(
|
|
SecureFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
|
)
|
|
|
|
secure_logger.addHandler(handler)
|
|
secure_logger.setLevel(logging.INFO)
|
|
|
|
return secure_logger
|
|
|
|
|
|
class PathTraversalProtection:
|
|
"""Protection against path traversal attacks."""
|
|
|
|
@staticmethod
|
|
def safe_file_read(file_path: str, base_dir: Optional[Path] = None) -> str:
|
|
"""
|
|
Safely read a file with path traversal protection.
|
|
|
|
Args:
|
|
file_path: Path to file to read
|
|
base_dir: Base directory for relative paths
|
|
|
|
Returns:
|
|
File content
|
|
|
|
Raises:
|
|
ValueError: If path is dangerous
|
|
FileNotFoundError: If file doesn't exist
|
|
PermissionError: If file cannot be read
|
|
"""
|
|
safe_path = SecurityValidator.sanitize_path(file_path, base_dir)
|
|
|
|
if not safe_path.exists():
|
|
raise FileNotFoundError(f"File not found: {safe_path}")
|
|
|
|
if not safe_path.is_file():
|
|
raise ValueError(f"Path is not a file: {safe_path}")
|
|
|
|
try:
|
|
return safe_path.read_text(encoding="utf-8")
|
|
except PermissionError as e:
|
|
raise PermissionError(f"Cannot read file: {safe_path}") from e
|
|
|
|
@staticmethod
|
|
def safe_file_write(
|
|
file_path: str, content: str, base_dir: Optional[Path] = None
|
|
) -> None:
|
|
"""
|
|
Safely write to a file with path traversal protection.
|
|
|
|
Args:
|
|
file_path: Path to file to write
|
|
content: Content to write
|
|
base_dir: Base directory for relative paths
|
|
|
|
Raises:
|
|
ValueError: If path is dangerous
|
|
PermissionError: If file cannot be written
|
|
"""
|
|
safe_path = SecurityValidator.sanitize_path(file_path, base_dir)
|
|
|
|
# Create parent directories if needed
|
|
safe_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
safe_path.write_text(content, encoding="utf-8")
|
|
except PermissionError as e:
|
|
raise PermissionError(f"Cannot write file: {safe_path}") from e
|
|
|
|
|
|
# Security decorator for functions that handle file paths
|
|
def secure_file_operation(base_dir: Optional[Path] = None):
|
|
"""
|
|
Decorator to secure file operations against path traversal.
|
|
|
|
Args:
|
|
base_dir: Base directory for relative paths
|
|
"""
|
|
|
|
def decorator(func):
|
|
def wrapper(*args, **kwargs):
|
|
# Find path arguments and sanitize them
|
|
new_args = []
|
|
for arg in args:
|
|
if isinstance(arg, str) and ("/" in arg or "\\" in arg):
|
|
arg = str(SecurityValidator.sanitize_path(arg, base_dir))
|
|
new_args.append(arg)
|
|
|
|
new_kwargs = {}
|
|
for key, value in kwargs.items():
|
|
if isinstance(value, str) and ("/" in value or "\\" in value):
|
|
value = str(SecurityValidator.sanitize_path(value, base_dir))
|
|
new_kwargs[key] = value
|
|
|
|
return func(*new_args, **new_kwargs)
|
|
|
|
return wrapper
|
|
|
|
return decorator
|