Elasticsearch for SIEM and Centralized Logging at Scale

November 10, 2024
9 min read

Elasticsearch powers modern SIEM and logging platforms. Learn how to build, scale, and secure an enterprise logging infrastructure.

Architecture Overview

ELK Stack Components

Data Sources → Beats/Logstash → Elasticsearch → Kibana
                                      ↓
                              Index Lifecycle
                                      ↓
                               Cold Storage

Components:

  • Elasticsearch: Search and analytics engine
  • Logstash: Data processing pipeline
  • Kibana: Visualization and dashboards
  • Beats: Lightweight data shippers

Elasticsearch Cluster Design

Production Cluster Architecture

┌─────────────────────────────────────┐
│       Load Balancer (HAProxy)       │
└─────────────────────────────────────┘
        ↓             ↓             ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Coordinating │ │ Coordinating │ │ Coordinating │
│     Node     │ │     Node     │ │     Node     │
└──────────────┘ └──────────────┘ └──────────────┘
        ↓             ↓             ↓
┌──────────────────────────────────────────────────┐
│                Master Nodes (3)                  │
│  - Cluster state management                      │
│  - Index creation/deletion                       │
└──────────────────────────────────────────────────┘
        ↓             ↓             ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│  Data Node   │ │  Data Node   │ │  Data Node   │
│    (Hot)     │ │    (Hot)     │ │    (Hot)     │
│  - NVMe SSD  │ │  - NVMe SSD  │ │  - NVMe SSD  │
└──────────────┘ └──────────────┘ └──────────────┘
        ↓             ↓             ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│  Data Node   │ │  Data Node   │ │  Data Node   │
│    (Warm)    │ │    (Warm)    │ │    (Warm)    │
│  - SATA SSD  │ │  - SATA SSD  │ │  - SATA SSD  │
└──────────────┘ └──────────────┘ └──────────────┘
        ↓             ↓             ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│  Data Node   │ │  Data Node   │ │  Data Node   │
│    (Cold)    │ │    (Cold)    │ │    (Cold)    │
│    - HDD     │ │    - HDD     │ │    - HDD     │
└──────────────┘ └──────────────┘ └──────────────┘

Node Roles Configuration

# elasticsearch.yml

# Master-eligible node
node.roles: [ master ]
node.name: master-01
cluster.name: security-logs

# Data node (hot tier)
node.roles: [ data_hot, data_content ]
node.name: data-hot-01
node.attr.data: hot

# Data node (warm tier)
node.roles: [ data_warm, data_content ]
node.name: data-warm-01
node.attr.data: warm

# Data node (cold tier)
node.roles: [ data_cold ]
node.name: data-cold-01
node.attr.data: cold

# Coordinating-only node
node.roles: []
node.name: coord-01

Installation and Configuration

Elasticsearch Installation

# Install Elasticsearch
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.11.0-x86_64.rpm
rpm -ivh elasticsearch-8.11.0-x86_64.rpm

# Configure JVM heap (50% of RAM, below the ~32GB compressed-oops cutoff)
cat >> /etc/elasticsearch/jvm.options.d/heap.options << EOF
-Xms16g
-Xmx16g
EOF

# Configure Elasticsearch
cat > /etc/elasticsearch/elasticsearch.yml << EOF
cluster.name: security-logs
node.name: ${HOSTNAME}
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["master-01", "master-02", "master-03"]
cluster.initial_master_nodes: ["master-01", "master-02", "master-03"]

# Security
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: true

# Monitoring
xpack.monitoring.collection.enabled: true
EOF

# Start Elasticsearch
systemctl enable --now elasticsearch
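The heap-sizing rule above (half of system RAM, staying below the compressed-oops cutoff) reduces to a one-line calculation. This sketch uses a 31 GB cap as a conservative assumption; check the actual cutoff on your JVM before relying on it:

```python
def es_heap_gb(system_ram_gb: int, cap_gb: int = 31) -> int:
    """Heap = min(50% of RAM, compressed-oops cap)."""
    return min(system_ram_gb // 2, cap_gb)

# A 32 GB host gets the 16 GB heap used in the config above.
print(es_heap_gb(32))   # 16
print(es_heap_gb(128))  # 31 (capped, despite 64 GB being half of RAM)
```

Set -Xms and -Xmx to the same value so the heap never resizes at runtime.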

Index Lifecycle Management (ILM)

PUT _ilm/policy/security-logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d",
            "max_docs": 100000000
          },
          "set_priority": { "priority": 100 }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "allocate": { "require": { "data": "warm" } },
          "forcemerge": { "max_num_segments": 1 },
          "shrink": { "number_of_shards": 1 },
          "set_priority": { "priority": 50 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "allocate": { "require": { "data": "cold" } },
          "searchable_snapshot": { "snapshot_repository": "cold-snapshots" },
          "set_priority": { "priority": 0 }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": { "delete": {} }
      }
    }
  }
}
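To make the hot-phase rollover semantics concrete: an index rolls over as soon as any one of the three conditions is met, not all of them. A minimal sketch of that decision in plain Python (not an Elasticsearch API):

```python
from datetime import timedelta

def should_rollover(size_gb: float, age: timedelta, docs: int) -> bool:
    """Mirror the hot-phase rollover: any single threshold triggers it."""
    return (
        size_gb >= 50                  # max_size: 50GB
        or age >= timedelta(days=1)    # max_age: 1d
        or docs >= 100_000_000         # max_docs: 100000000
    )

assert should_rollover(51, timedelta(hours=2), 1_000)      # size alone trips it
assert should_rollover(10, timedelta(days=2), 1_000)       # age alone trips it
assert not should_rollover(10, timedelta(hours=2), 1_000)  # nothing trips it
```

This is why a low-volume index still rolls daily (max_age) while a burst of traffic rolls early (max_size).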

Index Templates

PUT _index_template/security-logs
{
  "index_patterns": ["security-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.lifecycle.name": "security-logs-policy",
      "index.lifecycle.rollover_alias": "security-logs",
      "codec": "best_compression"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "source_ip": { "type": "ip" },
        "destination_ip": { "type": "ip" },
        "user": { "type": "keyword" },
        "action": { "type": "keyword" },
        "severity": { "type": "keyword" },
        "message": { "type": "text" },
        "event_id": { "type": "keyword" },
        "hostname": { "type": "keyword" },
        "process": { "type": "keyword" },
        "geoip": {
          "properties": {
            "location": { "type": "geo_point" },
            "country_name": { "type": "keyword" },
            "city_name": { "type": "keyword" }
          }
        }
      }
    }
  }
}

Logstash Configuration

Input Plugins

# /etc/logstash/conf.d/01-inputs.conf
input {
  # Beats input
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
  }

  # Syslog input
  syslog {
    port => 514
    type => "syslog"
  }

  # Windows Event Logs
  tcp {
    port => 5140
    codec => json
    type => "windows"
  }

  # Firewall logs
  udp {
    port => 5141
    type => "firewall"
  }

  # HTTP input for applications
  http {
    port => 8080
    codec => json
    type => "application"
  }
}

Filter Plugins

# /etc/logstash/conf.d/10-filters.conf
filter {
  # Parse syslog
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
    date {
      # Note the double space before "d": syslog pads single-digit days
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }

  # Parse SSH authentication
  if [program] == "sshd" {
    grok {
      match => { "syslog_message" => [
        "Accepted %{WORD:auth_method} for %{USER:username} from %{IP:source_ip} port %{INT:source_port}",
        "Failed %{WORD:auth_method} for %{USER:username} from %{IP:source_ip} port %{INT:source_port}"
      ] }
    }
    if "Failed" in [syslog_message] {
      mutate {
        add_field => { "severity" => "warning" }
        add_field => { "event_type" => "authentication_failure" }
      }
    }
  }

  # GeoIP enrichment
  if [source_ip] {
    geoip {
      source => "source_ip"
      target => "geoip"
    }
  }

  # Tag security events
  if [event_type] == "authentication_failure" {
    mutate {
      add_tag => [ "security", "authentication" ]
    }
  }

  # Drop noisy events
  if [program] == "systemd" and "Started Session" in [message] {
    drop { }
  }
}
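The sshd grok patterns above translate almost directly into an ordinary regular expression. This standalone sketch shows what the filter extracts from a "Failed password" line; the sample log line is invented for illustration:

```python
import re

# Equivalent of the "Failed %{WORD} for %{USER} from %{IP} port %{INT}" grok pattern
SSH_FAIL = re.compile(
    r"Failed (?P<auth_method>\w+) for (?P<username>\S+) "
    r"from (?P<source_ip>[\d.]+) port (?P<source_port>\d+)"
)

line = "Failed password for admin from 203.0.113.7 port 52144 ssh2"
m = SSH_FAIL.search(line)
assert m is not None
print(m.group("username"), m.group("source_ip"))  # admin 203.0.113.7
```

Grok is essentially a library of named regex fragments like these, so testing a pattern outside Logstash first is a quick way to debug a filter.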

Output Configuration

# /etc/logstash/conf.d/30-outputs.conf
output {
  # Send to Elasticsearch
  elasticsearch {
    hosts => ["https://es-01:9200", "https://es-02:9200", "https://es-03:9200"]
    index => "security-logs-%{+YYYY.MM.dd}"
    user => "logstash_writer"
    password => "${LOGSTASH_ES_PASSWORD}"
    ssl => true
    cacert => "/etc/logstash/certs/ca.crt"
  }

  # Send critical alerts to a separate index
  if "critical" in [tags] {
    elasticsearch {
      hosts => ["https://es-01:9200"]
      index => "security-alerts-%{+YYYY.MM.dd}"
      user => "logstash_writer"
      password => "${LOGSTASH_ES_PASSWORD}"
    }
  }

  # Debug output (disable in production)
  # stdout { codec => rubydebug }
}

Beats Configuration

Filebeat for Log Collection

# /etc/filebeat/filebeat.yml
filebeat.inputs:
  # System logs
  - type: log
    enabled: true
    paths:
      - /var/log/messages
      - /var/log/secure
      - /var/log/audit/audit.log
    fields:
      log_type: system
      environment: production

  # Application logs
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
    multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
    multiline.negate: true
    multiline.match: after
    fields:
      log_type: application

# Processors
processors:
  - add_host_metadata:
      netinfo.enabled: true
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - drop_event:
      when:
        regexp:
          message: "^DEBUG"

# Output to Logstash
output.logstash:
  hosts: ["logstash-01:5044", "logstash-02:5044"]
  loadbalance: true
  ssl.certificate_authorities: ["/etc/filebeat/ca.crt"]
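The multiline settings above (pattern plus negate: true plus match: after) mean: any line that does not start with a date is appended to the preceding event, so stack traces stay attached to the log line that produced them. A small sketch of that grouping logic, with an invented sample:

```python
import re

NEW_EVENT = re.compile(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}")  # same pattern as the config

def join_multiline(lines):
    """Lines not matching the start-of-event pattern attach to the previous event."""
    events = []
    for line in lines:
        if NEW_EVENT.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events

raw = [
    "2024-01-05 10:00:01 ERROR unhandled exception",
    "Traceback (most recent call last):",
    '  File "app.py", line 3',
    "2024-01-05 10:00:02 INFO recovered",
]
events = join_multiline(raw)
assert len(events) == 2  # the traceback folded into the first event
```

Without these settings each traceback line would be shipped as a separate, context-free document.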

Metricbeat for System Metrics

# /etc/metricbeat/metricbeat.yml
metricbeat.modules:
  - module: system
    metricsets:
      - cpu
      - load
      - memory
      - network
      - process
      - process_summary
      - diskio
    period: 10s

  - module: elasticsearch
    metricsets:
      - node
      - node_stats
      - cluster_stats
    period: 10s
    hosts: ["https://localhost:9200"]

output.elasticsearch:
  hosts: ["https://es-01:9200"]
  index: "metricbeat-%{+yyyy.MM.dd}"

SIEM Use Cases

Security Monitoring Queries

// Failed SSH attempts from the same IP
GET security-logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "event_type": "authentication_failure" }},
        { "match": { "program": "sshd" }},
        { "range": { "@timestamp": { "gte": "now-1h" }}}
      ]
    }
  },
  "aggs": {
    "by_source_ip": {
      "terms": {
        "field": "source_ip",
        "size": 10,
        "min_doc_count": 5
      }
    }
  }
}

// Privilege escalation attempts
GET security-logs-*/_search
{
  "query": {
    "bool": {
      "should": [
        { "match": { "message": "sudo" }},
        { "match": { "message": "su -" }},
        { "match": { "command": "sudo" }}
      ],
      "minimum_should_match": 1
    }
  }
}

// Unusual login times (before 06:00 or after 22:00)
GET security-logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "event_type": "authentication_success" }},
        {
          "script": {
            "script": {
              "source": "doc['@timestamp'].value.getHour() < 6 || doc['@timestamp'].value.getHour() > 22"
            }
          }
        }
      ]
    }
  }
}
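The terms aggregation in the first query (size: 10, min_doc_count: 5) is essentially a count-and-threshold over source IPs. Here is the same logic in plain Python over invented sample events, to show what the returned buckets represent:

```python
from collections import Counter

# Invented sample: one noisy IP, one quiet one
events = (
    [{"source_ip": "198.51.100.9"}] * 7   # 7 failures: reported
    + [{"source_ip": "192.0.2.4"}] * 2    # only 2: below min_doc_count
)

counts = Counter(e["source_ip"] for e in events)
buckets = [
    {"key": ip, "doc_count": n}
    for ip, n in counts.most_common(10)   # size: 10
    if n >= 5                             # min_doc_count: 5
]
print(buckets)  # [{'key': '198.51.100.9', 'doc_count': 7}]
```

min_doc_count keeps one-off typos out of the result so only IPs with repeated failures surface.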

Alerting Rules

// Create a watch for brute-force detection
PUT _watcher/watch/ssh-brute-force
{
  "trigger": {
    "schedule": { "interval": "5m" }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["security-logs-*"],
        "body": {
          "query": {
            "bool": {
              "must": [
                { "match": { "event_type": "authentication_failure" }},
                { "range": { "@timestamp": { "gte": "now-5m" }}}
              ]
            }
          },
          "aggs": {
            "by_ip": {
              "terms": {
                "field": "source_ip",
                "min_doc_count": 10
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.aggregations.by_ip.buckets.0.doc_count": { "gte": 10 }
    }
  },
  "actions": {
    "send_email": {
      "email": {
        "to": "security@company.com",
        "subject": "SSH Brute Force Detected",
        "body": "Detected {{ctx.payload.aggregations.by_ip.buckets.0.doc_count}} failed SSH attempts from {{ctx.payload.aggregations.by_ip.buckets.0.key}}"
      }
    },
    "block_ip": {
      "webhook": {
        "method": "POST",
        "url": "https://firewall-api.local/block",
        "body": "{\"ip\": \"{{ctx.payload.aggregations.by_ip.buckets.0.key}}\"}"
      }
    }
  }
}

Performance Tuning

Elasticsearch Optimization

# elasticsearch.yml

# Thread pools (bulk requests share the write thread pool)
thread_pool.write.queue_size: 1000
thread_pool.search.queue_size: 1000

# Circuit breakers
indices.breaker.total.limit: 70%
indices.breaker.request.limit: 40%
indices.breaker.fielddata.limit: 40%

# Cache settings
indices.queries.cache.size: 10%
indices.fielddata.cache.size: 20%

Index Optimization

# Force merge old indices
POST /security-logs-2024.01.*/_forcemerge?max_num_segments=1

# Shrink an index
POST /security-logs-2024.01.01/_shrink/security-logs-2024.01.01-shrink
{
  "settings": {
    "index.number_of_shards": 1
  }
}

# Reindex with an improved mapping
POST _reindex
{
  "source": { "index": "old-logs-*" },
  "dest": { "index": "security-logs-2024" }
}
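One constraint worth remembering with _shrink: the target shard count must be a factor of the source shard count (which is why the 3-shard indices from the template above shrink cleanly to 1). A quick sketch of the valid targets:

```python
def valid_shrink_targets(source_shards: int) -> list[int]:
    """Target shard counts must divide the source shard count evenly."""
    return [n for n in range(1, source_shards + 1) if source_shards % n == 0]

print(valid_shrink_targets(3))  # [1, 3] -> shrinking 3 shards to 1 is valid
print(valid_shrink_targets(8))  # [1, 2, 4, 8] -> 8 shards cannot shrink to 3
```

Requesting a non-factor target makes the shrink request fail outright, so it pays to pick shard counts with useful divisors up front.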

Monitoring and Maintenance

Cluster Health Monitoring

# Cluster health
GET _cluster/health

# Node stats
GET _nodes/stats

# Index stats (largest first)
GET _cat/indices?v&s=store.size:desc

# Pending tasks
GET _cluster/pending_tasks

# Hot threads
GET _nodes/hot_threads

Backup and Restore

// Register a snapshot repository
PUT _snapshot/backup_repository
{
  "type": "fs",
  "settings": {
    "location": "/mnt/backups/elasticsearch",
    "compress": true
  }
}

// Create a snapshot
PUT _snapshot/backup_repository/snapshot_2024_01_01
{
  "indices": "security-logs-*",
  "ignore_unavailable": true,
  "include_global_state": false
}

// Restore a snapshot
POST _snapshot/backup_repository/snapshot_2024_01_01/_restore
{
  "indices": "security-logs-2024.01.*",
  "ignore_unavailable": true
}

Security Best Practices

Access Control

# Role-based access control
PUT _security/role/security_analyst
{
  "cluster": ["monitor"],
  "indices": [
    {
      "names": ["security-logs-*"],
      "privileges": ["read", "view_index_metadata"]
    }
  ]
}

# Create a user
PUT _security/user/analyst1
{
  "password": "secure_password",
  "roles": ["security_analyst"],
  "full_name": "Security Analyst"
}

Audit Logging

# elasticsearch.yml
xpack.security.audit.enabled: true
xpack.security.audit.logfile.events.include: [
  "access_denied",
  "authentication_failed",
  "authentication_success",
  "connection_denied"
]

Conclusion

Elasticsearch provides a powerful platform for SIEM and centralized logging. A tiered cluster architecture, lifecycle-managed indices, and continuous health monitoring are what keep it reliable as log volume grows.

Key Takeaways:

  • Design for scale with hot/warm/cold architecture
  • Implement ILM for automatic data management
  • Use Logstash for data enrichment
  • Create meaningful alerts and dashboards
  • Monitor cluster health continuously
  • Secure access with RBAC
  • Backup regularly
