Pony-Alpha-2-Dataset-Training/skills/skill-security-architecture.md
Pony Alpha 2 68453089ee feat: initial Alpha Brain 2 dataset release
Massive training corpus for AI coding models containing:
- 10 JSONL training datasets (641+ examples across coding, reasoning, planning, architecture, communication, debugging, security, workflows, error handling, UI/UX)
- 11 agent behavior specifications (explorer, planner, reviewer, debugger, executor, UI designer, Linux admin, kernel engineer, security architect, automation engineer, API architect)
- 6 skill definition files (coding, API engineering, kernel, Linux server, security architecture, server automation, UI/UX)
- Master README with project origin story and philosophy

Built by Pony Alpha 2 to help AI models learn expert-level coding approaches.
2026-03-13 16:26:29 +04:00


Security Architecture Expert Skill

Activation Criteria

Activate this skill when the user:

  • Designs secure system architectures
  • Implements zero trust networks
  • Configures cloud security (AWS/Azure/GCP)
  • Designs application security programs
  • Creates IAM/authorization systems
  • Implements security monitoring (SIEM, SOAR)
  • Develops incident response procedures
  • Needs compliance frameworks (PCI-DSS, HIPAA, GDPR, NIST, SOC 2)
  • Performs threat modeling
  • Designs supply chain security
  • Implements DevSecOps practices
  • Creates security policies and standards
  • Designs secure APIs and microservices
  • Implements data protection and encryption strategies

Core Methodology

1. Zero Trust Architecture

Zero Trust Principles

# Zero Trust Architecture Framework
zero_trust_principles:
  verification_required: "Never trust, always verify"
  least_privilege: "Minimum required access only"
  assume_breach: "Design assuming compromise"
  microsegmentation: "Segment everything"
  continuous_monitoring: "Monitor and validate continuously"

# Zero Trust Implementation Pillars
identity:
  - Multi-factor authentication (MFA)
  - Risk-based authentication
  - Single sign-on (SSO) consolidation
  - Privileged access management (PAM)
  - Just-in-time access provisioning

devices:
  - Device health attestation
  - OS and application patching
  - Endpoint detection and response (EDR)
  - Mobile device management (MDM)
  - Device inventory and classification

network:
  - Microsegmentation
  - Software-defined perimeter (SDP)
  - Next-generation firewalls (NGFW)
  - Zero trust network access (ZTNA)
  - East-west traffic inspection

applications:
  - Application security testing
  - API security and gateways
  - Container and Kubernetes security
  - Serverless function security
  - Runtime application self-protection (RASP)

data:
  - Data classification and labeling
  - Encryption at rest and in transit
  - Data loss prevention (DLP)
  - Rights management (IRM/DRM)
  - Database activity monitoring
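
As one illustration of the identity pillar, just-in-time access provisioning can be sketched as time-boxed grants checked under default deny. This is a minimal sketch, not a reference implementation: `AccessGrant`, `grant_just_in_time`, `is_allowed`, and the 30-minute default are hypothetical names and values chosen for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List


@dataclass(frozen=True)
class AccessGrant:
    """A time-boxed permission (hypothetical structure for illustration)."""
    subject_id: str
    permission: str
    expires_at: datetime

    def is_active(self, now: datetime) -> bool:
        # A grant is usable only until its expiry; there is no standing access.
        return now < self.expires_at


def grant_just_in_time(subject_id: str, permission: str,
                       now: datetime, ttl_minutes: int = 30) -> AccessGrant:
    """Issue a grant that lapses on its own after ttl_minutes (assumed default)."""
    return AccessGrant(subject_id, permission, now + timedelta(minutes=ttl_minutes))


def is_allowed(grants: List[AccessGrant], subject_id: str,
               permission: str, now: datetime) -> bool:
    """Default deny: allow only if an unexpired grant matches exactly."""
    return any(
        g.subject_id == subject_id and g.permission == permission and g.is_active(now)
        for g in grants
    )


start = datetime(2026, 1, 1, 9, 0, tzinfo=timezone.utc)
grants = [grant_just_in_time("alice", "db:read", now=start)]

print(is_allowed(grants, "alice", "db:read", start + timedelta(minutes=10)))   # True
print(is_allowed(grants, "alice", "db:read", start + timedelta(minutes=45)))   # False
print(is_allowed(grants, "alice", "db:write", start + timedelta(minutes=10)))  # False
```

Passing `now` explicitly keeps the check deterministic and easy to test; a real deployment would typically also record who approved each grant.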

Zero Trust Network Implementation

# Zero Trust Network Access (ZTNA) Implementation
# Policy Engine for Access Control

from __future__ import annotations  # lets signatures reference classes defined further down

from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional, Set
from datetime import datetime, timedelta
import hashlib
import jwt
import ipaddress

class TrustLevel(Enum):
    """Trust levels for zero trust decisions"""
    NONE = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class ResourceType(Enum):
    """Types of protected resources"""
    API = "api"
    DATABASE = "database"
    STORAGE = "storage"
    SERVICE = "service"
    FILE = "file"

@dataclass
class Subject:
    """Requesting entity (user, service, device)"""
    id: str
    type: str  # 'user', 'service', 'device'
    attributes: Dict
    trust_level: TrustLevel = TrustLevel.NONE

@dataclass
class Resource:
    """Protected resource"""
    id: str
    type: ResourceType
    sensitivity: str  # 'public', 'internal', 'confidential', 'restricted'
    owner: str
    tags: Set[str]

@dataclass
class Context:
    """Request context for decision making"""
    source_ip: str
    destination_ip: str
    timestamp: datetime
    user_agent: Optional[str] = None
    device_health_score: Optional[float] = None
    geo_location: Optional[str] = None
    network_trust: Optional[float] = None

class PolicyEngine:
    """Zero Trust Policy Engine"""

    def __init__(self):
        self.policies: List[Policy] = []
        self.trust_scores: Dict[str, float] = {}

    def evaluate_access(self,
                       subject: Subject,
                       resource: Resource,
                       action: str,
                       context: Context) -> AccessDecision:
        """
        Evaluate access request using zero trust principles
        """
        # Step 1: Verify identity
        if not self._verify_identity(subject):
            return AccessDecision(
                allowed=False,
                reason="Identity verification failed"
            )

        # Step 2: Check device health
        if not self._check_device_health(context):
            return AccessDecision(
                allowed=False,
                reason="Device does not meet health requirements"
            )

        # Step 3: Evaluate policies
        for policy in self.policies:
            if policy.matches(subject, resource, action, context):
                decision = policy.evaluate(subject, resource, action, context)
                if decision is not None:
                    return decision

        # Default deny
        return AccessDecision(
            allowed=False,
            reason="No matching policy - default deny"
        )

    def _verify_identity(self, subject: Subject) -> bool:
        """Verify subject identity with MFA"""
        # Check MFA status
        if not subject.attributes.get('mfa_verified', False):
            return False

        # Check if identity is not expired
        if 'expiry' in subject.attributes:
            expiry = subject.attributes['expiry']
            if datetime.now() > expiry:
                return False

        return True

    def _check_device_health(self, context: Context) -> bool:
        """Check device health score"""
        if context.device_health_score is None:
            return False

        return context.device_health_score >= 0.7

class Policy:
    """Access control policy"""

    def __init__(self,
                 name: str,
                 conditions: Dict,
                 effect: str):
        self.name = name
        self.conditions = conditions
        self.effect = effect  # 'allow' or 'deny'

    def matches(self,
                subject: Subject,
                resource: Resource,
                action: str,
                context: Context) -> bool:
        """Check if policy matches the request"""
        # Check subject conditions
        if 'subject' in self.conditions:
            for key, value in self.conditions['subject'].items():
                if subject.attributes.get(key) != value:
                    return False

        # Check resource conditions
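        if 'resource' in self.conditions:
            res_conditions = self.conditions['resource']
            # Compare only the keys the policy actually specifies
            if 'type' in res_conditions and resource.type.value != res_conditions['type']:
                return False
            if 'sensitivity' in res_conditions and resource.sensitivity != res_conditions['sensitivity']:
                return False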

        # Check action conditions
        if 'action' in self.conditions:
            if action not in self.conditions['action']:
                return False

        # Check context conditions
        if 'context' in self.conditions:
            ctx_conditions = self.conditions['context']

            # Check IP range
            if 'allowed_ips' in ctx_conditions:
                ip = ipaddress.ip_address(context.source_ip)
                allowed = False
                for allowed_range in ctx_conditions['allowed_ips']:
                    network = ipaddress.ip_network(allowed_range)
                    if ip in network:
                        allowed = True
                        break
                if not allowed:
                    return False

            # Check time restrictions
            if 'time_restrictions' in ctx_conditions:
                time_restriction = ctx_conditions['time_restrictions']
                current_hour = context.timestamp.hour
                if 'working_hours' in time_restriction:
                    start, end = time_restriction['working_hours']
                    if not (start <= current_hour < end):
                        return False

        return True

    def evaluate(self,
                 subject: Subject,
                 resource: Resource,
                 action: str,
                 context: Context) -> Optional[AccessDecision]:
        """Evaluate the policy"""
        if self.effect == 'allow':
            return AccessDecision(
                allowed=True,
                reason=f"Allowed by policy: {self.name}"
            )
        else:
            return AccessDecision(
                allowed=False,
                reason=f"Denied by policy: {self.name}"
            )

@dataclass
class AccessDecision:
    """Access decision with audit trail"""
    allowed: bool
    reason: str
    expires_at: Optional[datetime] = None
    additional_conditions: Optional[List[str]] = None

class ZeroTrustGateway:
    """Zero Trust Network Access Gateway"""

    def __init__(self, policy_engine: PolicyEngine):
        self.policy_engine = policy_engine
        self.session_manager = SessionManager()

    def handle_request(self,
                      subject: Subject,
                      resource: Resource,
                      action: str,
                      context: Context) -> GatewayResponse:
        """
        Handle zero trust access request
        """
        # Evaluate access
        decision = self.policy_engine.evaluate_access(
            subject, resource, action, context
        )

        if decision.allowed:
            # Create session
            session = self.session_manager.create_session(
                subject, resource, context
            )

            return GatewayResponse(
                success=True,
                session_token=session.token,
                expires_at=session.expires_at,
                decision=decision
            )
        else:
            return GatewayResponse(
                success=False,
                decision=decision
            )

class SessionManager:
    """Manage zero trust sessions"""

    def __init__(self):
        self.sessions: Dict[str, Session] = {}

    def create_session(self,
                      subject: Subject,
                      resource: Resource,
                      context: Context) -> 'Session':
        """Create a new session"""
        token = self._generate_token(subject, resource, context)
        expires_at = datetime.now() + timedelta(hours=1)

        session = Session(
            token=token,
            subject_id=subject.id,
            resource_id=resource.id,
            created_at=datetime.now(),
            expires_at=expires_at,
            context=context
        )

        self.sessions[token] = session
        return session

    def validate_session(self, token: str) -> bool:
        """Validate session token"""
        session = self.sessions.get(token)
        if not session:
            return False

        if datetime.now() > session.expires_at:
            del self.sessions[token]
            return False

        return True

    def _generate_token(self,
                       subject: Subject,
                       resource: Resource,
                       context: Context) -> str:
        """Generate session token"""
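        import os  # local import keeps this method self-contained

        payload = {
            'subject_id': subject.id,
            'resource_id': resource.id,
            'timestamp': datetime.now().isoformat(),
            'ip': context.source_ip
        }

        # Never hard-code the signing key. ZT_SESSION_SIGNING_KEY is an assumed
        # environment variable name; load the key from your secret store.
        signing_key = os.environ['ZT_SESSION_SIGNING_KEY']
        token = jwt.encode(payload, signing_key, algorithm='HS256')
        return token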

@dataclass
class Session:
    """Zero trust session"""
    token: str
    subject_id: str
    resource_id: str
    created_at: datetime
    expires_at: datetime
    context: Context

@dataclass
class GatewayResponse:
    """Gateway response"""
    success: bool
    session_token: Optional[str] = None
    expires_at: Optional[datetime] = None
    decision: Optional[AccessDecision] = None
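
The policy loop above returns the decision of the first matching policy, so the outcome depends on the order in which policies were registered. A common alternative is a deny-overrides combination, where any matching deny wins regardless of order. The following is a minimal, self-contained sketch of that idea; `Rule` and `decide` are hypothetical simplifications for illustration and are not part of the engine above.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Rule:
    """Hypothetical, simplified stand-in for a policy: a predicate plus an effect."""
    name: str
    effect: str  # 'allow' or 'deny'
    applies: Callable[[Dict], bool]


def decide(rules: List[Rule], request: Dict) -> str:
    """Deny-overrides: a matching deny wins, then a matching allow, else default deny."""
    matched = [r for r in rules if r.applies(request)]
    for rule in matched:
        if rule.effect == 'deny':
            return f"deny ({rule.name})"
    for rule in matched:
        if rule.effect == 'allow':
            return f"allow ({rule.name})"
    return "deny (no matching rule)"


rules = [
    Rule('engineers-read', 'allow',
         lambda r: r['role'] == 'engineer' and r['action'] == 'read'),
    Rule('block-restricted', 'deny',
         lambda r: r['sensitivity'] == 'restricted'),
]

print(decide(rules, {'role': 'engineer', 'action': 'read', 'sensitivity': 'internal'}))
# allow (engineers-read)
print(decide(rules, {'role': 'engineer', 'action': 'read', 'sensitivity': 'restricted'}))
# deny (block-restricted)
print(decide(rules, {'role': 'guest', 'action': 'read', 'sensitivity': 'internal'}))
# deny (no matching rule)
```

With deny-overrides, adding a broad allow rule later cannot accidentally shadow an existing deny, which tends to be the safer failure mode for access control.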

2. Cloud Security Architecture

AWS Security Implementation

# AWS Security Infrastructure as Code
# Using Terraform-style Python for AWS security setup

import json
from typing import List, Dict, Any

class AWSSecurityArchitecture:
    """AWS security infrastructure configuration"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.security_groups = []
        self.iam_policies = []
        self.kms_keys = []
        self.s3_buckets = []
        self.cloudtrail_config = None
        self.guardduty_config = None

    def design_vpc_security(self) -> Dict[str, Any]:
        """
        Design secure VPC architecture
        """
        vpc_config = {
            'vpc': {
                'cidr_block': '10.0.0.0/16',
                'enable_dns_support': True,
                'enable_dns_hostnames': True,
                'tags': {
                    'Name': 'production-vpc',
                    'Environment': 'production',
                    'Compliance': 'PCI-DSS'
                }
            },
            'subnets': {
                'public': [
                    {
                        'cidr': '10.0.1.0/24',
                        'availability_zone': 'us-east-1a',
                        'map_public_ip_on_launch': True
                    },
                    {
                        'cidr': '10.0.2.0/24',
                        'availability_zone': 'us-east-1b',
                        'map_public_ip_on_launch': True
                    }
                ],
                'private': [
                    {
                        'cidr': '10.0.10.0/24',
                        'availability_zone': 'us-east-1a'
                    },
                    {
                        'cidr': '10.0.11.0/24',
                        'availability_zone': 'us-east-1b'
                    }
                ],
                'database': [
                    {
                        'cidr': '10.0.20.0/24',
                        'availability_zone': 'us-east-1a',
                        'isolated': True
                    },
                    {
                        'cidr': '10.0.21.0/24',
                        'availability_zone': 'us-east-1b',
                        'isolated': True
                    }
                ]
            },
            'flow_logs': {
                'enabled': True,
                'traffic_type': 'ALL',
                'destination': 'cloudwatch-logs'
            },
            'network_acls': {
                'public': {
                    'ingress': [
                        {'rule_number': 100, 'protocol': 'tcp', 'action': 'allow',
                         'from_port': 443, 'to_port': 443, 'cidr': '0.0.0.0/0'},
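                        {'rule_number': 110, 'protocol': 'tcp', 'action': 'allow',
                         'from_port': 80, 'to_port': 80, 'cidr': '0.0.0.0/0'},
                        # NACLs are stateless: allow return traffic for outbound connections
                        {'rule_number': 120, 'protocol': 'tcp', 'action': 'allow',
                         'from_port': 1024, 'to_port': 65535, 'cidr': '0.0.0.0/0'}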
                    ],
                    'egress': [
                        {'rule_number': 100, 'protocol': '-1', 'action': 'allow',
                         'cidr': '0.0.0.0/0'}
                    ]
                }
            }
        }

        return vpc_config

    def design_security_groups(self) -> List[Dict[str, Any]]:
        """
        Design security groups with least privilege
        """
        security_groups = [
            {
                'name': 'web-tier-sg',
                'description': 'Security group for web tier',
                'ingress': [
                    {
                        'protocol': 'tcp',
                        'from_port': 443,
                        'to_port': 443,
                        'cidr_blocks': ['0.0.0.0/0'],
                        'description': 'HTTPS from anywhere'
                    },
                    {
                        'protocol': 'tcp',
                        'from_port': 80,
                        'to_port': 80,
                        'cidr_blocks': ['0.0.0.0/0'],
                        'description': 'HTTP from anywhere (redirect to HTTPS)'
                    }
                ],
                'egress': [
                    {
                        'protocol': 'tcp',
                        'from_port': 8080,
                        'to_port': 8080,
                        'security_groups': ['application-tier-sg'],
                        'description': 'Application port on the application tier only'
                    }
                ],
                'tags': {'Tier': 'Web', 'Compliance': 'PCI-DSS'}
            },
            {
                'name': 'application-tier-sg',
                'description': 'Security group for application tier',
                'ingress': [
                    {
                        'protocol': 'tcp',
                        'from_port': 8080,
                        'to_port': 8080,
                        'security_groups': ['web-tier-sg'],
                        'description': 'Application port from web tier'
                    },
                    {
                        'protocol': 'tcp',
                        'from_port': 22,
                        'to_port': 22,
                        'security_groups': ['bastion-sg'],
                        'description': 'SSH from bastion only'
                    }
                ],
                'egress': [
                    {
                        'protocol': 'tcp',
                        'from_port': 5432,
                        'to_port': 5432,
                        'security_groups': ['database-tier-sg'],
                        'description': 'Database access'
                    },
                    {
                        'protocol': 'tcp',
                        'from_port': 443,
                        'to_port': 443,
                        'cidr_blocks': ['0.0.0.0/0'],
                        'description': 'HTTPS outbound for APIs'
                    }
                ],
                'tags': {'Tier': 'Application'}
            },
            {
                'name': 'database-tier-sg',
                'description': 'Security group for database tier',
                'ingress': [
                    {
                        'protocol': 'tcp',
                        'from_port': 5432,
                        'to_port': 5432,
                        'security_groups': ['application-tier-sg'],
                        'description': 'PostgreSQL from application tier'
                    }
                ],
                'egress': [],
                'tags': {'Tier': 'Database', 'DataClassification': 'Confidential'}
            }
        ]

        return security_groups

    def design_iam_policies(self) -> List[Dict[str, Any]]:
        """
        Design IAM policies following least privilege
        """
        policies = [
            {
                'name': 'EC2ReadOnlyPolicy',
                'description': 'Read-only access to EC2 resources',
                'policy': {
                    'Version': '2012-10-17',
                    'Statement': [
                        {
                            'Effect': 'Allow',
                            'Action': [
                                'ec2:Describe*',
                                'ec2:Get*'
                            ],
                            'Resource': '*'
                        }
                    ]
                }
            },
            {
                'name': 'S3AppDataPolicy',
                'description': 'Application-specific S3 access',
                'policy': {
                    'Version': '2012-10-17',
                    'Statement': [
                        {
                            'Effect': 'Allow',
                            'Action': [
                                's3:GetObject',
                                's3:PutObject',
                                's3:DeleteObject'
                            ],
                            'Resource': 'arn:aws:s3:::app-data-bucket/*',
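                            'Condition': {
                                # aws:SourceIp is evaluated against the caller's public IP, so a
                                # private range needs aws:VpcSourceIp (available for requests
                                # made through a VPC endpoint)
                                'IpAddress': {
                                    'aws:VpcSourceIp': ['10.0.0.0/16']
                                }
                            }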
                        },
                        {
                            'Effect': 'Allow',
                            'Action': 's3:ListBucket',
                            'Resource': 'arn:aws:s3:::app-data-bucket'
                        }
                    ]
                }
            },
            {
                'name': 'SecretsManagerAppPolicy',
                'description': 'Access to application secrets',
                'policy': {
                    'Version': '2012-10-17',
                    'Statement': [
                        {
                            'Effect': 'Allow',
                            'Action': [
                                'secretsmanager:GetSecretValue',
                                'secretsmanager:DescribeSecret'
                            ],
                            'Resource': [
                                'arn:aws:secretsmanager:us-east-1:*:secret:app/*'
                            ],
                            'Condition': {
                                'StringEquals': {
                                    'secretsmanager:VersionStage': 'AWSCURRENT'
                                }
                            }
                        }
                    ]
                }
            }
        ]

        return policies

    def design_kms_encryption(self) -> List[Dict[str, Any]]:
        """
        Design KMS encryption keys
        """
        keys = [
            {
                'name': 'app-data-key',
                'description': 'Encryption key for application data',
                'key_usage': 'ENCRYPT_DECRYPT',
                'key_spec': 'SYMMETRIC_DEFAULT',
                'rotation_enabled': True,
                'key_policy': {
                    'Version': '2012-10-17',
                    'Statement': [
                        {
                            'Sid': 'Enable IAM User Permissions',
                            'Effect': 'Allow',
                            'Principal': {
                                'AWS': 'arn:aws:iam::ACCOUNT_ID:root'
                            },
                            'Action': 'kms:*',
                            'Resource': '*'
                        },
                        {
                            'Sid': 'Allow Application Access',
                            'Effect': 'Allow',
                            'Principal': {
                                'AWS': 'arn:aws:iam::ACCOUNT_ID:role/app-role'
                            },
                            'Action': [
                                'kms:Encrypt',
                                'kms:Decrypt',
                                'kms:ReEncrypt*',
                                'kms:GenerateDataKey*',
                                'kms:DescribeKey'
                            ],
                            'Resource': '*'
                        }
                    ]
                },
                'tags': {
                    'Purpose': 'ApplicationDataEncryption',
                    'Compliance': 'PCI-DSS',
                    'Rotation': 'Automatic'
                }
            },
            {
                'name': 'database-encryption-key',
                'description': 'Encryption key for RDS databases',
                'key_usage': 'ENCRYPT_DECRYPT',
                'rotation_enabled': True,
                'tags': {'Purpose': 'DatabaseEncryption'}
            }
        ]

        return keys

    def design_s3_security(self) -> List[Dict[str, Any]]:
        """
        Design secure S3 buckets
        """
        buckets = [
            {
                'name': 'app-data-bucket',
                'versioning_enabled': True,
                'encryption': {
                    'sse_algorithm': 'aws:kms',
                    'kms_key_id': 'app-data-key'
                },
                'public_access_block': {
                    'block_public_acls': True,
                    'block_public_policy': True,
                    'ignore_public_acls': True,
                    'restrict_public_buckets': True
                },
                'lifecycle_rules': [
                    {
                        'id': 'delete-old-versions',
                        'enabled': True,
                        'noncurrent_version_expiration_days': 90
                    }
                ],
                'logging': {
                    'target_bucket': 'app-logs-bucket',
                    'target_prefix': 'logs/'
                },
                'replication': {
                    'enabled': True,
                    'destination': 'app-data-bucket-dr',
                    'replication_time': 15 * 60  # 15 minutes
                },
                'tags': {
                    'DataClassification': 'Confidential',
                    'Backup': 'Enabled',
                    'Retention': '7years'
                }
            }
        ]

        return buckets

    def design_cloudtrail(self) -> Dict[str, Any]:
        """
        Design CloudTrail for audit logging
        """
        config = {
            'is_multi_region_trail': True,
            'include_global_service_events': True,
            'enable_log_file_validation': True,
            'cloud_watch_logs_role_arn': 'arn:aws:iam::ACCOUNT_ID:role/CloudTrailRole',
            'cloud_watch_logs_log_group_arn': 'arn:aws:logs:us-east-1:ACCOUNT_ID:log-group:CloudTrail/Logs',
            's3_bucket_name': 'cloudtrail-logs-bucket',
            'kms_key_id': 'cloudtrail-key',
            'is_organization_trail': True,
            'event_selector': [
                {
                    'read_write_type': 'All',
                    'include_management_events': True,
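                    'data_resources': [
                        {
                            'type': 'AWS::S3::Object',
                            # Trailing slash with an empty prefix covers every object in the bucket
                            'values': ['arn:aws:s3:::app-data-bucket/']
                        },
                        {
                            'type': 'AWS::Lambda::Function',
                            # The bare prefix covers all functions; wildcards are not accepted here
                            'values': ['arn:aws:lambda']
                        }
                    ]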
                }
            ],
            'tags': {
                'Purpose': 'ComplianceAudit',
                'Retention': '7years'
            }
        }

        return config

    def design_guardduty(self) -> Dict[str, Any]:
        """
        Design GuardDuty threat detection
        """
        config = {
            'enable': True,
            'finding_publishing_frequency': 'FIFTEEN_MINUTES',
            'datasources': {
                's3_logs': {
                    'enable': True
                },
                'kubernetes': {
                    'audit_logs': {
                        'enable': True
                    }
                },
                'malware_protection': {
                    'scan_ec2_instance_data_sources': {
                        'ebs_volumes': {
                            'enable': True
                        }
                    }
                }
            },
            'tags': {
                'Purpose': 'ThreatDetection'
            }
        }

        return config

    def generate_security_blueprint(self) -> Dict[str, Any]:
        """
        Generate complete security blueprint
        """
        return {
            'vpc': self.design_vpc_security(),
            'security_groups': self.design_security_groups(),
            'iam_policies': self.design_iam_policies(),
            'kms_keys': self.design_kms_encryption(),
            's3_buckets': self.design_s3_security(),
            'cloudtrail': self.design_cloudtrail(),
            'guardduty': self.design_guardduty(),
            'compliance': {
                'standards': ['PCI-DSS', 'HIPAA', 'SOC2', 'ISO27001'],
                'automated_compliance_checks': True,
                'continuous_monitoring': True
            }
        }
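
A blueprint like the one above is easier to trust when it is checked automatically before deployment. The sketch below scans security group definitions shaped like those returned by `design_security_groups` and flags ingress rules open to `0.0.0.0/0` on ports other than 80 and 443. The function name `find_risky_ingress`, the `PUBLIC_PORTS` allow-list, and the sample groups (including `legacy-admin-sg`) are assumptions made for illustration.

```python
from typing import Any, Dict, List

# Ports that may be open to the internet in this sketch (an assumption).
PUBLIC_PORTS = {80, 443}


def find_risky_ingress(security_groups: List[Dict[str, Any]]) -> List[str]:
    """Return one finding per ingress rule open to 0.0.0.0/0 on a non-public port."""
    findings = []
    for group in security_groups:
        for rule in group.get('ingress', []):
            if '0.0.0.0/0' not in rule.get('cidr_blocks', []):
                continue  # Scoped to a security group or a narrower CIDR
            # Rules without explicit ports (protocol '-1') are treated as all ports
            low = rule.get('from_port', 0)
            high = rule.get('to_port', 65535)
            if any(port not in PUBLIC_PORTS for port in range(low, high + 1)):
                findings.append(f"{group['name']}: ports {low}-{high} open to 0.0.0.0/0")
    return findings


sample_groups = [
    {'name': 'web-tier-sg', 'ingress': [
        {'protocol': 'tcp', 'from_port': 443, 'to_port': 443,
         'cidr_blocks': ['0.0.0.0/0']},
    ]},
    {'name': 'legacy-admin-sg', 'ingress': [
        {'protocol': 'tcp', 'from_port': 22, 'to_port': 22,
         'cidr_blocks': ['0.0.0.0/0']},
    ]},
    {'name': 'application-tier-sg', 'ingress': [
        {'protocol': 'tcp', 'from_port': 8080, 'to_port': 8080,
         'security_groups': ['web-tier-sg']},
    ]},
]

print(find_risky_ingress(sample_groups))
# ['legacy-admin-sg: ports 22-22 open to 0.0.0.0/0']
```

A check of this kind could run in CI against the generated blueprint, failing the build when the findings list is not empty.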

3. Security Monitoring Architecture

SIEM Integration Patterns

# Security Information and Event Management (SIEM) Integration

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from enum import Enum
import json
import hashlib

class AlertSeverity(Enum):
    """SIEM alert severity levels"""
    INFO = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class EventType(Enum):
    """Security event types"""
    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    DATA_ACCESS = "data_access"
    NETWORK_TRAFFIC = "network_traffic"
    SYSTEM_EVENT = "system_event"
    APPLICATION_EVENT = "application_event"
    MALWARE_DETECTION = "malware_detection"
    ANOMALY_DETECTION = "anomaly_detection"

@dataclass
class SecurityEvent:
    """Security event for SIEM"""
    timestamp: datetime
    event_type: EventType
    source: str
    user_id: Optional[str]
    resource: Optional[str]
    details: Dict[str, Any]
    severity: AlertSeverity = AlertSeverity.INFO
    correlation_id: Optional[str] = None

    def to_json(self) -> str:
        """Convert event to JSON for SIEM"""
        return json.dumps({
            'timestamp': self.timestamp.isoformat(),
            'event_type': self.event_type.value,
            'source': self.source,
            'user_id': self.user_id,
            'resource': self.resource,
            'details': self.details,
            'severity': self.severity.name,
            'correlation_id': self.correlation_id
        })

class SIEMIntegration:
    """SIEM integration for security monitoring"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.event_queue: List[SecurityEvent] = []
        self.alert_rules: List[AlertRule] = []
        self.correlation_engine = CorrelationEngine()

    def send_event(self, event: SecurityEvent):
        """
        Send security event to SIEM
        """
        # Add correlation ID if not present
        if not event.correlation_id:
            event.correlation_id = self._generate_correlation_id(event)

        # Add to event queue
        self.event_queue.append(event)

        # Check against alert rules
        self._evaluate_alert_rules(event)

        # Send to SIEM
        self._send_to_siem(event)

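    def _generate_correlation_id(self, event: SecurityEvent) -> str:
        """Generate a correlation ID shared by related events"""
        # Bucket by 15-minute window, source, and user so related events share an
        # ID; hashing the exact timestamp would make every ID unique
        window = int(event.timestamp.timestamp() // (15 * 60))
        data = f"{window}_{event.source}_{event.user_id}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]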

    def _evaluate_alert_rules(self, event: SecurityEvent):
        """
        Evaluate event against alert rules
        """
        for rule in self.alert_rules:
            if rule.matches(event):
                alert = rule.generate_alert(event)
                self._send_alert(alert)

    def _send_to_siem(self, event: SecurityEvent):
        """Send event to SIEM (Splunk, ELK, Sentinel, etc.)"""
        # Implementation depends on SIEM platform
        pass

    def _send_alert(self, alert: 'SecurityAlert'):
        """Send security alert"""
        # Implementation for alert notification
        pass

class AlertRule:
    """SIEM alert rule"""

    def __init__(self,
                 name: str,
                 conditions: Dict[str, Any],
                 severity: AlertSeverity,
                 action: str):
        self.name = name
        self.conditions = conditions
        self.severity = severity
        self.action = action

    def matches(self, event: SecurityEvent) -> bool:
        """Check if event matches rule conditions"""
        # Check event type
        if 'event_type' in self.conditions:
            if event.event_type != EventType(self.conditions['event_type']):
                return False

        # Check severity threshold
        if 'min_severity' in self.conditions:
            if event.severity.value < self.conditions['min_severity']:
                return False

        # Check field conditions
        if 'fields' in self.conditions:
            for field, expected_value in self.conditions['fields'].items():
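                # A missing field counts as a mismatch, so a rule never
                # fires on an event that lacks the data it tests
                if field not in event.details or event.details[field] != expected_value:
                    return False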

        return True

    def generate_alert(self, event: SecurityEvent) -> 'SecurityAlert':
        """Generate alert from matching event"""
        return SecurityAlert(
            rule_name=self.name,
            severity=self.severity,
            event=event,
            action=self.action,
            timestamp=datetime.now()
        )

@dataclass
class SecurityAlert:
    """Security alert generated by SIEM"""
    rule_name: str
    severity: AlertSeverity
    event: SecurityEvent
    action: str
    timestamp: datetime
    status: str = "open"
    incident_id: Optional[str] = None

class CorrelationEngine:
    """Event correlation engine"""

    def __init__(self):
        self.event_history: List[SecurityEvent] = []
        self.correlation_window = timedelta(minutes=15)

    def correlate_events(self, events: List[SecurityEvent]) -> List['CorrelatedEvent']:
        """
        Correlate related security events
        """
        correlated = []

        # Group events by correlation ID
        groups = {}
        for event in events:
            if event.correlation_id not in groups:
                groups[event.correlation_id] = []
            groups[event.correlation_id].append(event)

        # Analyze each group
        for correlation_id, event_group in groups.items():
            if len(event_group) > 1:
                correlated.append(CorrelatedEvent(
                    correlation_id=correlation_id,
                    events=event_group,
                    pattern=self._identify_pattern(event_group),
                    risk_score=self._calculate_risk_score(event_group)
                ))

        return correlated

    def _identify_pattern(self, events: List[SecurityEvent]) -> str:
        """Identify attack pattern in event group"""
        event_types = {e.event_type for e in events}

        # Common attack patterns
        if {EventType.AUTHENTICATION, EventType.DATA_ACCESS}.issubset(event_types):
            return "data_exfiltration_attempt"

        if events[-1].event_type == EventType.MALWARE_DETECTION:
            return "malware_infection"

        if EventType.AUTHENTICATION in event_types:
            failed_auths = [e for e in events if e.details.get('success') is False]
            if len(failed_auths) >= 5:
                return "brute_force_attack"

        return "suspicious_activity"

    def _calculate_risk_score(self, events: List[SecurityEvent]) -> float:
        """Calculate risk score for event group"""
        base_score = sum(e.severity.value for e in events) / len(events)

        # Scale by event count, saturating at 10 events (multiplier in 0.1..1.0)
        multiplier = min(len(events), 10) / 10

        return base_score * multiplier

@dataclass
class CorrelatedEvent:
    """Correlated security events"""
    correlation_id: str
    events: List[SecurityEvent]
    pattern: str
    risk_score: float

class ThreatDetection:
    """Threat detection algorithms"""

    def __init__(self, siem: SIEMIntegration):
        self.siem = siem
        self.baseline = {}
        self.anomaly_threshold = 2.0  # Standard deviations

    def detect_anomalies(self, events: List[SecurityEvent]) -> List[SecurityEvent]:
        """
        Detect anomalous security events
        """
        anomalies = []

        for event in events:
            # Check if event deviates from baseline
            if self._is_anomalous(event):
                # Create anomaly detection event
                anomaly_event = SecurityEvent(
                    timestamp=datetime.now(),
                    event_type=EventType.ANOMALY_DETECTION,
                    source="threat_detection_engine",
                    user_id=event.user_id,
                    resource=event.resource,
                    details={
                        'original_event': event.to_json(),
                        'anomaly_type': self._identify_anomaly_type(event)
                    },
                    severity=AlertSeverity.HIGH
                )
                anomalies.append(anomaly_event)

        return anomalies

    def _is_anomalous(self, event: SecurityEvent) -> bool:
        """Check if event is anomalous"""
        # Statistical anomaly detection
        # Implementation depends on baseline statistics
        return False

    def _identify_anomaly_type(self, event: SecurityEvent) -> str:
        """Identify type of anomaly"""
        # Machine learning based anomaly type identification
        return "statistical_anomaly"
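
The `_is_anomalous` stub above leaves the statistics open. One common choice is a z-score test against a per-metric baseline, using the same 2.0 standard deviation threshold as `anomaly_threshold`. The following is a minimal sketch under that assumption; the function name `is_anomalous`, its parameters, and the sample baseline are hypothetical and not part of the classes above.

```python
import statistics
from typing import Sequence


def is_anomalous(value: float,
                 baseline: Sequence[float],
                 threshold: float = 2.0) -> bool:
    """Return True when value is more than `threshold` standard
    deviations away from the mean of the baseline samples."""
    if len(baseline) < 2:
        return False  # Not enough history to estimate the spread

    mean = statistics.fmean(baseline)
    stdev = statistics.stdev(baseline)  # Sample standard deviation

    if stdev == 0:
        # A perfectly flat baseline: any deviation at all is unusual
        return value != mean

    return abs(value - mean) / stdev > threshold


# Hypothetical baseline: failed logins per hour for a single user
failed_logins_per_hour = [2, 3, 1, 2, 4, 3, 2]

typical = is_anomalous(3, failed_logins_per_hour)
spike = is_anomalous(25, failed_logins_per_hour)
```

A z-score assumes the metric is roughly normally distributed and that the baseline is free of attack traffic; heavily skewed metrics may need a percentile or median-based test instead.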

4. Threat Modeling

STRIDE Methodology Implementation

# Threat Modeling using STRIDE Methodology

from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum

class STRIDETthreatType(Enum):
    """STRIDE threat categories"""
    SPOOFING = "spoofing"
    TAMPERING = "tampering"
    REPUDIATION = "repudiation"
    INFORMATION_DISCLOSURE = "information_disclosure"
    DENIAL_OF_SERVICE = "denial_of_service"
    ELEVATION_OF_PRIVILEGE = "elevation_of_privilege"

@dataclass
class DataFlow:
    """Data flow diagram element"""
    id: str
    name: str
    type: str  # 'process', 'data_store', 'external_entity', 'data_flow'
    trust_level: str  # 'trusted', 'untrusted', 'semi-trusted'
    assets: List[str]
    controls: List[str]

@dataclass
class Threat:
    """Identified threat"""
    threat_type: STRIDETthreatType
    description: str
    target: str
    likelihood: str  # 'low', 'medium', 'high'
    impact: str  # 'low', 'medium', 'high'
    mitigation: str
    controls: List[str]

class ThreatModel:
    """Threat model for system architecture"""

    def __init__(self, system_name: str):
        self.system_name = system_name
        self.data_flows: List[DataFlow] = []
        self.threats: List[Threat] = []

    def analyze_threats(self) -> List[Threat]:
        """
        Analyze threats using STRIDE methodology
        """
        threats = []

        for data_flow in self.data_flows:
            # Spoofing threats
            if data_flow.type == 'external_entity':
                threats.extend(self._check_spoofing(data_flow))

            # Tampering threats
            if data_flow.type in ['data_flow', 'data_store']:
                threats.extend(self._check_tampering(data_flow))

            # Repudiation threats
            if data_flow.type == 'process':
                threats.extend(self._check_repudiation(data_flow))

            # Information disclosure threats
            if data_flow.type == 'data_flow':
                threats.extend(self._check_information_disclosure(data_flow))

            # DoS threats
            threats.extend(self._check_denial_of_service(data_flow))

            # Elevation of privilege threats
            if data_flow.type == 'process':
                threats.extend(self._check_elevation_of_privilege(data_flow))

        self.threats = threats
        return threats

    def _check_spoofing(self, data_flow: DataFlow) -> List[Threat]:
        """Check for spoofing threats"""
        threats = []

        if data_flow.trust_level != 'trusted':
            threats.append(Threat(
                threat_type=STRIDETthreatType.SPOOFING,
                description=f"Spoofing of {data_flow.name}",
                target=data_flow.id,
                likelihood="high" if data_flow.trust_level == "untrusted" else "medium",
                impact="high",
                mitigation="Implement mutual authentication and certificate validation",
                controls=["mfa", "certificate_pinning", "identity_verification"]
            ))

        return threats

    def _check_tampering(self, data_flow: DataFlow) -> List[Threat]:
        """Check for tampering threats"""
        threats = []

        if 'encryption' not in data_flow.controls:
            threats.append(Threat(
                threat_type=STRIDETthreatType.TAMPERING,
                description=f"Data tampering in {data_flow.name}",
                target=data_flow.id,
                likelihood="medium",
                impact="high",
                mitigation="Implement end-to-end encryption and data integrity checks",
                controls=["encryption", "digital_signatures", "hashing"]
            ))

        return threats

    def _check_repudiation(self, data_flow: DataFlow) -> List[Threat]:
        """Check for repudiation threats"""
        threats = []

        if 'audit_logging' not in data_flow.controls:
            threats.append(Threat(
                threat_type=STRIDETthreatType.REPUDIATION,
                description=f"Non-repudiation not enforced for {data_flow.name}",
                target=data_flow.id,
                likelihood="medium",
                impact="medium",
                mitigation="Implement comprehensive audit logging",
                controls=["audit_logging", "non_repudiation", "digital_signatures"]
            ))

        return threats

    def _check_information_disclosure(self, data_flow: DataFlow) -> List[Threat]:
        """Check for information disclosure threats"""
        threats = []

        if data_flow.trust_level != 'trusted':
            for asset in data_flow.assets:
                if 'sensitive' in asset.lower() or 'pii' in asset.lower():
                    threats.append(Threat(
                        threat_type=STRIDETthreatType.INFORMATION_DISCLOSURE,
                        description=f"Sensitive data disclosure in {data_flow.name}",
                        target=data_flow.id,
                        likelihood="high",
                        impact="high",
                        mitigation="Implement data encryption and access controls",
                        controls=["encryption", "access_controls", "data_masking"]
                    ))

        return threats

    def _check_denial_of_service(self, data_flow: DataFlow) -> List[Threat]:
        """Check for DoS threats"""
        threats = []

        if 'rate_limiting' not in data_flow.controls:
            threats.append(Threat(
                threat_type=STRIDETthreatType.DENIAL_OF_SERVICE,
                description=f"DoS vulnerability in {data_flow.name}",
                target=data_flow.id,
                likelihood="medium",
                impact="high",
                mitigation="Implement rate limiting and throttling",
                controls=["rate_limiting", "throttling", "circuit_breaker"]
            ))

        return threats

    def _check_elevation_of_privilege(self, data_flow: DataFlow) -> List[Threat]:
        """Check for elevation of privilege threats"""
        threats = []

        if 'authorization' not in data_flow.controls:
            threats.append(Threat(
                threat_type=STRIDETthreatType.ELEVATION_OF_PRIVILEGE,
                description=f"Privilege escalation in {data_flow.name}",
                target=data_flow.id,
                likelihood="medium",
                impact="high",
                mitigation="Implement proper authorization and privilege management",
                controls=["authorization", "role_based_access", "principle_of_least_privilege"]
            ))

        return threats

    def generate_threat_report(self) -> Dict[str, Any]:
        """Generate threat model report"""
        threats = self.analyze_threats()

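        # Bucket threats by risk; a threat already counted as high risk
        # is excluded from the medium bucket
        high_risk = [t for t in threats if t.likelihood == "high" and t.impact == "high"]
        medium_risk = [
            t for t in threats
            if not (t.likelihood == "high" and t.impact == "high")
            and t.likelihood in ["high", "medium"]
            and t.impact in ["high", "medium"]
        ]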

        return {
            'system': self.system_name,
            'total_threats': len(threats),
            'high_risk_threats': len(high_risk),
            'medium_risk_threats': len(medium_risk),
            'threats_by_type': {
                threat_type.value: len([t for t in threats if t.threat_type == threat_type])
                for threat_type in STRIDETthreatType
            },
            'recommended_controls': self._get_recommended_controls(threats),
            'threats': [
                {
                    'type': t.threat_type.value,
                    'description': t.description,
                    'likelihood': t.likelihood,
                    'impact': t.impact,
                    'mitigation': t.mitigation
                }
                for t in threats
            ]
        }

    def _get_recommended_controls(self, threats: List[Threat]) -> List[str]:
        """Get recommended security controls"""
        controls = set()
        for threat in threats:
            controls.update(threat.controls)
        return list(controls)
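
The report above buckets threats by comparing likelihood and impact strings directly. A numeric risk matrix makes the ranking explicit and lets threats be sorted. The sketch below is one possible scoring scheme, not part of the `ThreatModel` class: the 1 to 3 level weights, the band thresholds, and the names `LEVELS`, `risk_score`, and `risk_band` are all assumptions chosen for illustration.

```python
from typing import Dict

# Assumed ordinal weights for the 'low' / 'medium' / 'high' labels
LEVELS: Dict[str, int] = {"low": 1, "medium": 2, "high": 3}


def risk_score(likelihood: str, impact: str) -> int:
    """Score a threat as likelihood x impact on a 1..9 scale."""
    return LEVELS[likelihood] * LEVELS[impact]


def risk_band(score: int) -> str:
    """Map a 1..9 score to a band; the cut-offs are illustrative."""
    if score >= 6:
        return "high"
    if score >= 3:
        return "medium"
    return "low"


# Rank a few (likelihood, impact) pairs from most to least severe
pairs = [("medium", "medium"), ("high", "high"), ("low", "medium")]
ranked = sorted(pairs, key=lambda p: risk_score(*p), reverse=True)
bands = [risk_band(risk_score(*p)) for p in ranked]
```

With these thresholds a medium-likelihood, high-impact threat scores 6 and lands in the high band, which is stricter than the string comparison in `generate_threat_report`. Adjust the cut-offs to match your own risk appetite.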

5. Compliance Frameworks

GDPR Compliance Implementation

# GDPR Compliance Implementation

from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from enum import Enum

class GDPRRight(Enum):
    """GDPR data subject rights"""
    ACCESS = "right_to_access"
    RECTIFICATION = "right_to_rectification"
    ERASURE = "right_to_erasure"
    RESTRICT = "right_to_restrict_processing"
    PORTABILITY = "right_to_portability"
    OBJECT = "right_to_object"
    AUTOMATED_DECISION = "right_to_not_be_subject_to_automated_decision"

class DataClassification(Enum):
    """Data sensitivity classification"""
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    SPECIAL_CATEGORY = "special_category"  # GDPR special category data

@dataclass
class DataSubject:
    """Data subject (individual)"""
    id: str
    personal_data: Dict[str, Any]
    consent_records: List['ConsentRecord']
    access_requests: List['DataAccessRequest']
    data_classification: DataClassification

@dataclass
class ConsentRecord:
    """Record of consent"""
    purpose: str
    granted_at: datetime
    expires_at: Optional[datetime]
    withdrawn_at: Optional[datetime]
    lawful_basis: str  # 'consent', 'contract', 'legal_obligation', etc.

@dataclass
class DataAccessRequest:
    """Data subject access request"""
    request_type: GDPRRight
    requested_at: datetime
    deadline: datetime
    status: str  # 'pending', 'in_progress', 'completed'
    response: Optional[Dict[str, Any]] = None

class GDPRCompliance:
    """GDPR compliance implementation"""

    def __init__(self):
        self.data_subjects: Dict[str, DataSubject] = {}
        self.processing_activities: List[Dict[str, Any]] = []
        self.data_breach_records: List[Dict[str, Any]] = []

    def record_consent(self,
                      subject_id: str,
                      purpose: str,
                      lawful_basis: str,
                      duration_days: Optional[int] = None) -> ConsentRecord:
        """
        Record data subject consent
        """
        consent = ConsentRecord(
            purpose=purpose,
            granted_at=datetime.now(),
            expires_at=datetime.now() + timedelta(days=duration_days) if duration_days else None,
            withdrawn_at=None,
            lawful_basis=lawful_basis
        )

        # Get or create data subject
        if subject_id not in self.data_subjects:
            self.data_subjects[subject_id] = DataSubject(
                id=subject_id,
                personal_data={},
                consent_records=[],
                access_requests=[],
                data_classification=DataClassification.INTERNAL
            )

        self.data_subjects[subject_id].consent_records.append(consent)
        return consent

    def verify_consent(self,
                      subject_id: str,
                      purpose: str) -> bool:
        """
        Verify if valid consent exists
        """
        if subject_id not in self.data_subjects:
            return False

        subject = self.data_subjects[subject_id]

        # Check for valid consent for purpose
        for consent in subject.consent_records:
            if consent.purpose == purpose:
                # Skip withdrawn or expired records; a later record may
                # re-grant consent for the same purpose
                if consent.withdrawn_at is not None:
                    continue

                if consent.expires_at and datetime.now() > consent.expires_at:
                    continue

                return True  # Valid consent found

        return False  # No valid consent found

    def handle_access_request(self,
                            subject_id: str,
                            request_type: GDPRRight) -> DataAccessRequest:
        """
        Handle data subject access request
        """
        # Create access request
        request = DataAccessRequest(
            request_type=request_type,
            requested_at=datetime.now(),
            deadline=datetime.now() + timedelta(days=30),  # GDPR Art. 12(3): respond within one month (approximated as 30 days)
            status='pending'
        )

        # Get data subject
        if subject_id in self.data_subjects:
            subject = self.data_subjects[subject_id]

            if request_type == GDPRRight.ACCESS:
                # Provide copy of all personal data
                request.response = {
                    'personal_data': subject.personal_data,
                    'consent_records': [
                        {
                            'purpose': c.purpose,
                            'granted_at': c.granted_at.isoformat(),
                            'lawful_basis': c.lawful_basis
                        }
                        for c in subject.consent_records
                    ],
                    'processing_activities': [
                        activity
                        for activity in self.processing_activities
                        if activity.get('subject_id') == subject_id
                    ]
                }

            elif request_type == GDPRRight.ERASURE:
                # Right to be forgotten
                if self._can_erase_data(subject):
                    subject.personal_data = {}
                    subject.consent_records = []
                    request.response = {'status': 'data_erased'}

            elif request_type == GDPRRight.PORTABILITY:
                # Right to data portability
                request.response = {
                    'personal_data': subject.personal_data,
                    'format': 'machine_readable_format'
                }

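            # Mark completed only when a response was produced; request
            # types not handled above (e.g. rectification) stay open
            request.status = 'completed' if request.response is not None else 'in_progress'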
            subject.access_requests.append(request)

        return request

    def _can_erase_data(self, subject: DataSubject) -> bool:
        """Check if data can be erased"""
        # Cannot erase if required for legal obligation
        # Cannot erase if required for performance of contract
        # Implementation depends on specific circumstances
        return True

    def record_data_breach(self,
                          breach_details: Dict[str, Any]) -> str:
        """
        Record data breach for GDPR notification requirements
        """
        breach_record = {
            'breach_id': self._generate_breach_id(),
            'detected_at': datetime.now().isoformat(),
            'description': breach_details.get('description'),
            'affected_subjects': breach_details.get('affected_subjects', []),
            'data_types': breach_details.get('data_types', []),
            'severity': breach_details.get('severity', 'medium'),
            'containment_actions': breach_details.get('containment_actions', []),
            'notification_to_authority': False,
            'notification_to_subjects': False
        }

        self.data_breach_records.append(breach_record)

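        # Art. 33: notify the supervisory authority within 72 hours of becoming
        # aware, unless the breach is unlikely to result in a risk to individuals.
        # Only a 'high' rating escalates here; lower the bar if your risk
        # assessment uses finer-grained levels.
        if breach_details.get('risk_to_individuals') == 'high':
            self._notify_data_protection_authority(breach_record)

        # Art. 34: notify data subjects when the breach is likely to result in
        # a high risk to their rights and freedoms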
        if breach_details.get('high_risk_to_rights_freedoms'):
            self._notify_data_subjects(breach_record)

        return breach_record['breach_id']

    def _notify_data_protection_authority(self, breach: Dict[str, Any]):
        """Notify data protection authority of breach"""
        # Implementation for notifying authority within 72 hours
        breach['notification_to_authority'] = True
        breach['authority_notified_at'] = datetime.now().isoformat()

    def _notify_data_subjects(self, breach: Dict[str, Any]):
        """Notify data subjects of breach"""
        # Implementation for notifying affected individuals
        breach['notification_to_subjects'] = True
        breach['subjects_notified_at'] = datetime.now().isoformat()

    def _generate_breach_id(self) -> str:
        """Generate breach ID"""
        return f"breach_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def generate_compliance_report(self) -> Dict[str, Any]:
        """Generate GDPR compliance report"""
        return {
            'total_data_subjects': len(self.data_subjects),
            'active_consent_records': sum(
                len([c for c in s.consent_records if c.withdrawn_at is None])
                for s in self.data_subjects.values()
            ),
            'pending_access_requests': sum(
                len([r for r in s.access_requests if r.status == 'pending'])
                for s in self.data_subjects.values()
            ),
            'data_breaches': len(self.data_breach_records),
            'processing_activities': len(self.processing_activities),
            'compliance_status': self._check_compliance_status()
        }

    def _check_compliance_status(self) -> Dict[str, Any]:
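        """Check GDPR compliance status"""
        # Placeholder: these values are hard-coded, not derived from the
        # records held by this class; replace with real checks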
        return {
            'consent_management': 'compliant',
            'data_subject_rights': 'compliant',
            'data_breach_notification': 'compliant',
            'data_protection_by_design': 'compliant',
            'data_protection_officer': 'appointed',
            'records_of_processing_activities': 'maintained'
        }
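
The breach handler above records that the authority was notified but does not track the 72-hour window itself. The sketch below shows one way to compute the deadline and the time remaining, counting from the moment the organization became aware of the breach. The names `authority_deadline` and `hours_remaining` are hypothetical helpers, and timezone-aware UTC timestamps are an assumption (the class above uses naive `datetime.now()`).

```python
from datetime import datetime, timedelta, timezone

# Notification window for the supervisory authority (GDPR Art. 33)
AUTHORITY_NOTIFICATION_WINDOW = timedelta(hours=72)


def authority_deadline(became_aware_at: datetime) -> datetime:
    """Latest time at which the authority should be notified."""
    return became_aware_at + AUTHORITY_NOTIFICATION_WINDOW


def hours_remaining(became_aware_at: datetime, now: datetime) -> float:
    """Hours left before the deadline; negative once it has passed."""
    remaining = authority_deadline(became_aware_at) - now
    return remaining.total_seconds() / 3600


# Example timestamps (illustrative values only)
aware = datetime(2026, 1, 1, 9, 0, tzinfo=timezone.utc)
one_day_later = datetime(2026, 1, 2, 9, 0, tzinfo=timezone.utc)

deadline = authority_deadline(aware)
left = hours_remaining(aware, one_day_later)
```

Passing `now` explicitly rather than calling `datetime.now()` inside the helper keeps the calculation deterministic and easy to test.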

6. Decision Trees

Security Control Selection

Threat type?
│
├─ Spoofing → Authentication, Identity verification
├─ Tampering → Encryption, Digital signatures, Integrity checks
├─ Repudiation → Audit logging, Non-repudiation controls
├─ Information disclosure → Encryption, Access controls, DLP
├─ Denial of service → Rate limiting, Redundancy, DDoS protection
└─ Elevation of privilege → Authorization, Least privilege, RBAC
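
The tree above can be encoded as a lookup table so that control selection is repeatable across a set of identified threats. This is a minimal sketch: the control identifiers are lower-cased versions of the labels in the tree, and `CONTROLS_BY_THREAT` and `select_controls` are hypothetical names.

```python
from typing import Dict, List

# One entry per branch of the decision tree
CONTROLS_BY_THREAT: Dict[str, List[str]] = {
    "spoofing": ["authentication", "identity_verification"],
    "tampering": ["encryption", "digital_signatures", "integrity_checks"],
    "repudiation": ["audit_logging", "non_repudiation_controls"],
    "information_disclosure": ["encryption", "access_controls", "dlp"],
    "denial_of_service": ["rate_limiting", "redundancy", "ddos_protection"],
    "elevation_of_privilege": ["authorization", "least_privilege", "rbac"],
}


def select_controls(threat_types: List[str]) -> List[str]:
    """Collect controls for the given threat types, without duplicates
    and in first-seen order."""
    selected: List[str] = []
    for threat_type in threat_types:
        for control in CONTROLS_BY_THREAT[threat_type]:
            if control not in selected:
                selected.append(control)
    return selected


controls = select_controls(["tampering", "information_disclosure"])
```

Here `encryption` appears in both branches but is returned once, which matters when the list feeds a control backlog.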

Compliance Framework Selection

Data type and location?
│
├─ EU personal data → GDPR
├─ US healthcare data → HIPAA
├─ Payment card data → PCI-DSS
├─ Financial data → SOX, Basel III
├─ US federal systems → NIST SP 800-53
└─ Global operations → ISO 27001
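
The branches above are not mutually exclusive: a system that takes card payments from EU customers falls under both GDPR and PCI-DSS. The sketch below treats each branch as an independent flag and returns every framework that applies. The `DataProfile` fields and the `applicable_frameworks` function are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class DataProfile:
    """Which kinds of data and operations a system involves."""
    eu_personal_data: bool = False
    us_healthcare_data: bool = False
    payment_card_data: bool = False
    financial_data: bool = False
    us_federal_system: bool = False
    global_operations: bool = False


def applicable_frameworks(profile: DataProfile) -> List[str]:
    """Return every framework from the tree whose branch matches."""
    rules = [
        (profile.eu_personal_data, ["GDPR"]),
        (profile.us_healthcare_data, ["HIPAA"]),
        (profile.payment_card_data, ["PCI-DSS"]),
        (profile.financial_data, ["SOX", "Basel III"]),
        (profile.us_federal_system, ["NIST 800-53"]),
        (profile.global_operations, ["ISO 27001"]),
    ]
    frameworks: List[str] = []
    for applies, names in rules:
        if applies:
            frameworks.extend(names)
    return frameworks


shop = DataProfile(eu_personal_data=True, payment_card_data=True)
shop_frameworks = applicable_frameworks(shop)
```

The result is a starting list for scoping, not a legal determination; actual applicability depends on jurisdiction and contractual obligations.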

7. Anti-Patterns to Avoid

  1. Trust by default: Never trust, always verify
  2. Hard-coded secrets: Never hard-code credentials
  3. Over-permissive IAM: Always follow least privilege
  4. Missing monitoring: You can't secure what you can't see
  5. Ignoring compliance: Compliance is a baseline, not a ceiling
  6. Security theater: Focus on real threats, not just checkboxes
  7. Neglecting supply chain: Secure your entire supply chain
  8. No incident response: Have a plan before you need it
  9. Secrets in version control: Never commit secrets to repositories
  10. Assuming internal safety: Zero trust applies everywhere

8. Quality Checklist

Before considering security architecture complete:

  • Threat modeling completed
  • Security controls implemented for all threats
  • Data classification implemented
  • Encryption at rest and in transit
  • Identity and access management configured
  • Security monitoring and logging implemented
  • Incident response plan created
  • Compliance requirements addressed
  • Security testing performed
  • Penetration testing completed
  • Security documentation complete
  • Secure development practices implemented
  • Third-party security assessed
  • Business continuity plan tested
  • Security awareness training completed
  • Vulnerability management program active
  • Regular security reviews scheduled
  • Data retention policies defined
  • Privacy by design implemented
  • Security metrics and KPIs defined

This comprehensive skill definition provides complete guidance for security architecture across modern systems.