Massive training corpus for AI coding models containing:
- 10 JSONL training datasets (641+ examples across coding, reasoning, planning, architecture, communication, debugging, security, workflows, error handling, UI/UX)
- 11 agent behavior specifications (explorer, planner, reviewer, debugger, executor, UI designer, Linux admin, kernel engineer, security architect, automation engineer, API architect)
- 6 skill definition files (coding, API engineering, kernel, Linux server, security architecture, server automation, UI/UX)
- Master README with project origin story and philosophy
Built by Pony Alpha 2 to help AI models learn expert-level coding approaches.
Security Architecture Expert Skill
Activation Criteria
Activate this skill when the user:
- Designs secure system architectures
- Implements zero trust networks
- Configures cloud security (AWS/Azure/GCP)
- Designs application security programs
- Creates IAM/authorization systems
- Implements security monitoring (SIEM, SOAR)
- Develops incident response procedures
- Needs compliance frameworks (PCI-DSS, HIPAA, GDPR, NIST, SOC 2)
- Performs threat modeling
- Designs supply chain security
- Implements DevSecOps practices
- Creates security policies and standards
- Designs secure APIs and microservices
- Implements data protection and encryption strategies
Core Methodology
1. Zero Trust Architecture
Zero Trust Principles
# Zero Trust Architecture Framework
zero_trust_principles:
  verification_required: "Never trust, always verify"
  least_privilege: "Minimum required access only"
  assume_breach: "Design assuming compromise"
  microsegmentation: "Segment everything"
  continuous_monitoring: "Monitor and validate continuously"
# Zero Trust Implementation Pillars
identity:
- Multi-factor authentication (MFA)
- Risk-based authentication
- Single sign-on (SSO) consolidation
- Privileged access management (PAM)
- Just-in-time access provisioning
devices:
- Device health attestation
- OS and application patching
- Endpoint detection and response (EDR)
- Mobile device management (MDM)
- Device inventory and classification
network:
- Microsegmentation
- Software-defined perimeter (SDP)
- Next-generation firewalls (NGFW)
- Zero trust network access (ZTNA)
- East-west traffic inspection
applications:
- Application security testing
- API security and gateways
- Container and Kubernetes security
- Serverless function security
- Runtime application self-protection (RASP)
data:
- Data classification and labeling
- Encryption at rest and in transit
- Data loss prevention (DLP)
- Rights management (IRM/DRM)
- Database activity monitoring
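The pillars above each contribute a signal to a per-request trust decision. As a minimal sketch, assuming three hypothetical inputs (`mfa_verified`, `device_health_score`, `network_trust`) and illustrative thresholds of 0.7 and 0.9 that are not prescribed by any framework, the signals could be folded into a coarse trust level like this:

```python
from enum import Enum
from typing import Optional


class TrustLevel(Enum):
    """Illustrative trust levels (an assumption for this sketch)."""
    NONE = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3


def derive_trust_level(mfa_verified: bool,
                       device_health_score: Optional[float],
                       network_trust: Optional[float]) -> TrustLevel:
    """Combine identity, device, and network signals (hypothetical thresholds)."""
    # Never trust: a missing identity or device signal yields no trust at all.
    if not mfa_verified or device_health_score is None:
        return TrustLevel.NONE
    # Assume breach: an unknown network is treated as fully untrusted.
    network = network_trust if network_trust is not None else 0.0
    # The weakest signal bounds the result, so one strong pillar
    # cannot compensate for a weak one.
    weakest = min(device_health_score, network)
    if weakest >= 0.9:
        return TrustLevel.HIGH
    if weakest >= 0.7:
        return TrustLevel.MEDIUM
    return TrustLevel.LOW
```

Taking the minimum rather than an average is a deliberate choice in this sketch: a healthy device on an untrusted network should not end up with a high trust level.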
Zero Trust Network Implementation
# Zero Trust Network Access (ZTNA) Implementation
# Policy Engine for Access Control
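from __future__ import annotations  # Allows return annotations to name classes defined later in this module
from dataclasses import dataclass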
from enum import Enum
from typing import List, Dict, Optional, Set
from datetime import datetime, timedelta
import hashlib
import jwt
import ipaddress
class TrustLevel(Enum):
"""Trust levels for zero trust decisions"""
NONE = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
class ResourceType(Enum):
"""Types of protected resources"""
API = "api"
DATABASE = "database"
STORAGE = "storage"
SERVICE = "service"
FILE = "file"
@dataclass
class Subject:
"""Requesting entity (user, service, device)"""
id: str
type: str # 'user', 'service', 'device'
attributes: Dict
trust_level: TrustLevel = TrustLevel.NONE
@dataclass
class Resource:
"""Protected resource"""
id: str
type: ResourceType
sensitivity: str # 'public', 'internal', 'confidential', 'restricted'
owner: str
tags: Set[str]
@dataclass
class Context:
"""Request context for decision making"""
source_ip: str
destination_ip: str
timestamp: datetime
user_agent: Optional[str] = None
device_health_score: Optional[float] = None
geo_location: Optional[str] = None
network_trust: Optional[float] = None
class PolicyEngine:
"""Zero Trust Policy Engine"""
def __init__(self):
self.policies: List[Policy] = []
self.trust_scores: Dict[str, float] = {}
def evaluate_access(self,
subject: Subject,
resource: Resource,
action: str,
context: Context) -> AccessDecision:
"""
Evaluate access request using zero trust principles
"""
# Step 1: Verify identity
if not self._verify_identity(subject):
return AccessDecision(
allowed=False,
reason="Identity verification failed"
)
# Step 2: Check device health
if not self._check_device_health(context):
return AccessDecision(
allowed=False,
reason="Device does not meet health requirements"
)
# Step 3: Evaluate policies
for policy in self.policies:
if policy.matches(subject, resource, action, context):
decision = policy.evaluate(subject, resource, action, context)
if decision is not None:
return decision
# Default deny
return AccessDecision(
allowed=False,
reason="No matching policy - default deny"
)
def _verify_identity(self, subject: Subject) -> bool:
"""Verify subject identity with MFA"""
# Check MFA status
if not subject.attributes.get('mfa_verified', False):
return False
# Check if identity is not expired
if 'expiry' in subject.attributes:
expiry = subject.attributes['expiry']
if datetime.now() > expiry:
return False
return True
def _check_device_health(self, context: Context) -> bool:
"""Check device health score"""
if context.device_health_score is None:
return False
return context.device_health_score >= 0.7
class Policy:
"""Access control policy"""
def __init__(self,
name: str,
conditions: Dict,
effect: str):
self.name = name
self.conditions = conditions
self.effect = effect # 'allow' or 'deny'
def matches(self,
subject: Subject,
resource: Resource,
action: str,
context: Context) -> bool:
"""Check if policy matches the request"""
# Check subject conditions
if 'subject' in self.conditions:
for key, value in self.conditions['subject'].items():
if subject.attributes.get(key) != value:
return False
# Check resource conditions
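if 'resource' in self.conditions:
res_conditions = self.conditions['resource']
# Only compare the attributes that the policy actually specifies
if 'type' in res_conditions and resource.type.value != res_conditions['type']:
return False
if 'sensitivity' in res_conditions and resource.sensitivity != res_conditions['sensitivity']:
return False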
# Check action conditions
if 'action' in self.conditions:
if action not in self.conditions['action']:
return False
# Check context conditions
if 'context' in self.conditions:
ctx_conditions = self.conditions['context']
# Check IP range
if 'allowed_ips' in ctx_conditions:
ip = ipaddress.ip_address(context.source_ip)
allowed = False
for allowed_range in ctx_conditions['allowed_ips']:
network = ipaddress.ip_network(allowed_range)
if ip in network:
allowed = True
break
if not allowed:
return False
# Check time restrictions
if 'time_restrictions' in ctx_conditions:
time_restriction = ctx_conditions['time_restrictions']
current_hour = context.timestamp.hour
if 'working_hours' in time_restriction:
start, end = time_restriction['working_hours']
if not (start <= current_hour < end):
return False
return True
def evaluate(self,
subject: Subject,
resource: Resource,
action: str,
context: Context) -> Optional[AccessDecision]:
"""Evaluate the policy"""
if self.effect == 'allow':
return AccessDecision(
allowed=True,
reason=f"Allowed by policy: {self.name}"
)
else:
return AccessDecision(
allowed=False,
reason=f"Denied by policy: {self.name}"
)
@dataclass
class AccessDecision:
"""Access decision with audit trail"""
allowed: bool
reason: str
expires_at: Optional[datetime] = None
additional_conditions: Optional[List[str]] = None
class ZeroTrustGateway:
"""Zero Trust Network Access Gateway"""
def __init__(self, policy_engine: PolicyEngine):
self.policy_engine = policy_engine
self.session_manager = SessionManager()
def handle_request(self,
subject: Subject,
resource: Resource,
action: str,
context: Context) -> GatewayResponse:
"""
Handle zero trust access request
"""
# Evaluate access
decision = self.policy_engine.evaluate_access(
subject, resource, action, context
)
if decision.allowed:
# Create session
session = self.session_manager.create_session(
subject, resource, context
)
return GatewayResponse(
success=True,
session_token=session.token,
expires_at=session.expires_at,
decision=decision
)
else:
return GatewayResponse(
success=False,
decision=decision
)
class SessionManager:
"""Manage zero trust sessions"""
def __init__(self):
self.sessions: Dict[str, Session] = {}
def create_session(self,
subject: Subject,
resource: Resource,
context: Context) -> 'Session':
"""Create a new session"""
token = self._generate_token(subject, resource, context)
expires_at = datetime.now() + timedelta(hours=1)
session = Session(
token=token,
subject_id=subject.id,
resource_id=resource.id,
created_at=datetime.now(),
expires_at=expires_at,
context=context
)
self.sessions[token] = session
return session
def validate_session(self, token: str) -> bool:
"""Validate session token"""
session = self.sessions.get(token)
if not session:
return False
if datetime.now() > session.expires_at:
del self.sessions[token]
return False
return True
def _generate_token(self,
subject: Subject,
resource: Resource,
context: Context) -> str:
"""Generate session token"""
payload = {
'subject_id': subject.id,
'resource_id': resource.id,
'timestamp': datetime.now().isoformat(),
'ip': context.source_ip
}
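# NOTE: 'secret' is a placeholder signing key; load the real key from a secrets manager or environment variable
token = jwt.encode(payload, 'secret', algorithm='HS256')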
return token
@dataclass
class Session:
"""Zero trust session"""
token: str
subject_id: str
resource_id: str
created_at: datetime
expires_at: datetime
context: Context
@dataclass
class GatewayResponse:
"""Gateway response"""
success: bool
session_token: Optional[str] = None
expires_at: Optional[datetime] = None
decision: Optional[AccessDecision] = None
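The session token above is signed with a literal key and carries no expiry claim of its own. As a hedged sketch of one alternative, the following uses only the standard library (HMAC-SHA256 instead of a JWT library) to issue a token with an embedded expiry and to verify it. The function names `sign_token` and `verify_token`, the token layout, and the `now` parameter (added so the behavior can be tested deterministically) are assumptions made for illustration.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def sign_token(payload: dict, key: bytes, ttl_seconds: int = 3600,
               now: Optional[float] = None) -> str:
    """Return '<body>.<signature>' with an embedded expiry timestamp."""
    issued = time.time() if now is None else now
    claims = dict(payload, exp=issued + ttl_seconds)
    body = base64.urlsafe_b64encode(
        json.dumps(claims, sort_keys=True).encode()
    ).decode()
    sig = hmac.new(key, body.encode(), hashlib.sha256).hexdigest()
    return f"{body}.{sig}"


def verify_token(token: str, key: bytes,
                 now: Optional[float] = None) -> Optional[dict]:
    """Return the claims if the signature is valid and unexpired, else None."""
    try:
        body, sig = token.rsplit(".", 1)
    except ValueError:
        return None  # No separator: not a token produced by sign_token.
    expected = hmac.new(key, body.encode(), hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking signature bytes via timing.
    if not hmac.compare_digest(sig.encode(), expected.encode()):
        return None
    try:
        claims = json.loads(base64.urlsafe_b64decode(body.encode()))
    except ValueError:
        return None
    current = time.time() if now is None else now
    if current >= claims.get("exp", 0):
        return None  # Token has expired.
    return claims
```

In a deployment the key would come from a secrets manager or an environment variable rather than source code. Because expiry is checked from the signed claims, the verifier does not need to trust a server-side session table for it.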
2. Cloud Security Architecture
AWS Security Implementation
# AWS Security Infrastructure as Code
# Plain Python dictionaries that mirror Terraform-style resource attributes for an AWS security setup
import json
from typing import List, Dict, Any
class AWSSecurityArchitecture:
"""AWS security infrastructure configuration"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.security_groups = []
self.iam_policies = []
self.kms_keys = []
self.s3_buckets = []
self.cloudtrail_config = None
self.guardduty_config = None
def design_vpc_security(self) -> Dict[str, Any]:
"""
Design secure VPC architecture
"""
vpc_config = {
'vpc': {
'cidr_block': '10.0.0.0/16',
'enable_dns_support': True,
'enable_dns_hostnames': True,
'tags': {
'Name': 'production-vpc',
'Environment': 'production',
'Compliance': 'PCI-DSS'
}
},
'subnets': {
'public': [
{
'cidr': '10.0.1.0/24',
'availability_zone': 'us-east-1a',
'map_public_ip_on_launch': True
},
{
'cidr': '10.0.2.0/24',
'availability_zone': 'us-east-1b',
'map_public_ip_on_launch': True
}
],
'private': [
{
'cidr': '10.0.10.0/24',
'availability_zone': 'us-east-1a'
},
{
'cidr': '10.0.11.0/24',
'availability_zone': 'us-east-1b'
}
],
'database': [
{
'cidr': '10.0.20.0/24',
'availability_zone': 'us-east-1a',
'isolated': True
},
{
'cidr': '10.0.21.0/24',
'availability_zone': 'us-east-1b',
'isolated': True
}
]
},
'flow_logs': {
'enabled': True,
'traffic_type': 'ALL',
'destination': 'cloudwatch-logs'
},
'network_acls': {
'public': {
'ingress': [
{'rule_number': 100, 'protocol': 'tcp', 'action': 'allow',
'from_port': 443, 'to_port': 443, 'cidr': '0.0.0.0/0'},
{'rule_number': 110, 'protocol': 'tcp', 'action': 'allow',
'from_port': 80, 'to_port': 80, 'cidr': '0.0.0.0/0'}
],
'egress': [
{'rule_number': 100, 'protocol': '-1', 'action': 'allow',
'cidr': '0.0.0.0/0'}
]
}
}
}
return vpc_config
def design_security_groups(self) -> List[Dict[str, Any]]:
"""
Design security groups with least privilege
"""
security_groups = [
{
'name': 'web-tier-sg',
'description': 'Security group for web tier',
'ingress': [
{
'protocol': 'tcp',
'from_port': 443,
'to_port': 443,
'cidr_blocks': ['0.0.0.0/0'],
'description': 'HTTPS from anywhere'
},
{
'protocol': 'tcp',
'from_port': 80,
'to_port': 80,
'cidr_blocks': ['0.0.0.0/0'],
'description': 'HTTP from anywhere (redirect to HTTPS)'
}
],
'egress': [
{
'protocol': '-1',
'cidr_blocks': ['0.0.0.0/0'],
'description': 'All outbound traffic'
}
],
'tags': {'Tier': 'Web', 'Compliance': 'PCI-DSS'}
},
{
'name': 'application-tier-sg',
'description': 'Security group for application tier',
'ingress': [
{
'protocol': 'tcp',
'from_port': 8080,
'to_port': 8080,
'security_groups': ['web-tier-sg'],
'description': 'Application port from web tier'
},
{
'protocol': 'tcp',
'from_port': 22,
'to_port': 22,
'security_groups': ['bastion-sg'],
'description': 'SSH from bastion only'
}
],
'egress': [
{
'protocol': 'tcp',
'from_port': 5432,
'to_port': 5432,
'security_groups': ['database-tier-sg'],
'description': 'Database access'
},
{
'protocol': 'tcp',
'from_port': 443,
'to_port': 443,
'cidr_blocks': ['0.0.0.0/0'],
'description': 'HTTPS outbound for APIs'
}
],
'tags': {'Tier': 'Application'}
},
{
'name': 'database-tier-sg',
'description': 'Security group for database tier',
'ingress': [
{
'protocol': 'tcp',
'from_port': 5432,
'to_port': 5432,
'security_groups': ['application-tier-sg'],
'description': 'PostgreSQL from application tier'
}
],
'egress': [],
'tags': {'Tier': 'Database', 'DataClassification': 'Confidential'}
}
]
return security_groups
def design_iam_policies(self) -> List[Dict[str, Any]]:
"""
Design IAM policies following least privilege
"""
policies = [
{
'name': 'EC2ReadOnlyPolicy',
'description': 'Read-only access to EC2 resources',
'policy': {
'Version': '2012-10-17',
'Statement': [
{
'Effect': 'Allow',
'Action': [
'ec2:Describe*',
'ec2:Get*'
],
'Resource': '*'
}
]
}
},
{
'name': 'S3AppDataPolicy',
'description': 'Application-specific S3 access',
'policy': {
'Version': '2012-10-17',
'Statement': [
{
'Effect': 'Allow',
'Action': [
's3:GetObject',
's3:PutObject',
's3:DeleteObject'
],
'Resource': 'arn:aws:s3:::app-data-bucket/*',
'Condition': {
'IpAddress': {
'aws:SourceIp': ['10.0.0.0/16']
}
}
},
{
'Effect': 'Allow',
'Action': 's3:ListBucket',
'Resource': 'arn:aws:s3:::app-data-bucket'
}
]
}
},
{
'name': 'SecretsManagerAppPolicy',
'description': 'Access to application secrets',
'policy': {
'Version': '2012-10-17',
'Statement': [
{
'Effect': 'Allow',
'Action': [
'secretsmanager:GetSecretValue',
'secretsmanager:DescribeSecret'
],
'Resource': [
'arn:aws:secretsmanager:us-east-1:*:secret:app/*'
],
'Condition': {
'StringEquals': {
'secretsmanager:VersionStage': 'AWSCURRENT'
}
}
}
]
}
}
]
return policies
def design_kms_encryption(self) -> List[Dict[str, Any]]:
"""
Design KMS encryption keys
"""
keys = [
{
'name': 'app-data-key',
'description': 'Encryption key for application data',
'key_usage': 'ENCRYPT_DECRYPT',
'key_spec': 'SYMMETRIC_DEFAULT',
'rotation_enabled': True,
'key_policy': {
'Version': '2012-10-17',
'Statement': [
{
'Sid': 'Enable IAM User Permissions',
'Effect': 'Allow',
'Principal': {
'AWS': 'arn:aws:iam::ACCOUNT_ID:root'
},
'Action': 'kms:*',
'Resource': '*'
},
{
'Sid': 'Allow Application Access',
'Effect': 'Allow',
'Principal': {
'AWS': 'arn:aws:iam::ACCOUNT_ID:role/app-role'
},
'Action': [
'kms:Encrypt',
'kms:Decrypt',
'kms:ReEncrypt*',
'kms:GenerateDataKey*',
'kms:DescribeKey'
],
'Resource': '*'
}
]
},
'tags': {
'Purpose': 'ApplicationDataEncryption',
'Compliance': 'PCI-DSS',
'Rotation': 'Automatic'
}
},
{
'name': 'database-encryption-key',
'description': 'Encryption key for RDS databases',
'key_usage': 'ENCRYPT_DECRYPT',
'rotation_enabled': True,
'tags': {'Purpose': 'DatabaseEncryption'}
}
]
return keys
def design_s3_security(self) -> List[Dict[str, Any]]:
"""
Design secure S3 buckets
"""
buckets = [
{
'name': 'app-data-bucket',
'versioning_enabled': True,
'encryption': {
'sse_algorithm': 'aws:kms',
'kms_key_id': 'app-data-key'
},
'public_access_block': {
'block_public_acls': True,
'block_public_policy': True,
'ignore_public_acls': True,
'restrict_public_buckets': True
},
'lifecycle_rules': [
{
'id': 'delete-old-versions',
'enabled': True,
'noncurrent_version_expiration_days': 90
}
],
'logging': {
'target_bucket': 'app-logs-bucket',
'target_prefix': 'logs/'
},
'replication': {
'enabled': True,
'destination': 'app-data-bucket-dr',
'replication_time': 15 * 60 # 15 minutes
},
'tags': {
'DataClassification': 'Confidential',
'Backup': 'Enabled',
'Retention': '7years'
}
}
]
return buckets
def design_cloudtrail(self) -> Dict[str, Any]:
"""
Design CloudTrail for audit logging
"""
config = {
'is_multi_region_trail': True,
'include_global_service_events': True,
'enable_log_file_validation': True,
'cloud_watch_logs_role_arn': 'arn:aws:iam::ACCOUNT_ID:role/CloudTrailRole',
'cloud_watch_logs_log_group_arn': 'arn:aws:logs:us-east-1:ACCOUNT_ID:log-group:CloudTrail/Logs',
's3_bucket_name': 'cloudtrail-logs-bucket',
'kms_key_id': 'cloudtrail-key',
'is_organization_trail': True,
'event_selector': [
{
'read_write_type': 'All',
'include_management_events': True,
'data_resources': [
{
'type': 'AWS::S3::Object',
'values': ['arn:aws:s3:::app-data-bucket/*']
},
{
'type': 'AWS::Lambda::Function',
'values': ['arn:aws:lambda:us-east-1:ACCOUNT_ID:function:*']
}
]
}
],
'tags': {
'Purpose': 'ComplianceAudit',
'Retention': '7years'
}
}
return config
def design_guardduty(self) -> Dict[str, Any]:
"""
Design GuardDuty threat detection
"""
config = {
'enable': True,
'finding_publishing_frequency': 'FIFTEEN_MINUTES',
'datasources': {
's3_logs': {
'enable': True
},
'kubernetes': {
'audit_logs': {
'enable': True
}
},
'malware_protection': {
'scan_ec2_instance_data_sources': {
'ebs_volumes': {
'enable': True
}
}
}
},
'tags': {
'Purpose': 'ThreatDetection'
}
}
return config
def generate_security_blueprint(self) -> Dict[str, Any]:
"""
Generate complete security blueprint
"""
return {
'vpc': self.design_vpc_security(),
'security_groups': self.design_security_groups(),
'iam_policies': self.design_iam_policies(),
'kms_keys': self.design_kms_encryption(),
's3_buckets': self.design_s3_security(),
'cloudtrail': self.design_cloudtrail(),
'guardduty': self.design_guardduty(),
'compliance': {
'standards': ['PCI-DSS', 'HIPAA', 'SOC2', 'ISO27001'],
'automated_compliance_checks': True,
'continuous_monitoring': True
}
}
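A blueprint expressed as plain dictionaries can be checked mechanically before anything is provisioned. The sketch below, under the assumption that security groups use the same keys as the dictionaries above (`name`, `ingress`, `protocol`, `from_port`, `to_port`, `cidr_blocks`), flags ingress rules that expose anything other than a set of intentionally public ports to the whole internet. The function name `find_open_ingress` and the default public ports (80 and 443) are hypothetical choices.

```python
from typing import Any, Dict, List

ANY_IPV4 = "0.0.0.0/0"


def find_open_ingress(security_groups: List[Dict[str, Any]],
                      public_ports: frozenset = frozenset({80, 443})) -> List[str]:
    """List ingress rules open to the internet on non-public ports."""
    findings = []
    for group in security_groups:
        for rule in group.get("ingress", []):
            if ANY_IPV4 not in rule.get("cidr_blocks", []):
                continue  # Rule is scoped to specific CIDRs or security groups.
            # Protocol '-1' means all traffic, so every port is exposed.
            if rule.get("protocol") == "-1":
                findings.append(f"{group['name']}: all traffic open to {ANY_IPV4}")
                continue
            ports = range(rule["from_port"], rule["to_port"] + 1)
            exposed = [p for p in ports if p not in public_ports]
            if exposed:
                findings.append(
                    f"{group['name']}: ports {exposed[0]}-{exposed[-1]} open to {ANY_IPV4}"
                )
    return findings
```

A check like this could run in CI against the generated blueprint, failing the build when the returned list is non-empty.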
3. Security Monitoring Architecture
SIEM Integration Patterns
# Security Information and Event Management (SIEM) Integration
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from enum import Enum
import json
import hashlib
class AlertSeverity(Enum):
"""SIEM alert severity levels"""
INFO = 0
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
class EventType(Enum):
"""Security event types"""
AUTHENTICATION = "authentication"
AUTHORIZATION = "authorization"
DATA_ACCESS = "data_access"
NETWORK_TRAFFIC = "network_traffic"
SYSTEM_EVENT = "system_event"
APPLICATION_EVENT = "application_event"
MALWARE_DETECTION = "malware_detection"
ANOMALY_DETECTION = "anomaly_detection"
@dataclass
class SecurityEvent:
"""Security event for SIEM"""
timestamp: datetime
event_type: EventType
source: str
user_id: Optional[str]
resource: Optional[str]
details: Dict[str, Any]
severity: AlertSeverity = AlertSeverity.INFO
correlation_id: Optional[str] = None
def to_json(self) -> str:
"""Convert event to JSON for SIEM"""
return json.dumps({
'timestamp': self.timestamp.isoformat(),
'event_type': self.event_type.value,
'source': self.source,
'user_id': self.user_id,
'resource': self.resource,
'details': self.details,
'severity': self.severity.name,
'correlation_id': self.correlation_id
})
class SIEMIntegration:
"""SIEM integration for security monitoring"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.event_queue: List[SecurityEvent] = []
self.alert_rules: List[AlertRule] = []
self.correlation_engine = CorrelationEngine()
def send_event(self, event: SecurityEvent):
"""
Send security event to SIEM
"""
# Add correlation ID if not present
if not event.correlation_id:
event.correlation_id = self._generate_correlation_id(event)
# Add to event queue
self.event_queue.append(event)
# Check against alert rules
self._evaluate_alert_rules(event)
# Send to SIEM
self._send_to_siem(event)
def _generate_correlation_id(self, event: SecurityEvent) -> str:
"""Generate correlation ID for event"""
data = f"{event.timestamp}_{event.event_type}_{event.source}"
return hashlib.sha256(data.encode()).hexdigest()[:16]
def _evaluate_alert_rules(self, event: SecurityEvent):
"""
Evaluate event against alert rules
"""
for rule in self.alert_rules:
if rule.matches(event):
alert = rule.generate_alert(event)
self._send_alert(alert)
def _send_to_siem(self, event: SecurityEvent):
"""Send event to SIEM (Splunk, ELK, Sentinel, etc.)"""
# Implementation depends on SIEM platform
pass
def _send_alert(self, alert: 'SecurityAlert'):
"""Send security alert"""
# Implementation for alert notification
pass
class AlertRule:
"""SIEM alert rule"""
def __init__(self,
name: str,
conditions: Dict[str, Any],
severity: AlertSeverity,
action: str):
self.name = name
self.conditions = conditions
self.severity = severity
self.action = action
def matches(self, event: SecurityEvent) -> bool:
"""Check if event matches rule conditions"""
# Check event type
if 'event_type' in self.conditions:
if event.event_type != EventType(self.conditions['event_type']):
return False
# Check severity threshold
if 'min_severity' in self.conditions:
if event.severity.value < self.conditions['min_severity']:
return False
# Check field conditions
if 'fields' in self.conditions:
for field, expected_value in self.conditions['fields'].items():
# A missing field counts as a mismatch, so rules do not fire on events that lack the field
if event.details.get(field) != expected_value:
return False
return True
def generate_alert(self, event: SecurityEvent) -> 'SecurityAlert':
"""Generate alert from matching event"""
return SecurityAlert(
rule_name=self.name,
severity=self.severity,
event=event,
action=self.action,
timestamp=datetime.now()
)
@dataclass
class SecurityAlert:
"""Security alert generated by SIEM"""
rule_name: str
severity: AlertSeverity
event: SecurityEvent
action: str
timestamp: datetime
status: str = "open"
incident_id: Optional[str] = None
class CorrelationEngine:
"""Event correlation engine"""
def __init__(self):
self.event_history: List[SecurityEvent] = []
self.correlation_window = timedelta(minutes=15)
def correlate_events(self, events: List[SecurityEvent]) -> List['CorrelatedEvent']:
"""
Correlate related security events
"""
correlated = []
# Group events by correlation ID
groups = {}
for event in events:
if event.correlation_id not in groups:
groups[event.correlation_id] = []
groups[event.correlation_id].append(event)
# Analyze each group
for correlation_id, event_group in groups.items():
if len(event_group) > 1:
correlated.append(CorrelatedEvent(
correlation_id=correlation_id,
events=event_group,
pattern=self._identify_pattern(event_group),
risk_score=self._calculate_risk_score(event_group)
))
return correlated
def _identify_pattern(self, events: List[SecurityEvent]) -> str:
"""Identify attack pattern in event group"""
event_types = {e.event_type for e in events}
# Common attack patterns
if {EventType.AUTHENTICATION, EventType.DATA_ACCESS}.issubset(event_types):
return "data_exfiltration_attempt"
if events[-1].event_type == EventType.MALWARE_DETECTION:
return "malware_infection"
if EventType.AUTHENTICATION in event_types:
failed_auths = [e for e in events if e.details.get('success') is False]
if len(failed_auths) >= 5:
return "brute_force_attack"
return "suspicious_activity"
def _calculate_risk_score(self, events: List[SecurityEvent]) -> float:
"""Calculate risk score for event group"""
base_score = sum(e.severity.value for e in events) / len(events)
# Scale by event count, saturating at 10 events
multiplier = min(len(events), 10) / 10
return base_score * multiplier
@dataclass
class CorrelatedEvent:
"""Correlated security events"""
correlation_id: str
events: List[SecurityEvent]
pattern: str
risk_score: float
class ThreatDetection:
"""Threat detection algorithms"""
def __init__(self, siem: SIEMIntegration):
self.siem = siem
self.baseline = {}
self.anomaly_threshold = 2.0 # Standard deviations
def detect_anomalies(self, events: List[SecurityEvent]) -> List[SecurityEvent]:
"""
Detect anomalous security events
"""
anomalies = []
for event in events:
# Check if event deviates from baseline
if self._is_anomalous(event):
# Create anomaly detection event
anomaly_event = SecurityEvent(
timestamp=datetime.now(),
event_type=EventType.ANOMALY_DETECTION,
source="threat_detection_engine",
user_id=event.user_id,
resource=event.resource,
details={
'original_event': event.to_json(),
'anomaly_type': self._identify_anomaly_type(event)
},
severity=AlertSeverity.HIGH
)
anomalies.append(anomaly_event)
return anomalies
def _is_anomalous(self, event: SecurityEvent) -> bool:
"""Check if event is anomalous"""
# Placeholder: statistical anomaly detection is not implemented here
# A real implementation would compare the event against baseline statistics
return False
def _identify_anomaly_type(self, event: SecurityEvent) -> str:
"""Identify type of anomaly"""
# Placeholder: a real implementation might classify the anomaly type, for example with an ML model
return "statistical_anomaly"
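The anomaly check above is left as a stub that always returns False. One simple way to fill it in, sketched here under the assumption that each event can be reduced to a single numeric metric (for example, logins per hour for a user) and that a list of historical values is available, is a z-score test against the 2.0 standard deviation threshold already defined on the detector. The function name `is_anomalous` and its parameters are illustrative.

```python
from statistics import mean, stdev
from typing import Sequence


def is_anomalous(value: float, baseline: Sequence[float],
                 threshold: float = 2.0) -> bool:
    """Flag a value more than `threshold` standard deviations from the baseline mean."""
    if len(baseline) < 2:
        return False  # Not enough history to estimate spread.
    mu = mean(baseline)
    sigma = stdev(baseline)  # Sample standard deviation.
    if sigma == 0:
        return value != mu  # Flat baseline: any deviation stands out.
    return abs(value - mu) / sigma > threshold
```

A z-score assumes the metric is roughly normally distributed. Heavily skewed metrics, such as bytes transferred, may need a log transform or a percentile-based threshold instead.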
4. Threat Modeling
STRIDE Methodology Implementation
# Threat Modeling using STRIDE Methodology
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum
class STRIDETthreatType(Enum):
"""STRIDE threat categories"""
SPOOFING = "spoofing"
TAMPERING = "tampering"
REPUDIATION = "repudiation"
INFORMATION_DISCLOSURE = "information_disclosure"
DENIAL_OF_SERVICE = "denial_of_service"
ELEVATION_OF_PRIVILEGE = "elevation_of_privilege"
@dataclass
class DataFlow:
"""Data flow diagram element"""
id: str
name: str
type: str # 'process', 'data_store', 'external_entity', 'data_flow'
trust_level: str # 'trusted', 'untrusted', 'semi-trusted'
assets: List[str]
controls: List[str]
@dataclass
class Threat:
"""Identified threat"""
threat_type: STRIDETthreatType
description: str
target: str
likelihood: str # 'low', 'medium', 'high'
impact: str # 'low', 'medium', 'high'
mitigation: str
controls: List[str]
class ThreatModel:
"""Threat model for system architecture"""
def __init__(self, system_name: str):
self.system_name = system_name
self.data_flows: List[DataFlow] = []
self.threats: List[Threat] = []
def analyze_threats(self) -> List[Threat]:
"""
Analyze threats using STRIDE methodology
"""
threats = []
for data_flow in self.data_flows:
# Spoofing threats
if data_flow.type == 'external_entity':
threats.extend(self._check_spoofing(data_flow))
# Tampering threats
if data_flow.type in ['data_flow', 'data_store']:
threats.extend(self._check_tampering(data_flow))
# Repudiation threats
if data_flow.type == 'process':
threats.extend(self._check_repudiation(data_flow))
# Information disclosure threats
if data_flow.type == 'data_flow':
threats.extend(self._check_information_disclosure(data_flow))
# DoS threats
threats.extend(self._check_denial_of_service(data_flow))
# Elevation of privilege threats
if data_flow.type == 'process':
threats.extend(self._check_elevation_of_privilege(data_flow))
self.threats = threats
return threats
def _check_spoofing(self, data_flow: DataFlow) -> List[Threat]:
"""Check for spoofing threats"""
threats = []
if data_flow.trust_level != 'trusted':
threats.append(Threat(
threat_type=STRIDETthreatType.SPOOFING,
description=f"Spoofing of {data_flow.name}",
target=data_flow.id,
likelihood="high" if data_flow.trust_level == "untrusted" else "medium",
impact="high",
mitigation="Implement mutual authentication and certificate validation",
controls=["mfa", "certificate_pinning", "identity_verification"]
))
return threats
def _check_tampering(self, data_flow: DataFlow) -> List[Threat]:
"""Check for tampering threats"""
threats = []
if 'encryption' not in data_flow.controls:
threats.append(Threat(
threat_type=STRIDETthreatType.TAMPERING,
description=f"Data tampering in {data_flow.name}",
target=data_flow.id,
likelihood="medium",
impact="high",
mitigation="Implement end-to-end encryption and data integrity checks",
controls=["encryption", "digital_signatures", "hashing"]
))
return threats
def _check_repudiation(self, data_flow: DataFlow) -> List[Threat]:
"""Check for repudiation threats"""
threats = []
if 'audit_logging' not in data_flow.controls:
threats.append(Threat(
threat_type=STRIDETthreatType.REPUDIATION,
description=f"Non-repudiation not enforced for {data_flow.name}",
target=data_flow.id,
likelihood="medium",
impact="medium",
mitigation="Implement comprehensive audit logging",
controls=["audit_logging", "non_repudiation", "digital_signatures"]
))
return threats
def _check_information_disclosure(self, data_flow: DataFlow) -> List[Threat]:
"""Check for information disclosure threats"""
threats = []
if data_flow.trust_level != 'trusted':
for asset in data_flow.assets:
if 'sensitive' in asset.lower() or 'pii' in asset.lower():
threats.append(Threat(
threat_type=STRIDETthreatType.INFORMATION_DISCLOSURE,
description=f"Sensitive data disclosure in {data_flow.name}",
target=data_flow.id,
likelihood="high",
impact="high",
mitigation="Implement data encryption and access controls",
controls=["encryption", "access_controls", "data_masking"]
))
return threats
def _check_denial_of_service(self, data_flow: DataFlow) -> List[Threat]:
"""Check for DoS threats"""
threats = []
if 'rate_limiting' not in data_flow.controls:
threats.append(Threat(
threat_type=STRIDETthreatType.DENIAL_OF_SERVICE,
description=f"DoS vulnerability in {data_flow.name}",
target=data_flow.id,
likelihood="medium",
impact="high",
mitigation="Implement rate limiting and throttling",
controls=["rate_limiting", "throttling", "circuit_breaker"]
))
return threats
def _check_elevation_of_privilege(self, data_flow: DataFlow) -> List[Threat]:
"""Check for elevation of privilege threats"""
threats = []
if 'authorization' not in data_flow.controls:
threats.append(Threat(
threat_type=STRIDETthreatType.ELEVATION_OF_PRIVILEGE,
description=f"Privilege escalation in {data_flow.name}",
target=data_flow.id,
likelihood="medium",
impact="high",
mitigation="Implement proper authorization and privilege management",
controls=["authorization", "role_based_access", "principle_of_least_privilege"]
))
return threats
def generate_threat_report(self) -> Dict[str, Any]:
"""Generate threat model report"""
threats = self.analyze_threats()
# Calculate risk scores
high_risk = [t for t in threats if t.likelihood == "high" and t.impact == "high"]
medium_risk = [t for t in threats if t not in high_risk and t.likelihood in ["high", "medium"] and t.impact in ["high", "medium"]]
return {
'system': self.system_name,
'total_threats': len(threats),
'high_risk_threats': len(high_risk),
'medium_risk_threats': len(medium_risk),
'threats_by_type': {
threat_type.value: len([t for t in threats if t.threat_type == threat_type])
for threat_type in STRIDETthreatType
},
'recommended_controls': self._get_recommended_controls(threats),
'threats': [
{
'type': t.threat_type.value,
'description': t.description,
'likelihood': t.likelihood,
'impact': t.impact,
'mitigation': t.mitigation
}
for t in threats
]
}
def _get_recommended_controls(self, threats: List[Threat]) -> List[str]:
"""Get recommended security controls"""
controls = set()
for threat in threats:
controls.update(threat.controls)
return list(controls)
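The report groups threats by comparing likelihood and impact labels directly. An alternative, shown here as a sketch with assumed numeric weights and cut-offs rather than values taken from any standard, is a likelihood-by-impact matrix that places every threat in exactly one band. The names `LEVELS`, `risk_score`, and `risk_band` are hypothetical.

```python
from typing import Dict

# Assumed ordinal weights for the 'low' / 'medium' / 'high' labels.
LEVELS: Dict[str, int] = {"low": 1, "medium": 2, "high": 3}


def risk_score(likelihood: str, impact: str) -> int:
    """Multiply the ordinal weights to get a score from 1 to 9."""
    return LEVELS[likelihood] * LEVELS[impact]


def risk_band(likelihood: str, impact: str) -> str:
    """Map a score to a band using illustrative cut-offs (6 and 3)."""
    score = risk_score(likelihood, impact)
    if score >= 6:
        return "high"
    if score >= 3:
        return "medium"
    return "low"
```

With these cut-offs a high-likelihood, medium-impact threat lands in the high band, which is stricter than counting only high/high pairs. The thresholds should be tuned to the organization's risk appetite.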
5. Compliance Frameworks
GDPR Compliance Implementation
# GDPR Compliance Implementation
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from enum import Enum


class GDPRRight(Enum):
    """GDPR data subject rights"""
    ACCESS = "right_to_access"
    RECTIFICATION = "right_to_rectification"
    ERASURE = "right_to_erasure"
    RESTRICT = "right_to_restrict_processing"
    PORTABILITY = "right_to_portability"
    OBJECT = "right_to_object"
    AUTOMATED_DECISION = "right_to_not_be_subject_to_automated_decision"


class DataClassification(Enum):
    """Data sensitivity classification"""
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    SPECIAL_CATEGORY = "special_category"  # GDPR special category data


@dataclass
class DataSubject:
    """Data subject (individual)"""
    id: str
    personal_data: Dict[str, Any]
    consent_records: List['ConsentRecord']
    access_requests: List['DataAccessRequest']
    data_classification: DataClassification


@dataclass
class ConsentRecord:
    """Record of consent"""
    purpose: str
    granted_at: datetime
    expires_at: Optional[datetime]
    withdrawn_at: Optional[datetime]
    lawful_basis: str  # 'consent', 'contract', 'legal_obligation', etc.


@dataclass
class DataAccessRequest:
    """Data subject access request"""
    request_type: GDPRRight
    requested_at: datetime
    deadline: datetime
    status: str  # 'pending', 'in_progress', 'completed'
    response: Optional[Dict[str, Any]] = None
class GDPRCompliance:
    """GDPR compliance implementation"""

    def __init__(self):
        self.data_subjects: Dict[str, DataSubject] = {}
        self.processing_activities: List[Dict[str, Any]] = []
        self.data_breach_records: List[Dict[str, Any]] = []

    def record_consent(self,
                       subject_id: str,
                       purpose: str,
                       lawful_basis: str,
                       duration_days: Optional[int] = None) -> ConsentRecord:
        """
        Record data subject consent
        """
        # Take one timestamp so granted_at and expires_at are consistent
        now = datetime.now()
        consent = ConsentRecord(
            purpose=purpose,
            granted_at=now,
            expires_at=now + timedelta(days=duration_days) if duration_days is not None else None,
            withdrawn_at=None,
            lawful_basis=lawful_basis
        )
        # Get or create data subject
        if subject_id not in self.data_subjects:
            self.data_subjects[subject_id] = DataSubject(
                id=subject_id,
                personal_data={},
                consent_records=[],
                access_requests=[],
                data_classification=DataClassification.INTERNAL
            )
        self.data_subjects[subject_id].consent_records.append(consent)
        return consent
    def verify_consent(self,
                       subject_id: str,
                       purpose: str) -> bool:
        """
        Verify if valid consent exists
        """
        subject = self.data_subjects.get(subject_id)
        if subject is None:
            return False
        now = datetime.now()
        # A withdrawn or expired record does not rule out another valid record
        # for the same purpose, so keep scanning instead of returning early
        for consent in subject.consent_records:
            if consent.purpose != purpose:
                continue
            if consent.withdrawn_at is not None:
                continue  # Consent was withdrawn
            if consent.expires_at and now > consent.expires_at:
                continue  # Consent expired
            return True  # Valid consent found
        return False  # No valid consent found
    def handle_access_request(self,
                              subject_id: str,
                              request_type: GDPRRight) -> DataAccessRequest:
        """
        Handle data subject access request
        """
        now = datetime.now()
        # Create access request
        request = DataAccessRequest(
            request_type=request_type,
            requested_at=now,
            # GDPR Art. 12(3) requires a response within one month of receipt
            # (extendable by two further months); approximated here as 30 days
            deadline=now + timedelta(days=30),
            status='pending'
        )
        # Get data subject
        if subject_id in self.data_subjects:
            subject = self.data_subjects[subject_id]
            if request_type == GDPRRight.ACCESS:
                # Provide copy of all personal data
                request.response = {
                    'personal_data': subject.personal_data,
                    'consent_records': [
                        {
                            'purpose': c.purpose,
                            'granted_at': c.granted_at.isoformat(),
                            'lawful_basis': c.lawful_basis
                        }
                        for c in subject.consent_records
                    ],
                    'processing_activities': [
                        activity
                        for activity in self.processing_activities
                        # .get() avoids a KeyError on activities without a subject_id
                        if activity.get('subject_id') == subject_id
                    ]
                }
            elif request_type == GDPRRight.ERASURE:
                # Right to erasure ("right to be forgotten")
                if self._can_erase_data(subject):
                    subject.personal_data = {}
                    subject.consent_records = []
                    request.response = {'status': 'data_erased'}
                else:
                    request.response = {'status': 'erasure_refused'}
            elif request_type == GDPRRight.PORTABILITY:
                # Right to data portability
                request.response = {
                    'personal_data': subject.personal_data,
                    'format': 'machine_readable_format'
                }
            # Rights with no automated handling above stay open for manual processing
            request.status = 'completed' if request.response is not None else 'in_progress'
            subject.access_requests.append(request)
        return request
    def _can_erase_data(self, subject: DataSubject) -> bool:
        """Check if data can be erased (placeholder: always returns True)"""
        # Cannot erase if required for legal obligation
        # Cannot erase if required for performance of contract
        # Implementation depends on specific circumstances
        return True
    def record_data_breach(self,
                           breach_details: Dict[str, Any]) -> str:
        """
        Record data breach for GDPR notification requirements
        """
        breach_record = {
            'breach_id': self._generate_breach_id(),
            'detected_at': datetime.now().isoformat(),
            'description': breach_details.get('description'),
            'affected_subjects': breach_details.get('affected_subjects', []),
            'data_types': breach_details.get('data_types', []),
            'severity': breach_details.get('severity', 'medium'),
            'containment_actions': breach_details.get('containment_actions', []),
            'notification_to_authority': False,
            'notification_to_subjects': False
        }
        self.data_breach_records.append(breach_record)
        # Art. 33: notify the supervisory authority within 72 hours of becoming
        # aware, unless the breach is unlikely to result in a risk to individuals.
        # The 'unlikely' value is an assumed convention for risk_to_individuals.
        if breach_details.get('risk_to_individuals') != 'unlikely':
            self._notify_data_protection_authority(breach_record)
        # Art. 34: notify data subjects when the breach is likely to result in
        # a high risk to their rights and freedoms
        if breach_details.get('high_risk_to_rights_freedoms'):
            self._notify_data_subjects(breach_record)
        return breach_record['breach_id']

    def _notify_data_protection_authority(self, breach: Dict[str, Any]):
        """Notify data protection authority of breach"""
        # Implementation for notifying authority within 72 hours
        breach['notification_to_authority'] = True
        breach['authority_notified_at'] = datetime.now().isoformat()

    def _notify_data_subjects(self, breach: Dict[str, Any]):
        """Notify data subjects of breach"""
        # Implementation for notifying affected individuals
        breach['notification_to_subjects'] = True
        breach['subjects_notified_at'] = datetime.now().isoformat()

    def _generate_breach_id(self) -> str:
        """Generate breach ID"""
        return f"breach_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    def generate_compliance_report(self) -> Dict[str, Any]:
        """Generate GDPR compliance report"""
        now = datetime.now()
        return {
            'total_data_subjects': len(self.data_subjects),
            # Active means neither withdrawn nor expired
            'active_consent_records': sum(
                len([
                    c for c in s.consent_records
                    if c.withdrawn_at is None
                    and (c.expires_at is None or c.expires_at > now)
                ])
                for s in self.data_subjects.values()
            ),
            'pending_access_requests': sum(
                len([r for r in s.access_requests if r.status == 'pending'])
                for s in self.data_subjects.values()
            ),
            'data_breaches': len(self.data_breach_records),
            'processing_activities': len(self.processing_activities),
            'compliance_status': self._check_compliance_status()
        }

    def _check_compliance_status(self) -> Dict[str, Any]:
        """Check GDPR compliance status (placeholder: static values, not an assessment)"""
        return {
            'consent_management': 'compliant',
            'data_subject_rights': 'compliant',
            'data_breach_notification': 'compliant',
            'data_protection_by_design': 'compliant',
            'data_protection_officer': 'appointed',
            'records_of_processing_activities': 'maintained'
        }
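The request handler above sets its deadline with timedelta(days=30), while GDPR Article 12(3) is phrased as one calendar month (extendable by two further months) and Article 33 as 72 hours from becoming aware of a breach. A minimal sketch of computing both deadlines explicitly follows. The helper names (add_months, dsar_deadline, breach_notification_deadline) are hypothetical and not part of the class above, and clamping to the last day of a shorter month is an assumed convention rather than a legal rule.

```python
import calendar
from datetime import datetime, timedelta


def add_months(moment: datetime, months: int) -> datetime:
    """Add calendar months, clamping to the last day of a shorter month."""
    month_index = moment.month - 1 + months
    year = moment.year + month_index // 12
    month = month_index % 12 + 1
    last_day = calendar.monthrange(year, month)[1]
    return moment.replace(year=year, month=month, day=min(moment.day, last_day))


def dsar_deadline(received_at: datetime, extended: bool = False) -> datetime:
    """One month from receipt, or three months when the extension applies."""
    return add_months(received_at, 3 if extended else 1)


def breach_notification_deadline(became_aware_at: datetime) -> datetime:
    """72 hours after the controller became aware of the breach."""
    return became_aware_at + timedelta(hours=72)


def hours_remaining(deadline: datetime, now: datetime) -> float:
    """Hours left until the deadline; negative once it has passed."""
    return (deadline - now).total_seconds() / 3600
```

A request received on 31 January 2024 falls due on 29 February 2024 under this convention, one day earlier than a fixed 30-day offset would give.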
6. Decision Trees
Security Control Selection
Threat type?
│
├─ Spoofing → Authentication, Identity verification
├─ Tampering → Encryption, Digital signatures, Integrity checks
├─ Repudiation → Audit logging, Non-repudiation controls
├─ Information disclosure → Encryption, Access controls, DLP
├─ Denial of service → Rate limiting, Redundancy, DDoS protection
└─ Elevation of privilege → Authorization, Least privilege, RBAC
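The tree above can be encoded as a lookup table so that control selection is repeatable. This is a sketch only: StrideCategory and select_controls are hypothetical names introduced for illustration, and the control lists simply copy the branches above.

```python
from enum import Enum
from typing import Dict, Iterable, List


class StrideCategory(Enum):
    """STRIDE threat categories (hypothetical enum for this sketch)."""
    SPOOFING = "spoofing"
    TAMPERING = "tampering"
    REPUDIATION = "repudiation"
    INFORMATION_DISCLOSURE = "information_disclosure"
    DENIAL_OF_SERVICE = "denial_of_service"
    ELEVATION_OF_PRIVILEGE = "elevation_of_privilege"


# Controls copied from the decision tree, one entry per branch
CONTROLS_BY_THREAT: Dict[StrideCategory, List[str]] = {
    StrideCategory.SPOOFING: ["Authentication", "Identity verification"],
    StrideCategory.TAMPERING: ["Encryption", "Digital signatures", "Integrity checks"],
    StrideCategory.REPUDIATION: ["Audit logging", "Non-repudiation controls"],
    StrideCategory.INFORMATION_DISCLOSURE: ["Encryption", "Access controls", "DLP"],
    StrideCategory.DENIAL_OF_SERVICE: ["Rate limiting", "Redundancy", "DDoS protection"],
    StrideCategory.ELEVATION_OF_PRIVILEGE: ["Authorization", "Least privilege", "RBAC"],
}


def select_controls(categories: Iterable[StrideCategory]) -> List[str]:
    """Return the controls for the given categories, de-duplicated in first-seen order."""
    ordered: Dict[str, None] = {}
    for category in categories:
        for control in CONTROLS_BY_THREAT[category]:
            ordered.setdefault(control, None)
    return list(ordered)
```

Because several branches share a control (encryption covers both tampering and information disclosure), de-duplicating keeps the resulting list short.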
Compliance Framework Selection
Data type and location?
│
├─ EU personal data → GDPR
├─ US healthcare data → HIPAA
├─ Payment card data → PCI-DSS
├─ Financial data → SOX, Basel III
├─ US federal systems → NIST SP 800-53
└─ Global operations → ISO 27001
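The branches above are not mutually exclusive: a system that stores EU personal data and payment card data falls under both GDPR and PCI-DSS. A minimal sketch that returns every matching framework is shown below. DataProfile and applicable_frameworks are hypothetical names, and the boolean flags are an assumed simplification of the "data type and location" question.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class DataProfile:
    """What a system handles and where it operates (hypothetical flags)."""
    eu_personal_data: bool = False
    us_healthcare_data: bool = False
    payment_card_data: bool = False
    financial_data: bool = False
    federal_system: bool = False
    global_operations: bool = False


def applicable_frameworks(profile: DataProfile) -> List[str]:
    """Collect every framework whose branch in the tree matches the profile."""
    rules = [
        (profile.eu_personal_data, ["GDPR"]),
        (profile.us_healthcare_data, ["HIPAA"]),
        (profile.payment_card_data, ["PCI-DSS"]),
        (profile.financial_data, ["SOX", "Basel III"]),
        (profile.federal_system, ["NIST 800-53"]),
        (profile.global_operations, ["ISO 27001"]),
    ]
    frameworks: List[str] = []
    for applies, names in rules:
        if applies:
            frameworks.extend(names)
    return frameworks
```

For example, applicable_frameworks(DataProfile(eu_personal_data=True, payment_card_data=True)) yields both GDPR and PCI-DSS.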
7. Anti-Patterns to Avoid
- Trust by default: Never trust, always verify
- Hard-coded secrets: Never hard-code credentials
- Over-permissive IAM: Always follow least privilege
- Missing monitoring: You can't secure what you can't see
- Ignoring compliance: Compliance is a baseline, not a ceiling
- Security theater: Focus on real threats, not just checkboxes
- Neglecting supply chain: Secure your entire supply chain
- No incident response: Have a plan before you need it
- Secrets in code: Never commit secrets to repositories
- Assuming internal safety: Zero trust applies everywhere
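As an illustration of avoiding hard-coded secrets, the sketch below reads a credential from the process environment and fails fast when it is missing. The names load_secret, redact, and MissingSecretError are hypothetical, and a dedicated secrets manager may be a better fit than environment variables in many deployments.

```python
import os
from typing import Mapping, Optional


class MissingSecretError(RuntimeError):
    """Raised when a required secret is not configured."""


def load_secret(name: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Read a secret from the environment instead of embedding it in code."""
    env = os.environ if environ is None else environ
    value = env.get(name)
    if not value:
        # Report only the variable name, never any secret value
        raise MissingSecretError(f"Required secret {name!r} is not set")
    return value


def redact(secret: str, visible: int = 4) -> str:
    """Mask a secret for logging, leaving only the last few characters."""
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]
```

Passing a mapping as environ keeps the function testable without touching the real environment.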
8. Quality Checklist
Before considering security architecture complete:
- Threat modeling completed
- Security controls implemented for all identified threats
- Data classification implemented
- Encryption at rest and in transit
- Identity and access management configured
- Security monitoring and logging implemented
- Incident response plan created
- Compliance requirements addressed
- Security testing performed
- Penetration testing completed
- Security documentation complete
- Secure development practices implemented
- Third-party security assessed
- Business continuity plan tested
- Security awareness training completed
- Vulnerability management program active
- Regular security reviews scheduled
- Data retention policies defined
- Privacy by design implemented
- Security metrics and KPIs defined
This skill definition provides guidance for security architecture across modern systems, from zero trust design and threat modeling to compliance.