Logging for Security: SIEM and SOAR
Detecting and Responding to Incidents
TL;DR
Security logging captures activity across systems and applications. SIEM (Security Information and Event Management) aggregates, correlates, and analyzes logs to detect incidents. SOAR (Security Orchestration, Automation and Response) automates response actions such as isolating a host, revoking credentials, or opening tickets. Together they enable detection of attacks that signatures miss. Without logging, breaches go unnoticed. Without a SIEM, logs are of little practical use: no human can spot patterns across billions of events. Without SOAR, mean time to response is measured in hours rather than minutes.
Learning Objectives
- Identify critical events requiring logging
- Implement centralized log aggregation
- Deploy SIEM for threat detection
- Create detection rules and alerts
- Automate incident response with SOAR
- Ensure logs are tamper-evident
Core Concepts
What to Log
Critical Events:
- Authentication and authorization (logins, permission changes)
- Data access and modification
- Configuration changes
- Security control changes (firewall rules, policies)
- Privilege escalation attempts
- Network connections to suspicious IPs
- Malware detection
- Policy violations
Log Format: Structured logs (JSON, syslog) for easier parsing
{
  "timestamp": "2025-02-14T10:30:45Z",
  "event_type": "auth_failure",
  "user": "john.doe",
  "source_ip": "203.0.113.45",
  "target": "database.prod",
  "result": "failure",
  "reason": "invalid_password",
  "severity": "medium"
}
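One way to produce events in this shape is a JSON formatter on top of Python's standard `logging` module. This is a minimal sketch, not a definitive implementation: the `JsonSecurityFormatter` class, the `build_security_logger` helper, and the convention of passing event fields under an `extra={"security": {...}}` key are all assumptions made for illustration.

```python
import json
import logging
from datetime import datetime, timezone


class JsonSecurityFormatter(logging.Formatter):
    """Render each log record as a single JSON object on one line."""

    def format(self, record: logging.LogRecord) -> str:
        timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc)
        event = {
            # UTC timestamp in ISO 8601 form with a trailing "Z"
            "timestamp": timestamp.isoformat(timespec="seconds").replace("+00:00", "Z"),
            "severity": record.levelname.lower(),
            "message": record.getMessage(),
        }
        # Hypothetical convention: structured fields arrive via
        # extra={"security": {...}} and override the defaults above.
        event.update(getattr(record, "security", {}))
        return json.dumps(event)


def build_security_logger(stream=None) -> logging.Logger:
    """Return a logger that writes JSON lines to `stream` (stderr by default)."""
    logger = logging.getLogger("security")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonSecurityFormatter())
    logger.addHandler(handler)
    logger.propagate = False
    return logger


if __name__ == "__main__":
    log = build_security_logger()
    log.warning(
        "login failed",
        extra={"security": {
            "event_type": "auth_failure",
            "user": "john.doe",
            "source_ip": "203.0.113.45",
            "result": "failure",
            "reason": "invalid_password",
            "severity": "medium",
        }},
    )
```

Emitting one JSON object per line keeps the output easy for a collector to tail and parse without multi-line handling.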
SIEM Architecture
Applications    Firewalls    Hosts    Endpoints
     |              |          |          |
     └──────────────┴───┬──────┴──────────┘
                        |
              Log Collector / Agent
                        |
              Log Aggregation Layer
                        |
            ┌───────────┴───────────┐
            |                       |
        Indexing               Correlation
      (ELK, Splunk)            (Rules, ML)
            |                       |
            └───────────┬───────────┘
                        |
            Alert & Detection Engine
                        |
                      SOAR
            ┌───────────┴───────────┐
            |                       |
      Notification             Automation
     (Email, Slack)         (Isolate, Revoke)
SOAR Playbooks
Automated response to common incidents:
Example: Suspicious login detected
- SIEM rule raises an alert
- Gather context (location, time, device)
- If risk score > threshold:
  - Require additional MFA
  - Create incident ticket
  - Notify security team
  - Optional: Isolate user session
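The playbook steps above can be sketched as a small decision function. This is an illustrative sketch only: the `LoginAlert` fields, the weights in `risk_score`, and the `RISK_THRESHOLD` value are hypothetical, and the returned action names stand in for real SOAR integrations.

```python
from dataclasses import dataclass
from typing import List

RISK_THRESHOLD = 50  # hypothetical threshold on a 0-100 scale


@dataclass
class LoginAlert:
    """Context gathered for one suspicious-login alert (illustrative fields)."""
    user: str
    new_country: bool
    new_device: bool
    outside_work_hours: bool


def risk_score(alert: LoginAlert) -> int:
    """Sum hypothetical weights for each risk signal that is present."""
    return (40 * alert.new_country
            + 30 * alert.new_device
            + 20 * alert.outside_work_hours)


def run_playbook(alert: LoginAlert, threshold: int = RISK_THRESHOLD,
                 isolate_session: bool = False) -> List[str]:
    """Return the ordered list of actions the playbook would take."""
    actions = ["gather_context"]
    if risk_score(alert) > threshold:
        actions += [
            "require_additional_mfa",
            "create_incident_ticket",
            "notify_security_team",
        ]
        if isolate_session:  # the optional step
            actions.append("isolate_user_session")
    return actions


if __name__ == "__main__":
    alert = LoginAlert("john.doe", new_country=True, new_device=True,
                       outside_work_hours=False)
    print(risk_score(alert), run_playbook(alert, isolate_session=True))
```

Returning action names instead of executing them keeps the decision logic testable without touching production systems.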
When to Use / When Not to Use
Best practices:
- Log all security-relevant events
- Centralized log aggregation
- Immutable audit trail (append-only)
- Correlate logs across systems
- Create rules for known attack patterns
- Use ML for anomaly detection
- Automated response via SOAR
- Regular tuning to reduce false positives
Anti-patterns:
- Logging everything (alert fatigue)
- Logs stored locally (easy to delete)
- No correlation between events
- No alerting (logs exist but unread)
- Manual response to every alert
- Never tuning rules (false positives)
- Not logging privileged actions
- Insufficient log retention
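One common way to make an append-only audit trail tamper-evident is to chain entries with a hash, so that editing any earlier entry invalidates every hash after it. The sketch below is an assumption about how this could look, not a description of any particular SIEM's storage; `append_entry`, `verify_chain`, and the entry layout are hypothetical names.

```python
import hashlib
import json
from typing import List

GENESIS_HASH = "0" * 64  # placeholder "previous hash" for the first entry


def _digest(prev_hash: str, event: dict) -> str:
    """SHA-256 over the previous hash plus a canonical JSON form of the event."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + canonical).encode("utf-8")).hexdigest()


def append_entry(chain: List[dict], event: dict) -> None:
    """Append an event, linking it to the hash of the previous entry."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS_HASH
    chain.append({
        "event": dict(event),  # copy so later caller mutations do not leak in
        "prev_hash": prev_hash,
        "hash": _digest(prev_hash, event),
    })


def verify_chain(chain: List[dict]) -> bool:
    """Recompute every hash; False if any entry was altered or reordered."""
    prev_hash = GENESIS_HASH
    for entry in chain:
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["hash"] != _digest(prev_hash, entry["event"]):
            return False
        prev_hash = entry["hash"]
    return True


if __name__ == "__main__":
    trail: List[dict] = []
    append_entry(trail, {"event_type": "auth_failure", "user": "admin"})
    append_entry(trail, {"event_type": "config_change", "user": "admin"})
    print(verify_chain(trail))
```

A chain like this detects edits and reordering but not truncation of the newest entries, so the latest hash would also need to be stored somewhere the log writer cannot modify.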
Practical Examples
The three examples below cover, in order: a SIEM detection rule, a SOAR playbook, and log format and collection.
{
  "name": "Multiple Failed Logins to Admin Account",
  "description": "Detect brute force attack on admin account",
  "rule_type": "frequency",
  "condition": {
    "event_type": "auth_failure",
    "user": "admin*",
    "time_window": "5m",
    "threshold": 5
  },
  "enrichment": [
    {
      "source_ip": "lookup_geoip"
    },
    {
      "source_ip": "lookup_threat_intel"
    }
  ],
  "correlation": [
    {
      "name": "successful_admin_login",
      "time_after": "30m",
      "action": "ESCALATE_TO_HIGH"
    }
  ],
  "response": {
    "alert": true,
    "severity": "MEDIUM",
    "notification": ["security-team@company.com", "slack:#security"],
    "soar_playbook": "brute_force_admin_attack"
  }
}
Rule logic:
- 5+ failed logins to admin* within 5 minutes → potential brute force
- Enrich with GeoIP (where is the attack coming from?) and threat intel (is the source a known-bad IP?)
- If a successful login follows within 30 minutes → escalate to HIGH (the attacker may have gotten in)
- Create alert, notify team, trigger SOAR playbook
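The rule logic above amounts to a sliding-window counter plus a follow-up correlation. The sketch below shows one way to evaluate it in Python; it is not how any specific SIEM implements frequency rules. The `BruteForceRule` class and `parse_ts` helper are hypothetical, and events are assumed to arrive in timestamp order.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta
from fnmatch import fnmatchcase
from typing import Deque, Dict, Optional


def parse_ts(value: str) -> datetime:
    # Accept a trailing "Z", which fromisoformat() rejects before Python 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


class BruteForceRule:
    """Frequency rule with an escalation on a later successful login."""

    def __init__(self, user_pattern: str = "admin*", threshold: int = 5,
                 window: timedelta = timedelta(minutes=5),
                 escalate_within: timedelta = timedelta(minutes=30)) -> None:
        self.user_pattern = user_pattern
        self.threshold = threshold
        self.window = window
        self.escalate_within = escalate_within
        self._failures: Dict[str, Deque[datetime]] = defaultdict(deque)
        self._alerted_at: Dict[str, datetime] = {}

    def process(self, event: dict) -> Optional[str]:
        """Return "MEDIUM", "HIGH", or None for a single event."""
        user = event["user"]
        if not fnmatchcase(user, self.user_pattern):
            return None
        ts = parse_ts(event["timestamp"])
        if event["event_type"] == "auth_failure":
            failures = self._failures[user]
            failures.append(ts)
            # Drop failures that have aged out of the time window.
            while failures and ts - failures[0] > self.window:
                failures.popleft()
            if len(failures) >= self.threshold:
                self._alerted_at[user] = ts
                return "MEDIUM"
        elif event["event_type"] == "auth_success":
            alerted = self._alerted_at.get(user)
            if alerted is not None and ts - alerted <= self.escalate_within:
                return "HIGH"  # success shortly after a brute-force alert
        return None


if __name__ == "__main__":
    rule = BruteForceRule()
    for second in range(0, 50, 10):
        event = {"event_type": "auth_failure", "user": "admin",
                 "timestamp": f"2025-02-14T10:00:{second:02d}Z"}
        print(second, rule.process(event))
```

In this sketch every failure past the threshold re-raises the alert; a production rule would usually suppress duplicates for some period.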
playbook_name: brute_force_admin_attack
description: Respond to admin account brute force attempts
triggers:
  - alert_rule: "Multiple Failed Logins to Admin Account"
actions:
  - name: gather_context
    type: enrichment
    steps:
      - lookup_user_details: admin_account
      - get_active_sessions: admin_account
      - check_recent_actions: admin_account
      - query_threat_intel: source_ip
  - name: assess_risk
    type: decision
    condition:
      - if: successful_login_detected AND high_risk_location
        then:
          execute: isolate_account
      - if: only_failed_attempts
        then:
          execute: strengthen_auth
  - name: isolate_account
    type: response
    steps:
      - revoke_all_sessions: admin_account
      - enforce_mfa: admin_account
      - force_password_reset: admin_account
      - log_action: "Account isolation due to brute force"
  - name: strengthen_auth
    type: response
    steps:
      - enable_ip_whitelist: admin_account
      - increase_mfa_level: admin_account
      - log_action: "Authentication strengthened"
  - name: create_incident
    type: notification
    steps:
      - create_ticket:
          title: "Brute Force Attempt on Admin Account"
          priority: HIGH
          description: "Alert triggered at {{ alert_time }}"
      - notify_soc_team: "New incident created"
  - name: escalate_if_compromised
    type: conditional
    condition:
      - if: successful_admin_login AND unauthorized_actions_detected
        then: declare_security_incident
        actions:
          - isolate_all_admin_systems
          - preserve_logs
          - initiate_incident_response_team
{
  "timestamp": "2025-02-14T10:30:45.123Z",
  "event_type": "auth_failure",
  "severity": "medium",
  "source": "auth_service",
  "user": "admin_user",
  "user_id": 42,
  "source_ip": "203.0.113.45",
  "source_ip_country": "CN",
  "target_system": "database.prod",
  "target_service": "postgresql",
  "result": "failure",
  "reason": "invalid_password",
  "attempt_number": 3,
  "mfa_enabled": true,
  "mfa_verified": false,
  "user_agent": "Mozilla/5.0...",
  "session_id": "sess_abc123",
  "request_id": "req_xyz789",
  "trace_id": "trace_def456"
}
Log collection approaches:
Agent-based (Filebeat, Fluentd):
- Runs on each host
- Tails log files, sends to aggregation
- Works with legacy systems
Syslog:
- Standard protocol (RFC 5424)
- Lightweight, compatible
- Centralizes logs to syslog server
Direct API logging:
- Application sends logs directly to SIEM
- Low latency, controlled format
- Requires SIEM endpoint, credentials
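For the syslog approach, the sketch below shows how a JSON event could be wrapped in an RFC 5424 line. It covers only the message framing: the `format_rfc5424` helper and the host and application names are hypothetical, the optional PROCID, MSGID, and STRUCTURED-DATA fields are left as the nil value `-`, and transport to the syslog server is out of scope.

```python
import json

FACILITY_AUTH = 4     # RFC 5424 facility: security/authorization messages
SEVERITY_WARNING = 4  # RFC 5424 severity: warning


def format_rfc5424(event: dict, hostname: str, app_name: str,
                   facility: int = FACILITY_AUTH,
                   severity: int = SEVERITY_WARNING) -> str:
    """Wrap a JSON event in an RFC 5424 header.

    Layout: <PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD MSG
    """
    pri = facility * 8 + severity  # priority value defined by the RFC
    header = f"<{pri}>1 {event['timestamp']} {hostname} {app_name} - - -"
    return f"{header} {json.dumps(event)}"


if __name__ == "__main__":
    event = {
        "timestamp": "2025-02-14T10:30:45Z",
        "event_type": "auth_failure",
        "user": "john.doe",
    }
    print(format_rfc5424(event, hostname="auth01", app_name="auth_service"))
```

Carrying the JSON document as the syslog message keeps the structured fields intact for the SIEM parser while still using a standard transport.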
Design Review Checklist
- All systems logging security events?
- Logs in structured format (JSON)?
- Timestamps synchronized (NTP)?
- Log verbosity appropriate (not too much)?
- Centralized log aggregation?
- Logs immutable (append-only)?
- Adequate retention (compliance requirement)?
- Encryption in transit and at rest?
- SIEM rules for known attack patterns?
- Alerts configured with appropriate severity?
- Alert fatigue addressed (tuning)?
- Escalation process defined?
- SOAR playbooks for common incidents?
- Automated actions safe (no data loss)?
- Human approval for destructive actions?
- Playbook effectiveness tracked?
Self-Check
- What events should be logged for security?
- How does SIEM differ from just collecting logs?
- What's the purpose of enrichment in SIEM rules?
- How does SOAR reduce mean time to response?
- What log format is best for SIEM consumption?
Advanced SIEM/SOAR Implementation
Multi-Tenant SIEM Architecture
Supporting multiple customers in a SaaS environment:
┌─────────────────────────────────────────────────┐
│                Multi-Tenant SIEM                │
├─────────────────────────────────────────────────┤
│                                                 │
│  Tenant A: Logs → Collector → Aggregation       │
│  Tenant B: Logs → Collector → Aggregation       │
│  Tenant C: Logs → Collector → Aggregation       │
│                        ↓                        │
│           ┌─────────────────────────┐           │
│           │ Tenant Isolation Layer  │           │
│           │  (Row-level security)   │           │
│           └────────────┬────────────┘           │
│                        ↓                        │
│           ┌─────────────────────────┐           │
│           │   Shared Index Store    │           │
│           │  (Elasticsearch, etc.)  │           │
│           └────────────┬────────────┘           │
│                        ↓                        │
│  Tenant A: Query "my logs only"                 │
│  Tenant B: Query "my logs only"                 │
│  (Database enforces row filtering)              │
│                                                 │
└─────────────────────────────────────────────────┘
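The isolation layer in the diagram can be illustrated with an in-memory stand-in, where every query goes through a view that is bound to one tenant. This is a toy sketch under stated assumptions: `SharedLogStore` and `TenantView` are hypothetical classes, and a real deployment would rely on the filtering enforced by the index store itself.

```python
from typing import Callable, List


class TenantView:
    """Read-only view that always applies the tenant filter."""

    def __init__(self, rows: List[dict], tenant_id: str) -> None:
        self._rows = rows
        self._tenant_id = tenant_id

    def query(self, predicate: Callable[[dict], bool] = lambda row: True) -> List[dict]:
        """Return copies of this tenant's rows that match the predicate."""
        return [
            dict(row) for row in self._rows
            if row["tenant_id"] == self._tenant_id and predicate(row)
        ]


class SharedLogStore:
    """Single store holding every tenant's events, tagged at ingest time."""

    def __init__(self) -> None:
        self._rows: List[dict] = []

    def ingest(self, tenant_id: str, event: dict) -> None:
        # The tenant tag is set by the platform last, so a tenant_id field
        # supplied inside the event cannot override it.
        self._rows.append({**event, "tenant_id": tenant_id})

    def for_tenant(self, tenant_id: str) -> TenantView:
        return TenantView(self._rows, tenant_id)


if __name__ == "__main__":
    store = SharedLogStore()
    store.ingest("tenant-a", {"event_type": "auth_failure", "user": "alice"})
    store.ingest("tenant-b", {"event_type": "auth_failure", "user": "bob"})
    print(store.for_tenant("tenant-a").query())
```

The design point is that callers never see an unfiltered query method: the tenant filter is applied by the layer, not supplied by the caller.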
Anomaly Detection with Machine Learning
Use ML to detect unusual patterns:
from datetime import datetime
from typing import List

import numpy as np
from sklearn.ensemble import IsolationForest


class AnomalyDetector:
    """Use ML to detect unusual login patterns."""

    def __init__(self):
        # contamination is the expected share of outliers in the training data
        self.model = IsolationForest(contamination=0.1)
        self.is_trained = False

    def train(self, historical_logs: List[dict]) -> None:
        """Train on normal behavior."""
        features = self.extract_features(historical_logs)
        self.model.fit(features)
        self.is_trained = True

    def detect_anomaly(self, login_event: dict) -> bool:
        """Detect if login is anomalous."""
        if not self.is_trained:
            return False
        features = self.extract_features([login_event])
        prediction = self.model.predict(features)[0]
        return bool(prediction == -1)  # predict() returns -1 for anomalies, 1 for inliers

    def extract_features(self, logs: List[dict]) -> np.ndarray:
        """Extract features for ML model."""
        features = []
        for log in logs:
            feature_vector = [
                self.hour_of_day(log["timestamp"]),
                self.day_of_week(log["timestamp"]),
                self.geoip_to_distance(log.get("source_ip")),
                self.device_is_new(log.get("user_agent")),
                self.login_failure_count(log.get("user")),
            ]
            features.append(feature_vector)
        return np.array(features)

    @staticmethod
    def _parse_timestamp(timestamp: str) -> datetime:
        # fromisoformat() rejects a trailing "Z" before Python 3.11
        return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))

    def hour_of_day(self, timestamp: str) -> int:
        return self._parse_timestamp(timestamp).hour

    def day_of_week(self, timestamp: str) -> int:
        return self._parse_timestamp(timestamp).weekday()

    def geoip_to_distance(self, ip: str) -> float:
        """Distance from user's normal location."""
        # Placeholder: would use a GeoIP database
        return 0.0

    def device_is_new(self, user_agent: str) -> int:
        """1 if device is unknown, 0 if known."""
        # Placeholder: would check a per-user device history
        return 1

    def login_failure_count(self, user: str) -> int:
        """Recent failed login attempts."""
        # Placeholder: would query recent auth_failure events
        return 0


# Train on 1 year of normal logs (normal_login_logs is loaded elsewhere)
detector = AnomalyDetector()
detector.train(normal_login_logs)

# Detect anomalies in real time (incoming_logs and alert_soc_team are defined elsewhere)
for log in incoming_logs:
    if detector.detect_anomaly(log):
        alert_soc_team(f"Anomalous login detected: {log['user']}")
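The `geoip_to_distance` stub above returns a constant. Once a GeoIP lookup has produced coordinates, the distance from the user's usual location could be computed with the haversine formula, as in the sketch below. The `haversine_km` function is a hypothetical helper, and the IP-to-coordinate lookup itself is not shown.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometres between two points in degrees."""
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = phi2 - phi1
    delta_lambda = math.radians(lon2 - lon1)
    a = (math.sin(delta_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


if __name__ == "__main__":
    # Hypothetical coordinates: usual location vs. location of this login
    usual = (52.52, 13.405)
    current = (48.8566, 2.3522)
    print(round(haversine_km(*usual, *current), 1), "km")
```

A continuous distance feature like this gives the model more signal than a simple same-country flag.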
SOAR Workflow Automation
Advanced playbook with decision trees:
name: "Insider Threat Detection"
description: "Detect and respond to insider threats"
variables:
  risk_threshold: 80  # Risk score 0-100
  admin_users: ["admin1", "admin2"]
  protected_folders: ["/data", "/financial"]
triggers:
  - event: "file_access"
    condition: "target_path in protected_folders"
actions:
  - name: "gather_context"
    steps:
      - lookup_user: "{{ event.user }}"
      - get_user_department: "{{ event.user }}"
      - get_normal_access_patterns: "{{ event.user }}"
      - check_recent_termination: "{{ event.user }}"
  - name: "calculate_risk_score"
    type: "decision_tree"
    rules:
      - if: "user in admin_users"
        weight: +20
      - if: "access_time outside work_hours"
        weight: +30
      - if: "unusual_volume_access"
        weight: +25
      - if: "user_recently_terminated"
        weight: +50
      - if: "geography_mismatch (VPN home, accessing office)"
        weight: +15
    score: "{{ sum_weights }}"
  - name: "respond_based_on_risk"
    type: "conditional"
    cases:
      - when: "score >= 80"
        then:
          - alert_severity: "CRITICAL"
          - isolate_user_session: "{{ event.user }}"
          - revoke_temporary_credentials: "{{ event.user }}"
          - create_incident: "Insider threat detected"
          - notify: "ciso@company.com"
      - when: "score >= 60"
        then:
          - alert_severity: "HIGH"
          - require_mfa_reauthentication: "{{ event.user }}"
          - monitor_activity_closely: "{{ event.user }}"
          - create_incident: "Suspicious insider activity"
      - when: "score >= 40"
        then:
          - alert_severity: "MEDIUM"
          - log_event: "Potentially suspicious access"
  - name: "investigation_workflow"
    steps:
      - create_case: "insider-threat-{{ timestamp }}"
      - export_timeline: "access logs for user"
      - preserve_evidence: "disk snapshots"
      - notify_legal: "Document for potential investigation"
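The scoring and tiering in this playbook can be checked with a few lines of Python. The weights and thresholds below are taken from the playbook; the signal names, the `risk_score` and `severity_for` helpers, and the cap at 100 (the weights add up to 140, while the playbook describes a 0-100 score) are assumptions.

```python
from typing import Dict, Optional

# Weights copied from the playbook's decision-tree rules.
WEIGHTS = {
    "is_admin": 20,
    "outside_work_hours": 30,
    "unusual_volume": 25,
    "recently_terminated": 50,
    "geography_mismatch": 15,
}

# (minimum score, severity) pairs, checked from highest to lowest.
TIERS = [(80, "CRITICAL"), (60, "HIGH"), (40, "MEDIUM")]


def risk_score(signals: Dict[str, bool]) -> int:
    """Sum the weights of all signals that fired, capped at 100 (assumption)."""
    total = sum(weight for name, weight in WEIGHTS.items() if signals.get(name))
    return min(100, total)


def severity_for(score: int) -> Optional[str]:
    """Map a score to the playbook's severity tier, or None below 40."""
    for minimum, severity in TIERS:
        if score >= minimum:
            return severity
    return None


if __name__ == "__main__":
    signals = {"outside_work_hours": True, "recently_terminated": True}
    score = risk_score(signals)
    print(score, severity_for(score))
```

Keeping weights and tiers as data makes tuning a configuration change, which matters when thresholds are adjusted to reduce false positives.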
Integration with ITSM and Ticketing
SIEM → SOAR → Ticketing system:
import json


class IncidentCreationService:
    """SOAR creates tickets in Jira when incidents detected."""

    def __init__(self, jira_client, siem, sla_tracker):
        # Injected collaborators; the methods called on them below are
        # illustrative and depend on the client libraries in use.
        self.jira_client = jira_client
        self.siem = siem
        self.sla_tracker = sla_tracker

    def on_high_severity_alert(self, alert: dict):
        """SOAR rule triggered; create Jira ticket."""
        description = "\n".join([
            f"Severity: {alert['severity']}",
            f"Detection Time: {alert['timestamp']}",
            f"Affected Asset: {alert['asset']}",
            f"Log Pattern: {alert['pattern']}",
            f"Rules Triggered: {', '.join(alert['rules'])}",
            "Context:",
            json.dumps(alert['enrichment'], indent=2),
        ])
        ticket = {
            "project": "SEC",  # Security project
            "type": "Incident",
            "summary": f"Security Alert: {alert['name']}",
            "description": description,
            "priority": self.severity_to_priority(alert['severity']),
            "labels": ["security", "incident", alert['category']],
            "assignee": self.get_oncall_security_engineer(),
        }
        # Create ticket
        jira_ticket = self.jira_client.create_issue(**ticket)
        # Link back to SIEM
        self.siem.link_alert_to_ticket(alert['id'], jira_ticket.key)
        # Start SLA clock
        self.sla_tracker.start_sla(jira_ticket.key, alert['severity'])
        return jira_ticket

    def severity_to_priority(self, severity: str) -> str:
        mapping = {
            "CRITICAL": "P1",
            "HIGH": "P2",
            "MEDIUM": "P3",
            "LOW": "P4",
        }
        return mapping.get(severity, "P4")

    def get_oncall_security_engineer(self) -> str:
        """Look up the current on-call engineer (integration not shown)."""
        raise NotImplementedError
Compliance Reporting
SIEM/SOAR data for compliance audits:
class ComplianceReporter:
    """Generate compliance reports from SIEM data."""

    def __init__(self, siem):
        # siem is assumed to expose query(sql, params) with bound parameters
        self.siem = siem

    def generate_pci_report(self, start_date: str, end_date: str) -> dict:
        """PCI DSS compliance: logging and monitoring requirements."""
        # Dates are passed as bound parameters rather than formatted into
        # the SQL text, which also avoids SQL injection.
        params = {"start_date": start_date, "end_date": end_date}
        return {
            "period": f"{start_date} to {end_date}",
            "requirements": {
                "10.1": {
                    "description": "Log all access to audit trails",
                    "status": "Compliant",
                    "evidence": self.siem.query("""
                        SELECT COUNT(*) as log_entries
                        FROM audit_logs
                        WHERE timestamp BETWEEN :start_date AND :end_date
                    """, params),
                },
                "10.2.1": {
                    "description": "Log all user access",
                    "status": "Compliant",
                    "evidence": self.siem.query("""
                        SELECT DISTINCT event_type
                        FROM logs
                        WHERE event_type LIKE '%access%'
                        AND timestamp BETWEEN :start_date AND :end_date
                    """, params),
                },
                "10.5": {
                    "description": "Log access restrictions modification",
                    "status": "Compliant",
                    "evidence": self.siem.query("""
                        SELECT COUNT(*) as changes
                        FROM logs
                        WHERE event_type = 'access_restriction_changed'
                        AND timestamp BETWEEN :start_date AND :end_date
                    """, params),
                },
            },
        }
Next Steps
- Deploy logging infrastructure — Standardize log format, centralize collection
- Implement SIEM — Deploy tool (Splunk, ELK, Sumo Logic), configure rules
- Create detection rules — Known attack patterns, behavioral anomalies
- Build SOAR playbooks — Common incidents, automated response
- Tune and optimize — Reduce false positives, improve detection
- Train team — SOC analysts understand SIEM, incident responders understand playbooks
- Integrate with ticketing — Automatic ticket creation for incidents
- Compliance automation — Generate reports for audits automatically