Elasticsearch for SIEM and Centralized Logging at Scale
November 10, 2024
9 min read
Elasticsearch for SIEM and Centralized Logging at Scale
Elasticsearch powers modern SIEM and logging platforms. Learn how to build, scale, and secure an enterprise logging infrastructure.
Architecture Overview
ELK Stack Components
Data Sources → Beats/Logstash → Elasticsearch → Kibana
↓
Index Lifecycle
↓
Cold Storage
Components:
- Elasticsearch: Search and analytics engine
- Logstash: Data processing pipeline
- Kibana: Visualization and dashboards
- Beats: Lightweight data shippers
Elasticsearch Cluster Design
Production Cluster Architecture
┌─────────────────────────────────────┐
│ Load Balancer (HAProxy) │
└─────────────────────────────────────┘
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Coordinating │ │ Coordinating │ │ Coordinating │
│ Node │ │ Node │ │ Node │
└──────────────┘ └──────────────┘ └──────────────┘
↓ ↓ ↓
┌──────────────────────────────────────────────────┐
│ Master Nodes (3) │
│ - Cluster state management │
│ - Index creation/deletion │
└──────────────────────────────────────────────────┘
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Data Node │ │ Data Node │ │ Data Node │
│ (Hot) │ │ (Hot) │ │ (Hot) │
│ - NVMe SSD │ │ - NVMe SSD │ │ - NVMe SSD │
└──────────────┘ └──────────────┘ └──────────────┘
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Data Node │ │ Data Node │ │ Data Node │
│ (Warm) │ │ (Warm) │ │ (Warm) │
│ - SATA SSD │ │ - SATA SSD │ │ - SATA SSD │
└──────────────┘ └──────────────┘ └──────────────┘
↓ ↓ ↓
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Data Node │ │ Data Node │ │ Data Node │
│ (Cold) │ │ (Cold) │ │ (Cold) │
│ - HDD │ │ - HDD │ │ - HDD │
└──────────────┘ └──────────────┘ └──────────────┘
Node Roles Configuration
# elasticsearch.yml
# Master-eligible node
node.roles: [ master ]
node.name: master-01
cluster.name: security-logs
# Data node (hot tier)
node.roles: [ data_hot, data_content ]
node.name: data-hot-01
node.attr.data: hot
# Data node (warm tier)
node.roles: [ data_warm, data_content ]
node.name: data-warm-01
node.attr.data: warm
# Data node (cold tier)
node.roles: [ data_cold ]
node.name: data-cold-01
node.attr.data: cold
# Coordinating node
node.roles: []
node.name: coord-01
Installation and Configuration
Elasticsearch Installation
# Install Elasticsearch
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.11.0-x86_64.rpm
rpm -ivh elasticsearch-8.11.0-x86_64.rpm
# Configure JVM heap (50% of RAM, max 32GB)
cat >> /etc/elasticsearch/jvm.options.d/heap.options << EOF
-Xms16g
-Xmx16g
EOF
# Configure Elasticsearch
cat > /etc/elasticsearch/elasticsearch.yml << EOF
cluster.name: security-logs
node.name: ${HOSTNAME}
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["master-01", "master-02", "master-03"]
cluster.initial_master_nodes: ["master-01", "master-02", "master-03"]
# Security
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: true
# Monitoring
xpack.monitoring.collection.enabled: true
EOF
# Start Elasticsearch
systemctl enable --now elasticsearch
Index Lifecycle Management (ILM)
PUT _ilm/policy/security-logs-policy
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "50GB",
"max_age": "1d",
"max_docs": 100000000
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"allocate": {
"require": {
"data": "warm"
}
},
"forcemerge": {
"max_num_segments": 1
},
"shrink": {
"number_of_shards": 1
},
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"allocate": {
"require": {
"data": "cold"
}
},
"searchable_snapshot": {
"snapshot_repository": "cold-snapshots"
},
"set_priority": {
"priority": 0
}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
Index Templates
PUT _index_template/security-logs
{
"index_patterns": ["security-logs-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.lifecycle.name": "security-logs-policy",
"index.lifecycle.rollover_alias": "security-logs",
"codec": "best_compression"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"source_ip": { "type": "ip" },
"destination_ip": { "type": "ip" },
"user": { "type": "keyword" },
"action": { "type": "keyword" },
"severity": { "type": "keyword" },
"message": { "type": "text" },
"event_id": { "type": "keyword" },
"hostname": { "type": "keyword" },
"process": { "type": "keyword" },
"geoip": {
"properties": {
"location": { "type": "geo_point" },
"country_name": { "type": "keyword" },
"city_name": { "type": "keyword" }
}
}
}
}
}
}
Logstash Configuration
Input Plugins
# /etc/logstash/conf.d/01-inputs.conf
input {
# Beats input
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
}
# Syslog input
syslog {
port => 514
type => "syslog"
}
# Windows Event Logs
tcp {
port => 5140
codec => json
type => "windows"
}
# Firewall logs
udp {
port => 5141
type => "firewall"
}
# HTTP input for applications
http {
port => 8080
codec => json
type => "application"
}
}
Filter Plugins
# /etc/logstash/conf.d/10-filters.conf
filter {
# Parse syslog
if [type] == "syslog" {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:syslog_message}"
}
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
# Parse SSH authentication
if [program] == "sshd" {
grok {
match => {
"syslog_message" => [
"Accepted %{WORD:auth_method} for %{USER:username} from %{IP:source_ip} port %{INT:source_port}",
"Failed %{WORD:auth_method} for %{USER:username} from %{IP:source_ip} port %{INT:source_port}"
]
}
}
if "Failed" in [syslog_message] {
mutate {
add_field => { "severity" => "warning" }
add_field => { "event_type" => "authentication_failure" }
}
}
}
# GeoIP enrichment
if [source_ip] {
geoip {
source => "source_ip"
target => "geoip"
}
}
# Add tags for security events
if [event_type] == "authentication_failure" {
mutate {
add_tag => [ "security", "authentication" ]
}
}
# Drop noisy events
if [program] == "systemd" and "Started Session" in [message] {
drop { }
}
}
Output Configuration
# /etc/logstash/conf.d/30-outputs.conf
output {
# Send to Elasticsearch
elasticsearch {
hosts => ["https://es-01:9200", "https://es-02:9200", "https://es-03:9200"]
index => "security-logs-%{+YYYY.MM.dd}"
user => "logstash_writer"
password => "${LOGSTASH_ES_PASSWORD}"
ssl => true
cacert => "/etc/logstash/certs/ca.crt"
}
# Send critical alerts to separate index
if "critical" in [tags] {
elasticsearch {
hosts => ["https://es-01:9200"]
index => "security-alerts-%{+YYYY.MM.dd}"
user => "logstash_writer"
password => "${LOGSTASH_ES_PASSWORD}"
}
}
# Debug output (disable in production)
# stdout { codec => rubydebug }
}
Beats Configuration
Filebeat for Log Collection
# /etc/filebeat/filebeat.yml
filebeat.inputs:
# System logs
- type: log
enabled: true
paths:
- /var/log/messages
- /var/log/secure
- /var/log/audit/audit.log
fields:
log_type: system
environment: production
# Application logs
- type: log
enabled: true
paths:
- /var/log/app/*.log
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
fields:
log_type: application
# Processors
processors:
- add_host_metadata:
netinfo.enabled: true
- add_cloud_metadata: ~
- add_docker_metadata: ~
- drop_event:
when:
regexp:
message: "^DEBUG"
# Output to Logstash
output.logstash:
hosts: ["logstash-01:5044", "logstash-02:5044"]
loadbalance: true
  ssl.certificate_authorities: ["/etc/filebeat/ca.crt"]
Metricbeat for System Metrics
# /etc/metricbeat/metricbeat.yml
metricbeat.modules:
- module: system
metricsets:
- cpu
- load
- memory
- network
- process
- process_summary
- diskio
period: 10s
- module: elasticsearch
metricsets:
- node
- node_stats
- cluster_stats
period: 10s
hosts: ["https://localhost:9200"]
output.elasticsearch:
hosts: ["https://es-01:9200"]
  index: "metricbeat-%{+yyyy.MM.dd}"
SIEM Use Cases
Security Monitoring Queries
// Failed SSH attempts from same IP
GET security-logs-*/_search
{
"query": {
"bool": {
"must": [
{ "match": { "event_type": "authentication_failure" }},
{ "match": { "program": "sshd" }},
{ "range": { "@timestamp": { "gte": "now-1h" }}}
]
}
},
"aggs": {
"by_source_ip": {
"terms": {
"field": "source_ip",
"size": 10,
"min_doc_count": 5
}
}
}
}
// Privilege escalation attempts
GET security-logs-*/_search
{
"query": {
"bool": {
"should": [
{ "match": { "message": "sudo" }},
{ "match": { "message": "su -" }},
{ "match": { "command": "sudo" }}
],
"minimum_should_match": 1
}
}
}
// Unusual login times
GET security-logs-*/_search
{
"query": {
"bool": {
"must": [
{ "match": { "event_type": "authentication_success" }},
{
"script": {
"script": {
"source": "doc['@timestamp'].value.getHour() < 6 || doc['@timestamp'].value.getHour() > 22"
}
}
}
]
}
}
}
Alerting Rules
// Create watcher for brute force detection
PUT _watcher/watch/ssh-brute-force
{
"trigger": {
"schedule": {
"interval": "5m"
}
},
"input": {
"search": {
"request": {
"indices": ["security-logs-*"],
"body": {
"query": {
"bool": {
"must": [
{ "match": { "event_type": "authentication_failure" }},
{ "range": { "@timestamp": { "gte": "now-5m" }}}
]
}
},
"aggs": {
"by_ip": {
"terms": {
"field": "source_ip",
"min_doc_count": 10
}
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.aggregations.by_ip.buckets.0.doc_count": {
"gte": 10
}
}
},
"actions": {
"send_email": {
"email": {
"to": "security@company.com",
"subject": "SSH Brute Force Detected",
"body": "Detected {{ctx.payload.aggregations.by_ip.buckets.0.doc_count}} failed SSH attempts from {{ctx.payload.aggregations.by_ip.buckets.0.key}}"
}
},
"block_ip": {
"webhook": {
"method": "POST",
"url": "https://firewall-api.local/block",
"body": "{\"ip\": \"{{ctx.payload.aggregations.by_ip.buckets.0.key}}\"}"
}
}
}
}
Performance Tuning
Elasticsearch Optimization
# elasticsearch.yml
# Thread pools
thread_pool.write.queue_size: 1000
thread_pool.search.queue_size: 1000
# Circuit breakers
indices.breaker.total.limit: 70%
indices.breaker.request.limit: 40%
indices.breaker.fielddata.limit: 40%
# Cache settings
indices.queries.cache.size: 10%
indices.fielddata.cache.size: 20%
# Bulk settings
bulk.queue_size: 200
Index Optimization
# Force merge old indices
POST /security-logs-2024.01.*/_forcemerge?max_num_segments=1
# Shrink index
POST /security-logs-2024.01.01/_shrink/security-logs-2024.01.01-shrink
{
"settings": {
"index.number_of_shards": 1
}
}
# Reindex with better mapping
POST _reindex
{
"source": {
"index": "old-logs-*"
},
"dest": {
"index": "security-logs-2024"
}
}
Monitoring and Maintenance
Cluster Health Monitoring
# Cluster health
GET _cluster/health
# Node stats
GET _nodes/stats
# Index stats
GET _cat/indices?v&s=store.size:desc
# Pending tasks
GET _cluster/pending_tasks
# Hot threads
GET _nodes/hot_threads
Backup and Restore
// Register snapshot repository
PUT _snapshot/backup_repository
{
"type": "fs",
"settings": {
"location": "/mnt/backups/elasticsearch",
"compress": true
}
}
// Create snapshot
PUT _snapshot/backup_repository/snapshot_2024_01_01
{
"indices": "security-logs-*",
"ignore_unavailable": true,
"include_global_state": false
}
// Restore snapshot
POST _snapshot/backup_repository/snapshot_2024_01_01/_restore
{
"indices": "security-logs-2024.01.*",
"ignore_unavailable": true
}
Security Best Practices
Access Control
# Role-based access control
PUT _security/role/security_analyst
{
"cluster": ["monitor"],
"indices": [
{
"names": ["security-logs-*"],
"privileges": ["read", "view_index_metadata"]
}
]
}
# Create user
PUT _security/user/analyst1
{
"password": "secure_password",
"roles": ["security_analyst"],
"full_name": "Security Analyst"
}
Audit Logging
# elasticsearch.yml
xpack.security.audit.enabled: true
xpack.security.audit.logfile.events.include: [
"access_denied",
"authentication_failed",
"authentication_success",
"connection_denied"
]
Conclusion
Elasticsearch provides a powerful platform for SIEM and centralized logging. Proper architecture, configuration, and monitoring ensure reliable security operations.
Key Takeaways:
- Design for scale with hot/warm/cold architecture
- Implement ILM for automatic data management
- Use Logstash for data enrichment
- Create meaningful alerts and dashboards
- Monitor cluster health continuously
- Secure access with RBAC
- Backup regularly
References:
- Elasticsearch Official Documentation
- Elastic SIEM Guide
- NIST Cybersecurity Framework
- CIS Elasticsearch Benchmark