# Security Architecture Expert Skill ## Activation Criteria Activate this skill when the user: - Designs secure system architectures - Implements zero trust networks - Configures cloud security (AWS/Azure/GCP) - Designs application security programs - Creates IAM/authorization systems - Implements security monitoring (SIEM, SOAR) - Develops incident response procedures - Needs compliance frameworks (PCI-DSS, HIPAA, GDPR, NIST, SOC 2) - Performs threat modeling - Designs supply chain security - Implements DevSecOps practices - Creates security policies and standards - Designs secure APIs and microservices - Implements data protection and encryption strategies ## Core Methodology ### 1. Zero Trust Architecture #### Zero Trust Principles ```yaml # Zero Trust Architecture Framework zero_trust_principles: verification_required: "Never trust, always verify" least_privilege: "Minimum required access only" assume_breach: "Design assuming compromise" microsegmentation: "Segment everything" continuous_monitoring: "Monitor and validate continuously" # Zero Trust Implementation Pillars identity: - Multi-factor authentication (MFA) - Risk-based authentication - Single sign-on (SSO) consolidation - Privileged access management (PAM) - Just-in-time access provisioning devices: - Device health attestation - OS and application patching - Endpoint detection and response (EDR) - Mobile device management (MDM) - Device inventory and classification network: - Microsegmentation - Software-defined perimeter (SDP) - Next-generation firewalls (NGFW) - Zero trust network access (ZTNA) - East-west traffic inspection applications: - Application security testing - API security and gateways - Container and Kubernetes security - Serverless function security - Runtime application self-protection (RASP) data: - Data classification and labeling - Encryption at rest and in transit - Data loss prevention (DLP) - Rights management (IRM/DRM) - Database activity monitoring ``` #### Zero Trust Network Implementation ```python # Zero Trust Network Access (ZTNA) Implementation # Policy Engine for Access Control from dataclasses import dataclass from enum import Enum from typing import List, Dict, Optional, Set from datetime import datetime, timedelta import hashlib import jwt import ipaddress class TrustLevel(Enum): """Trust levels for zero trust decisions""" NONE = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 class ResourceType(Enum): """Types of protected resources""" API = "api" DATABASE = "database" STORAGE = "storage" SERVICE = "service" FILE = "file" @dataclass class Subject: """Requesting entity (user, service, device)""" id: str type: str # 'user', 'service', 'device' attributes: Dict trust_level: TrustLevel = TrustLevel.NONE @dataclass class Resource: """Protected resource""" id: str type: ResourceType sensitivity: str # 'public', 'internal', 'confidential', 'restricted' owner: str tags: Set[str] @dataclass class Context: """Request context for decision making""" source_ip: str destination_ip: str timestamp: datetime user_agent: Optional[str] = None device_health_score: Optional[float] = None geo_location: Optional[str] = None network_trust: Optional[float] = None class PolicyEngine: """Zero Trust Policy Engine""" def __init__(self): self.policies: List[Policy] = [] self.trust_scores: Dict[str, float] = {} def evaluate_access(self, subject: Subject, resource: Resource, action: str, context: Context) -> AccessDecision: """ Evaluate access request using zero trust principles """ # Step 1: Verify identity if not self._verify_identity(subject): return AccessDecision( allowed=False, reason="Identity verification failed" ) # Step 2: Check device health if not self._check_device_health(context): return AccessDecision( allowed=False, reason="Device does not meet health requirements" ) # Step 3: Evaluate policies for policy in self.policies: if policy.matches(subject, resource, action, context): decision = policy.evaluate(subject, resource, action, context) if decision is not None: return decision # Default deny return AccessDecision( allowed=False, reason="No matching policy - default deny" ) def _verify_identity(self, subject: Subject) -> bool: """Verify subject identity with MFA""" # Check MFA status if not subject.attributes.get('mfa_verified', False): return False # Check if identity is not expired if 'expiry' in subject.attributes: expiry = subject.attributes['expiry'] if datetime.now() > expiry: return False return True def _check_device_health(self, context: Context) -> bool: """Check device health score""" if context.device_health_score is None: return False return context.device_health_score >= 0.7 class Policy: """Access control policy""" def __init__(self, name: str, conditions: Dict, effect: str): self.name = name self.conditions = conditions self.effect = effect # 'allow' or 'deny' def matches(self, subject: Subject, resource: Resource, action: str, context: Context) -> bool: """Check if policy matches the request""" # Check subject conditions if 'subject' in self.conditions: for key, value in self.conditions['subject'].items(): if subject.attributes.get(key) != value: return False # Check resource conditions if 'resource' in self.conditions: if resource.type.value != self.conditions['resource'].get('type'): return False if resource.sensitivity != self.conditions['resource'].get('sensitivity'): return False # Check action conditions if 'action' in self.conditions: if action not in self.conditions['action']: return False # Check context conditions if 'context' in self.conditions: ctx_conditions = self.conditions['context'] # Check IP range if 'allowed_ips' in ctx_conditions: ip = ipaddress.ip_address(context.source_ip) allowed = False for allowed_range in ctx_conditions['allowed_ips']: network = ipaddress.ip_network(allowed_range) if ip in network: allowed = True break if not allowed: return False # Check time restrictions if 'time_restrictions' in ctx_conditions: time_restriction = ctx_conditions['time_restrictions'] current_hour = context.timestamp.hour if 'working_hours' in time_restriction: start, end = time_restriction['working_hours'] if not (start <= current_hour < end): return False return True def evaluate(self, subject: Subject, resource: Resource, action: str, context: Context) -> Optional[AccessDecision]: """Evaluate the policy""" if self.effect == 'allow': return AccessDecision( allowed=True, reason=f"Allowed by policy: {self.name}" ) else: return AccessDecision( allowed=False, reason=f"Denied by policy: {self.name}" ) @dataclass class AccessDecision: """Access decision with audit trail""" allowed: bool reason: str expires_at: Optional[datetime] = None additional_conditions: Optional[List[str]] = None class ZeroTrustGateway: """Zero Trust Network Access Gateway""" def __init__(self, policy_engine: PolicyEngine): self.policy_engine = policy_engine self.session_manager = SessionManager() def handle_request(self, subject: Subject, resource: Resource, action: str, context: Context) -> GatewayResponse: """ Handle zero trust access request """ # Evaluate access decision = self.policy_engine.evaluate_access( subject, resource, action, context ) if decision.allowed: # Create session session = self.session_manager.create_session( subject, resource, context ) return GatewayResponse( success=True, session_token=session.token, expires_at=session.expires_at, decision=decision ) else: return GatewayResponse( success=False, decision=decision ) class SessionManager: """Manage zero trust sessions""" def __init__(self): self.sessions: Dict[str, Session] = {} def create_session(self, subject: Subject, resource: Resource, context: Context) -> 'Session': """Create a new session""" token = self._generate_token(subject, resource, context) expires_at = datetime.now() + timedelta(hours=1) session = Session( token=token, subject_id=subject.id, resource_id=resource.id, created_at=datetime.now(), expires_at=expires_at, context=context ) self.sessions[token] = session return session def validate_session(self, token: str) -> bool: """Validate session token""" session = self.sessions.get(token) if not session: return False if datetime.now() > session.expires_at: del self.sessions[token] return False return True def _generate_token(self, subject: Subject, resource: Resource, context: Context) -> str: """Generate session token""" payload = { 'subject_id': subject.id, 'resource_id': resource.id, 'timestamp': datetime.now().isoformat(), 'ip': context.source_ip } token = jwt.encode(payload, 'secret', algorithm='HS256') return token @dataclass class Session: """Zero trust session""" token: str subject_id: str resource_id: str created_at: datetime expires_at: datetime context: Context @dataclass class GatewayResponse: """Gateway response""" success: bool session_token: Optional[str] = None expires_at: Optional[datetime] = None decision: Optional[AccessDecision] = None ``` ### 2. Cloud Security Architecture #### AWS Security Implementation ```python # AWS Security Infrastructure as Code # Using Terraform-style Python for AWS security setup import json from typing import List, Dict, Any class AWSSecurityArchitecture: """AWS security infrastructure configuration""" def __init__(self, config: Dict[str, Any]): self.config = config self.security_groups = [] self.iam_policies = [] self.kms_keys = [] self.s3_buckets = [] self.cloudtrail_config = None self.guardduty_config = None def design_vpc_security(self) -> Dict[str, Any]: """ Design secure VPC architecture """ vpc_config = { 'vpc': { 'cidr_block': '10.0.0.0/16', 'enable_dns_support': True, 'enable_dns_hostnames': True, 'tags': { 'Name': 'production-vpc', 'Environment': 'production', 'Compliance': 'PCI-DSS' } }, 'subnets': { 'public': [ { 'cidr': '10.0.1.0/24', 'availability_zone': 'us-east-1a', 'map_public_ip_on_launch': True }, { 'cidr': '10.0.2.0/24', 'availability_zone': 'us-east-1b', 'map_public_ip_on_launch': True } ], 'private': [ { 'cidr': '10.0.10.0/24', 'availability_zone': 'us-east-1a' }, { 'cidr': '10.0.11.0/24', 'availability_zone': 'us-east-1b' } ], 'database': [ { 'cidr': '10.0.20.0/24', 'availability_zone': 'us-east-1a', 'isolated': True }, { 'cidr': '10.0.21.0/24', 'availability_zone': 'us-east-1b', 'isolated': True } ] }, 'flow_logs': { 'enabled': True, 'traffic_type': 'ALL', 'destination': 'cloudwatch-logs' }, 'network_acls': { 'public': { 'ingress': [ {'rule_number': 100, 'protocol': 'tcp', 'action': 'allow', 'from_port': 443, 'to_port': 443, 'cidr': '0.0.0.0/0'}, {'rule_number': 110, 'protocol': 'tcp', 'action': 'allow', 'from_port': 80, 'to_port': 80, 'cidr': '0.0.0.0/0'} ], 'egress': [ {'rule_number': 100, 'protocol': '-1', 'action': 'allow', 'cidr': '0.0.0.0/0'} ] } } } return vpc_config def design_security_groups(self) -> List[Dict[str, Any]]: """ Design security groups with least privilege """ security_groups = [ { 'name': 'web-tier-sg', 'description': 'Security group for web tier', 'ingress': [ { 'protocol': 'tcp', 'from_port': 443, 'to_port': 443, 'cidr_blocks': ['0.0.0.0/0'], 'description': 'HTTPS from anywhere' }, { 'protocol': 'tcp', 'from_port': 80, 'to_port': 80, 'cidr_blocks': ['0.0.0.0/0'], 'description': 'HTTP from anywhere (redirect to HTTPS)' } ], 'egress': [ { 'protocol': '-1', 'cidr_blocks': ['0.0.0.0/0'], 'description': 'All outbound traffic' } ], 'tags': {'Tier': 'Web', 'Compliance': 'PCI-DSS'} }, { 'name': 'application-tier-sg', 'description': 'Security group for application tier', 'ingress': [ { 'protocol': 'tcp', 'from_port': 8080, 'to_port': 8080, 'security_groups': ['web-tier-sg'], 'description': 'Application port from web tier' }, { 'protocol': 'tcp', 'from_port': 22, 'to_port': 22, 'security_groups': ['bastion-sg'], 'description': 'SSH from bastion only' } ], 'egress': [ { 'protocol': 'tcp', 'from_port': 5432, 'to_port': 5432, 'security_groups': ['database-tier-sg'], 'description': 'Database access' }, { 'protocol': 'tcp', 'from_port': 443, 'to_port': 443, 'cidr_blocks': ['0.0.0.0/0'], 'description': 'HTTPS outbound for APIs' } ], 'tags': {'Tier': 'Application'} }, { 'name': 'database-tier-sg', 'description': 'Security group for database tier', 'ingress': [ { 'protocol': 'tcp', 'from_port': 5432, 'to_port': 5432, 'security_groups': ['application-tier-sg'], 'description': 'PostgreSQL from application tier' } ], 'egress': [], 'tags': {'Tier': 'Database', 'DataClassification': 'Confidential'} } ] return security_groups def design_iam_policies(self) -> List[Dict[str, Any]]: """ Design IAM policies following least privilege """ policies = [ { 'name': 'EC2ReadOnlyPolicy', 'description': 'Read-only access to EC2 resources', 'policy': { 'Version': '2012-10-17', 'Statement': [ { 'Effect': 'Allow', 'Action': [ 'ec2:Describe*', 'ec2:Get*' ], 'Resource': '*' } ] } }, { 'name': 'S3AppDataPolicy', 'description': 'Application-specific S3 access', 'policy': { 'Version': '2012-10-17', 'Statement': [ { 'Effect': 'Allow', 'Action': [ 's3:GetObject', 's3:PutObject', 's3:DeleteObject' ], 'Resource': 'arn:aws:s3:::app-data-bucket/*', 'Condition': { 'IpAddress': { 'aws:SourceIp': ['10.0.0.0/16'] } } }, { 'Effect': 'Allow', 'Action': 's3:ListBucket', 'Resource': 'arn:aws:s3:::app-data-bucket' } ] } }, { 'name': 'SecretsManagerAppPolicy', 'description': 'Access to application secrets', 'policy': { 'Version': '2012-10-17', 'Statement': [ { 'Effect': 'Allow', 'Action': [ 'secretsmanager:GetSecretValue', 'secretsmanager:DescribeSecret' ], 'Resource': [ 'arn:aws:secretsmanager:us-east-1:*:secret:app/*' ], 'Condition': { 'StringEquals': { 'secretsmanager:VersionStage': 'AWSCURRENT' } } } ] } } ] return policies def design_kms_encryption(self) -> List[Dict[str, Any]]: """ Design KMS encryption keys """ keys = [ { 'name': 'app-data-key', 'description': 'Encryption key for application data', 'key_usage': 'ENCRYPT_DECRYPT', 'key_spec': 'SYMMETRIC_DEFAULT', 'rotation_enabled': True, 'key_policy': { 'Version': '2012-10-17', 'Statement': [ { 'Sid': 'Enable IAM User Permissions', 'Effect': 'Allow', 'Principal': { 'AWS': 'arn:aws:iam::ACCOUNT_ID:root' }, 'Action': 'kms:*', 'Resource': '*' }, { 'Sid': 'Allow Application Access', 'Effect': 'Allow', 'Principal': { 'AWS': 'arn:aws:iam::ACCOUNT_ID:role/app-role' }, 'Action': [ 'kms:Encrypt', 'kms:Decrypt', 'kms:ReEncrypt*', 'kms:GenerateDataKey*', 'kms:DescribeKey' ], 'Resource': '*' } ] }, 'tags': { 'Purpose': 'ApplicationDataEncryption', 'Compliance': 'PCI-DSS', 'Rotation': 'Automatic' } }, { 'name': 'database-encryption-key', 'description': 'Encryption key for RDS databases', 'key_usage': 'ENCRYPT_DECRYPT', 'rotation_enabled': True, 'tags': {'Purpose': 'DatabaseEncryption'} } ] return keys def design_s3_security(self) -> List[Dict[str, Any]]: """ Design secure S3 buckets """ buckets = [ { 'name': 'app-data-bucket', 'versioning_enabled': True, 'encryption': { 'sse_algorithm': 'aws:kms', 'kms_key_id': 'app-data-key' }, 'public_access_block': { 'block_public_acls': True, 'block_public_policy': True, 'ignore_public_acls': True, 'restrict_public_buckets': True }, 'lifecycle_rules': [ { 'id': 'delete-old-versions', 'enabled': True, 'noncurrent_version_expiration_days': 90 } ], 'logging': { 'target_bucket': 'app-logs-bucket', 'target_prefix': 'logs/' }, 'replication': { 'enabled': True, 'destination': 'app-data-bucket-dr', 'replication_time': 15 * 60 # 15 minutes }, 'tags': { 'DataClassification': 'Confidential', 'Backup': 'Enabled', 'Retention': '7years' } } ] return buckets def design_cloudtrail(self) -> Dict[str, Any]: """ Design CloudTrail for audit logging """ config = { 'is_multi_region_trail': True, 'include_global_service_events': True, 'enable_log_file_validation': True, 'cloud_watch_logs_role_arn': 'arn:aws:iam::ACCOUNT_ID:role/CloudTrailRole', 'cloud_watch_logs_log_group_arn': 'arn:aws:logs:us-east-1:ACCOUNT_ID:log-group:CloudTrail/Logs', 's3_bucket_name': 'cloudtrail-logs-bucket', 'kms_key_id': 'cloudtrail-key', 'is_organization_trail': True, 'event_selector': [ { 'read_write_type': 'All', 'include_management_events': True, 'data_resources': [ { 'type': 'AWS::S3::Object', 'values': ['arn:aws:s3:::app-data-bucket/*'] }, { 'type': 'AWS::Lambda::Function', 'values': ['arn:aws:lambda:us-east-1:ACCOUNT_ID:function:*'] } ] } ], 'tags': { 'Purpose': 'ComplianceAudit', 'Retention': '7years' } } return config def design_guardduty(self) -> Dict[str, Any]: """ Design GuardDuty threat detection """ config = { 'enable': True, 'finding_publishing_frequency': 'FIFTEEN_MINUTES', 'datasources': { 's3_logs': { 'enable': True }, 'kubernetes': { 'audit_logs': { 'enable': True } }, 'malware_protection': { 'scan_ec2_instance_data_sources': { 'ebs_volumes': { 'enable': True } } } }, 'tags': { 'Purpose': 'ThreatDetection' } } return config def generate_security_blueprint(self) -> Dict[str, Any]: """ Generate complete security blueprint """ return { 'vpc': self.design_vpc_security(), 'security_groups': self.design_security_groups(), 'iam_policies': self.design_iam_policies(), 'kms_keys': self.design_kms_encryption(), 's3_buckets': self.design_s3_security(), 'cloudtrail': self.design_cloudtrail(), 'guardduty': self.design_guardduty(), 'compliance': { 'standards': ['PCI-DSS', 'HIPAA', 'SOC2', 'ISO27001'], 'automated_compliance_checks': True, 'continuous_monitoring': True } } ``` ### 3. Security Monitoring Architecture #### SIEM Integration Patterns ```python # Security Information and Event Management (SIEM) Integration from datetime import datetime, timedelta from typing import List, Dict, Any, Optional from enum import Enum import json import hashlib class AlertSeverity(Enum): """SIEM alert severity levels""" INFO = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 class EventType(Enum): """Security event types""" AUTHENTICATION = "authentication" AUTHORIZATION = "authorization" DATA_ACCESS = "data_access" NETWORK_TRAFFIC = "network_traffic" SYSTEM_EVENT = "system_event" APPLICATION_EVENT = "application_event" MALWARE_DETECTION = "malware_detection" ANOMALY_DETECTION = "anomaly_detection" @dataclass class SecurityEvent: """Security event for SIEM""" timestamp: datetime event_type: EventType source: str user_id: Optional[str] resource: Optional[str] details: Dict[str, Any] severity: AlertSeverity = AlertSeverity.INFO correlation_id: Optional[str] = None def to_json(self) -> str: """Convert event to JSON for SIEM""" return json.dumps({ 'timestamp': self.timestamp.isoformat(), 'event_type': self.event_type.value, 'source': self.source, 'user_id': self.user_id, 'resource': self.resource, 'details': self.details, 'severity': self.severity.name, 'correlation_id': self.correlation_id }) class SIEMIntegration: """SIEM integration for security monitoring""" def __init__(self, config: Dict[str, Any]): self.config = config self.event_queue: List[SecurityEvent] = [] self.alert_rules: List[AlertRule] = [] self.correlation_engine = CorrelationEngine() def send_event(self, event: SecurityEvent): """ Send security event to SIEM """ # Add correlation ID if not present if not event.correlation_id: event.correlation_id = self._generate_correlation_id(event) # Add to event queue self.event_queue.append(event) # Check against alert rules self._evaluate_alert_rules(event) # Send to SIEM self._send_to_siem(event) def _generate_correlation_id(self, event: SecurityEvent) -> str: """Generate correlation ID for event""" data = f"{event.timestamp}_{event.event_type}_{event.source}" return hashlib.sha256(data.encode()).hexdigest()[:16] def _evaluate_alert_rules(self, event: SecurityEvent): """ Evaluate event against alert rules """ for rule in self.alert_rules: if rule.matches(event): alert = rule.generate_alert(event) self._send_alert(alert) def _send_to_siem(self, event: SecurityEvent): """Send event to SIEM (Splunk, ELK, Sentinel, etc.)""" # Implementation depends on SIEM platform pass def _send_alert(self, alert: 'SecurityAlert'): """Send security alert""" # Implementation for alert notification pass class AlertRule: """SIEM alert rule""" def __init__(self, name: str, conditions: Dict[str, Any], severity: AlertSeverity, action: str): self.name = name self.conditions = conditions self.severity = severity self.action = action def matches(self, event: SecurityEvent) -> bool: """Check if event matches rule conditions""" # Check event type if 'event_type' in self.conditions: if event.event_type != EventType(self.conditions['event_type']): return False # Check severity threshold if 'min_severity' in self.conditions: if event.severity.value < self.conditions['min_severity']: return False # Check field conditions if 'fields' in self.conditions: for field, expected_value in self.conditions['fields'].items(): if field in event.details: if event.details[field] != expected_value: return False return True def generate_alert(self, event: SecurityEvent) -> 'SecurityAlert': """Generate alert from matching event""" return SecurityAlert( rule_name=self.name, severity=self.severity, event=event, action=self.action, timestamp=datetime.now() ) @dataclass class SecurityAlert: """Security alert generated by SIEM""" rule_name: str severity: AlertSeverity event: SecurityEvent action: str timestamp: datetime status: str = "open" incident_id: Optional[str] = None class CorrelationEngine: """Event correlation engine""" def __init__(self): self.event_history: List[SecurityEvent] = [] self.correlation_window = timedelta(minutes=15) def correlate_events(self, events: List[SecurityEvent]) -> List['CorrelatedEvent']: """ Correlate related security events """ correlated = [] # Group events by correlation ID groups = {} for event in events: if event.correlation_id not in groups: groups[event.correlation_id] = [] groups[event.correlation_id].append(event) # Analyze each group for correlation_id, event_group in groups.items(): if len(event_group) > 1: correlated.append(CorrelatedEvent( correlation_id=correlation_id, events=event_group, pattern=self._identify_pattern(event_group), risk_score=self._calculate_risk_score(event_group) )) return correlated def _identify_pattern(self, events: List[SecurityEvent]) -> str: """Identify attack pattern in event group""" event_types = {e.event_type for e in events} # Common attack patterns if {EventType.AUTHENTICATION, EventType.DATA_ACCESS}.issubset(event_types): return "data_exfiltration_attempt" if events[-1].event_type == EventType.MALWARE_DETECTION: return "malware_infection" if EventType.AUTHENTICATION in event_types: failed_auths = [e for e in events if e.details.get('success') is False] if len(failed_auths) >= 5: return "brute_force_attack" return "suspicious_activity" def _calculate_risk_score(self, events: List[SecurityEvent]) -> float: """Calculate risk score for event group""" base_score = sum(e.severity.value for e in events) / len(events) # Multiply by number of events multiplier = min(len(events), 10) / 10 return base_score * multiplier @dataclass class CorrelatedEvent: """Correlated security events""" correlation_id: str events: List[SecurityEvent] pattern: str risk_score: float class ThreatDetection: """Threat detection algorithms""" def __init__(self, siem: SIEMIntegration): self.siem = siem self.baseline = {} self.anomaly_threshold = 2.0 # Standard deviations def detect_anomalies(self, events: List[SecurityEvent]) -> List[SecurityEvent]: """ Detect anomalous security events """ anomalies = [] for event in events: # Check if event deviates from baseline if self._is_anomalous(event): # Create anomaly detection event anomaly_event = SecurityEvent( timestamp=datetime.now(), event_type=EventType.ANOMALY_DETECTION, source="threat_detection_engine", user_id=event.user_id, resource=event.resource, details={ 'original_event': event.to_json(), 'anomaly_type': self._identify_anomaly_type(event) }, severity=AlertSeverity.HIGH ) anomalies.append(anomaly_event) return anomalies def _is_anomalous(self, event: SecurityEvent) -> bool: """Check if event is anomalous""" # Statistical anomaly detection # Implementation depends on baseline statistics return False def _identify_anomaly_type(self, event: SecurityEvent) -> str: """Identify type of anomaly""" # Machine learning based anomaly type identification return "statistical_anomaly" ``` ### 4. Threat Modeling #### STRIDE Methodology Implementation ```python # Threat Modeling using STRIDE Methodology from dataclasses import dataclass from typing import List, Dict, Any from enum import Enum class STRIDETthreatType(Enum): """STRIDE threat categories""" SPOOFING = "spoofing" TAMPERING = "tampering" REPUDIATION = "repudiation" INFORMATION_DISCLOSURE = "information_disclosure" DENIAL_OF_SERVICE = "denial_of_service" ELEVATION_OF_PRIVILEGE = "elevation_of_privilege" @dataclass class DataFlow: """Data flow diagram element""" id: str name: str type: str # 'process', 'data_store', 'external_entity', 'data_flow' trust_level: str # 'trusted', 'untrusted', 'semi-trusted' assets: List[str] controls: List[str] @dataclass class Threat: """Identified threat""" threat_type: STRIDETthreatType description: str target: str likelihood: str # 'low', 'medium', 'high' impact: str # 'low', 'medium', 'high' mitigation: str controls: List[str] class ThreatModel: """Threat model for system architecture""" def __init__(self, system_name: str): self.system_name = system_name self.data_flows: List[DataFlow] = [] self.threats: List[Threat] = [] def analyze_threats(self) -> List[Threat]: """ Analyze threats using STRIDE methodology """ threats = [] for data_flow in self.data_flows: # Spoofing threats if data_flow.type == 'external_entity': threats.extend(self._check_spoofing(data_flow)) # Tampering threats if data_flow.type in ['data_flow', 'data_store']: threats.extend(self._check_tampering(data_flow)) # Repudiation threats if data_flow.type == 'process': threats.extend(self._check_repudiation(data_flow)) # Information disclosure threats if data_flow.type == 'data_flow': threats.extend(self._check_information_disclosure(data_flow)) # DoS threats threats.extend(self._check_denial_of_service(data_flow)) # Elevation of privilege threats if data_flow.type == 'process': threats.extend(self._check_elevation_of_privilege(data_flow)) self.threats = threats return threats def _check_spoofing(self, data_flow: DataFlow) -> List[Threat]: """Check for spoofing threats""" threats = [] if data_flow.trust_level != 'trusted': threats.append(Threat( threat_type=STRIDETthreatType.SPOOFING, description=f"Spoofing of {data_flow.name}", target=data_flow.id, likelihood="high" if data_flow.trust_level == "untrusted" else "medium", impact="high", mitigation="Implement mutual authentication and certificate validation", controls=["mfa", "certificate_pinning", "identity_verification"] )) return threats def _check_tampering(self, data_flow: DataFlow) -> List[Threat]: """Check for tampering threats""" threats = [] if 'encryption' not in data_flow.controls: threats.append(Threat( threat_type=STRIDETthreatType.TAMPERING, description=f"Data tampering in {data_flow.name}", target=data_flow.id, likelihood="medium", impact="high", mitigation="Implement end-to-end encryption and data integrity checks", controls=["encryption", "digital_signatures", "hashing"] )) return threats def _check_repudiation(self, data_flow: DataFlow) -> List[Threat]: """Check for repudiation threats""" threats = [] if 'audit_logging' not in data_flow.controls: threats.append(Threat( threat_type=STRIDETthreatType.REPUDIATION, description=f"Non-repudiation not enforced for {data_flow.name}", target=data_flow.id, likelihood="medium", impact="medium", mitigation="Implement comprehensive audit logging", controls=["audit_logging", "non_repudiation", "digital_signatures"] )) return threats def _check_information_disclosure(self, data_flow: DataFlow) -> List[Threat]: """Check for information disclosure threats""" threats = [] if data_flow.trust_level != 'trusted': for asset in data_flow.assets: if 'sensitive' in asset.lower() or 'pii' in asset.lower(): threats.append(Threat( threat_type=STRIDETthreatType.INFORMATION_DISCLOSURE, description=f"Sensitive data disclosure in {data_flow.name}", target=data_flow.id, likelihood="high", impact="high", mitigation="Implement data encryption and access controls", controls=["encryption", "access_controls", "data_masking"] )) return threats def _check_denial_of_service(self, data_flow: DataFlow) -> List[Threat]: """Check for DoS threats""" threats = [] if 'rate_limiting' not in data_flow.controls: threats.append(Threat( threat_type=STRIDETthreatType.DENIAL_OF_SERVICE, description=f"DoS vulnerability in {data_flow.name}", target=data_flow.id, likelihood="medium", impact="high", mitigation="Implement rate limiting and throttling", controls=["rate_limiting", "throttling", "circuit_breaker"] )) return threats def _check_elevation_of_privilege(self, data_flow: DataFlow) -> List[Threat]: """Check for elevation of privilege threats""" threats = [] if 'authorization' not in data_flow.controls: threats.append(Threat( threat_type=STRIDETthreatType.ELEVATION_OF_PRIVILEGE, description=f"Privilege escalation in {data_flow.name}", target=data_flow.id, likelihood="medium", impact="high", mitigation="Implement proper authorization and privilege management", controls=["authorization", "role_based_access", "principle_of_least_privilege"] )) return threats def generate_threat_report(self) -> Dict[str, Any]: """Generate threat model report""" threats = self.analyze_threats() # Calculate risk scores high_risk = [t for t in threats if t.likelihood == "high" and t.impact == "high"] medium_risk = [t for t in threats if t.likelihood in ["high", "medium"] and t.impact in ["high", "medium"]] return { 'system': self.system_name, 'total_threats': len(threats), 'high_risk_threats': len(high_risk), 'medium_risk_threats': len(medium_risk), 'threats_by_type': { threat_type.value: len([t for t in threats if t.threat_type == threat_type]) for threat_type in STRIDETthreatType }, 'recommended_controls': self._get_recommended_controls(threats), 'threats': [ { 'type': t.threat_type.value, 'description': t.description, 'likelihood': t.likelihood, 'impact': t.impact, 'mitigation': t.mitigation } for t in threats ] } def _get_recommended_controls(self, threats: List[Threat]) -> List[str]: """Get recommended security controls""" controls = set() for threat in threats: controls.update(threat.controls) return list(controls) ``` ### 5. Compliance Frameworks #### GDPR Compliance Implementation ```python # GDPR Compliance Implementation from dataclasses import dataclass from typing import List, Dict, Any, Optional from datetime import datetime, timedelta from enum import Enum class GDPRRight(Enum): """GDPR data subject rights""" ACCESS = "right_to_access" RECTIFICATION = "right_to_rectification" ERASURE = "right_to_erasure" RESTRICT = "right_to_restrict_processing" PORTABILITY = "right_to_portability" OBJECT = "right_to_object" AUTOMATED_DECISION = "right_to_not_be_subject_to_automated_decision" class DataClassification(Enum): """Data sensitivity classification""" PUBLIC = "public" INTERNAL = "internal" CONFIDENTIAL = "confidential" RESTRICTED = "restricted" SPECIAL_CATEGORY = "special_category" # GDPR special category data @dataclass class DataSubject: """Data subject (individual)""" id: str personal_data: Dict[str, Any] consent_records: List['ConsentRecord'] access_requests: List['DataAccessRequest'] data_classification: DataClassification @dataclass class ConsentRecord: """Record of consent""" purpose: str granted_at: datetime expires_at: Optional[datetime] withdrawn_at: Optional[datetime] lawful_basis: str # 'consent', 'contract', 'legal_obligation', etc. @dataclass class DataAccessRequest: """Data subject access request""" request_type: GDPRRight requested_at: datetime deadline: datetime status: str # 'pending', 'in_progress', 'completed' response: Optional[Dict[str, Any]] = None class GDPRCompliance: """GDPR compliance implementation""" def __init__(self): self.data_subjects: Dict[str, DataSubject] = {} self.processing_activities: List[Dict[str, Any]] = [] self.data_breach_records: List[Dict[str, Any]] = [] def record_consent(self, subject_id: str, purpose: str, lawful_basis: str, duration_days: Optional[int] = None) -> ConsentRecord: """ Record data subject consent """ consent = ConsentRecord( purpose=purpose, granted_at=datetime.now(), expires_at=datetime.now() + timedelta(days=duration_days) if duration_days else None, withdrawn_at=None, law_basis=lawful_basis ) # Get or create data subject if subject_id not in self.data_subjects: self.data_subjects[subject_id] = DataSubject( id=subject_id, personal_data={}, consent_records=[], access_requests=[], data_classification=DataClassification.INTERNAL ) self.data_subjects[subject_id].consent_records.append(consent) return consent def verify_consent(self, subject_id: str, purpose: str) -> bool: """ Verify if valid consent exists """ if subject_id not in self.data_subjects: return False subject = self.data_subjects[subject_id] # Check for valid consent for purpose for consent in subject.consent_records: if consent.purpose == purpose: # Check if consent is still valid if consent.withdrawn_at is not None: return False # Consent was withdrawn if consent.expires_at and datetime.now() > consent.expires_at: return False # Consent expired return True # Valid consent found return False # No consent found def handle_access_request(self, subject_id: str, request_type: GDPRRight) -> DataAccessRequest: """ Handle data subject access request """ # Create access request request = DataAccessRequest( request_type=request_type, requested_at=datetime.now(), deadline=datetime.now() + timedelta(days=30), # GDPR requires response within 30 days status='pending' ) # Get data subject if subject_id in self.data_subjects: subject = self.data_subjects[subject_id] if request_type == GDPRRight.ACCESS: # Provide copy of all personal data request.response = { 'personal_data': subject.personal_data, 'consent_records': [ { 'purpose': c.purpose, 'granted_at': c.granted_at.isoformat(), 'lawful_basis': c.lawful_basis } for c in subject.consent_records ], 'processing_activities': [ activity for activity in self.processing_activities if activity['subject_id'] == subject_id ] } elif request_type == GDPRRight.ERASURE: # Right to be forgotten if self._can_erase_data(subject): subject.personal_data = {} subject.consent_records = [] request.response = {'status': 'data_erased'} elif request_type == GDPRRight.PORTABILITY: # Right to data portability request.response = { 'personal_data': subject.personal_data, 'format': 'machine_readable_format' } request.status = 'completed' subject.access_requests.append(request) return request def _can_erase_data(self, subject: DataSubject) -> bool: """Check if data can be erased""" # Cannot erase if required for legal obligation # Cannot erase if required for performance of contract # Implementation depends on specific circumstances return True def record_data_breach(self, breach_details: Dict[str, Any]) -> str: """ Record data breach for GDPR notification requirements """ breach_record = { 'breach_id': self._generate_breach_id(), 'detected_at': datetime.now().isoformat(), 'description': breach_details.get('description'), 'affected_subjects': breach_details.get('affected_subjects', []), 'data_types': breach_details.get('data_types', []), 'severity': breach_details.get('severity', 'medium'), 'containment_actions': breach_details.get('containment_actions', []), 'notification_to_authority': False, 'notification_to_subjects': False } self.data_breach_records.append(breach_record) # Check if breach needs to be reported to authorities (within 72 hours) if breach_details.get('risk_to_individuals') == 'high': self._notify_data_protection_authority(breach_record) # Check if breach needs to be reported to data subjects if breach_details.get('high_risk_to_rights_freedoms'): self._notify_data_subjects(breach_record) return breach_record['breach_id'] def _notify_data_protection_authority(self, breach: Dict[str, Any]): """Notify data protection authority of breach""" # Implementation for notifying authority within 72 hours breach['notification_to_authority'] = True breach['authority_notified_at'] = datetime.now().isoformat() def _notify_data_subjects(self, breach: Dict[str, Any]): """Notify data subjects of breach""" # Implementation for notifying affected individuals breach['notification_to_subjects'] = True breach['subjects_notified_at'] = datetime.now().isoformat() def _generate_breach_id(self) -> str: """Generate breach ID""" return f"breach_{datetime.now().strftime('%Y%m%d_%H%M%S')}" def generate_compliance_report(self) -> Dict[str, Any]: """Generate GDPR compliance report""" return { 'total_data_subjects': len(self.data_subjects), 'active_consent_records': sum( len([c for c in s.consent_records if c.withdrawn_at is None]) for s in self.data_subjects.values() ), 'pending_access_requests': sum( len([r for r in s.access_requests if r.status == 'pending']) for s in self.data_subjects.values() ), 'data_breaches': len(self.data_breach_records), 'processing_activities': len(self.processing_activities), 'compliance_status': self._check_compliance_status() } def _check_compliance_status(self) -> Dict[str, Any]: """Check GDPR compliance status""" return { 'consent_management': 'compliant', 'data_subject_rights': 'compliant', 'data_breach_notification': 'compliant', 'data_protection_by_design': 'compliant', 'data_protection_officer': 'appointed', 'records_of_processing_activities': 'maintained' } ``` ### 6. Decision Trees #### Security Control Selection ``` Threat type? │ ├─ Spoofing → Authentication, Identity verification ├─ Tampering → Encryption, Digital signatures, Integrity checks ├─ Repudiation → Audit logging, Non-repudiation controls ├─ Information disclosure → Encryption, Access controls, DLP ├─ Denial of service → Rate limiting, Redundancy, DDoS protection └─ Elevation of privilege → Authorization, Least privilege, RBAC ``` #### Compliance Framework Selection ``` Data type and location? │ ├─ EU personal data → GDPR ├─ US healthcare data → HIPAA ├─ Payment card data → PCI-DSS ├─ Financial data → SOX, Basel III ├─ Federal systems → NIST 800-53 └─ Global operations → ISO 27001 ``` ### 7. Anti-Patterns to Avoid 1. **Trust by default**: Never trust, always verify 2. **Hard-coded secrets**: Never hard-code credentials 3. **Over-permissive IAM**: Always follow least privilege 4. **Missing monitoring**: You can't secure what you can't see 5. **Ignoring compliance**: Compliance is a baseline, not a ceiling 6. **Security theater**: Focus on real threats, not just checkboxes 7. **Neglecting supply chain**: Secure your entire supply chain 8. **No incident response**: Have a plan before you need it 9. **Secrets in code**: Never commit secrets to repositories 10. **Assuming internal safety**: Zero trust applies everywhere ### 8. Quality Checklist Before considering security architecture complete: - [ ] Threat modeling completed - [ ] Security controls implemented for all threats - [ ] Data classification implemented - [ ] Encryption at rest and in transit - [ ] Identity and access management configured - [ ] Security monitoring and logging implemented - [ ] Incident response plan created - [ ] Compliance requirements addressed - [ ] Security testing performed - [ ] Penetration testing completed - [ ] Security documentation complete - [ ] Secure development practices implemented - [ ] Third-party security assessed - [ ] Business continuity plan tested - [ ] Security awareness training completed - [ ] Vulnerability management program active - [ ] Regular security reviews scheduled - [ ] Data retention policies defined - [ ] Privacy by design implemented - [ ] Security metrics and KPIs defined This comprehensive skill definition provides complete guidance for security architecture across modern systems.