La stack ELK est un ensemble d'outils puissants pour la gestion des logs et l'analyse des données. Composée d'Elasticsearch pour la recherche et l'indexation, Logstash pour la collecte, l'agrégation et le traitement des logs, et Kibana pour la visualisation et l'analyse des données, la stack ELK offre une solution complète pour gérer les informations des systèmes.
Dans la suite nous allons utiliser vagrant pour créer une machine virtuelle et installer les différents outils nécessaires. Cette approche es utilisée afin d’abstraire l’inrastructure et de fonctionner avec une IaaC.
Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/jammy64"
config.vm.network "private_network", ip: "192.168.56.10"
config.vm.network "public_network", bridge: "wlan0", ip: "192.168.1.70"
config.vm.synced_folder ".", "/vagrant", disabled: false
config.vm.synced_folder "./pipelines", "/etc/logstash/conf.d", disabled: false
config.vm.provider "virtualbox" do |vb|
vb.gui = false
vb.memory = "5192"
end
config.vm.provision "shell", inline: <<-SHELL
# Update the system
apt-get update
# Add the Elastic GPG key and repository
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
sudo apt-get install -y apt-transport-https
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
sudo apt-get update
# Install Elasticsearch and enable it
sudo apt-get install -y elasticsearch
sudo echo "-Xms1024m" > /etc/elasticsearch/jvm.options
sudo echo "-Xmx1024m" >> /etc/elasticsearch/jvm.options
sudo systemctl enable --now elasticsearch
sudo systemctl restart elasticsearch
# Install Kibana and enable it
sudo apt-get install -y kibana
sudo echo "-Xms512m" > /etc/kibana/jvm.options
sudo echo "-Xmx512m" >> /etc/kibana/jvm.options
sudo systemctl enable --now kibana
sudo systemctl restart kibana
sed -i 's|#server.host: "localhost"|server.host: "0.0.0.0"|g' /etc/kibana/kibana.yml
sed -i 's|#server.port: 5601|server.port: 5601|g' /etc/kibana/kibana.yml
# Install Logstash and enable it
sudo apt-get install -y logstash
sudo echo "-Xms1g" > /etc/logstash/jvm.options
sudo echo "-Xmx1g" >> /etc/logstash/jvm.options
sudo systemctl enable --now logstash
sudo systemctl restart logstash
SHELL
end
Ce fichier Vagrantfile va nous aider à provisionner la machine et de faire pointer dans les différents dossiers de configuration de la machine virtuelle nos configurations sur notre système hôte.
On initialise la machine avec vagrant puis on s’y connecte par ssh.
Maintenant suite à l’installation de snort, nous allons passer à sa configuration dans le shell interactif:
Ensuite on doit spécifier les interfaces réseaux sur lesquelles snort doit écouter les paquets.
Ici on choisit les trois interfaces disponibles sur la machine our écouter tous les paquets relatifs.
Snort est maintenant démarré sur la machine ! on peut le vérifier en inspectant son statut
Par défaut suricata utilise la méthode de traffic af-packet
pour capturer les packets.
Modifions la configuration pour mettre l’interface réseau physique de notre machine vituelle et permettre à suricata de capturer les packets.
Nous allons configurer deux interfaces :
Nous vérifions que suricata aussi tourne correctement sur la machine après l’avoir redémarré.
À cette étape, nous avons
snort
de configuré sur l’interface toutes les interfaces de notre machine etsuricata
de configuré sur les interfaces enp0s9 et enp0s8 pour capturer les packets du réseau interne à la machine hôte et du réseau externe local.
Nous avons défini la mémoire minimale et maximale de elasticsearch sur 1go
Cela peut entrainer des pertes de performances car nous sommes limités par la mémoire.
NB: Dans le vagrantfile la configurationa remplacé le wildcard des adresses ip pour permettre l’accès depuis l’extérieur à kibana; Cette option fut ici:
sed -i 's|#server.host: "localhost"|server.host: "0.0.0.0"|g' /etc/kibana/kibana.yml
sed -i 's|#server.port: 5601|server.port: 5601|g' /etc/kibana/kibana.yml
http://192.168.1.70:5601/
/usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token --scope kibana
Entrons maintenant cette clé dans l’entrée de kibana
Lorsque la clé st entrée, kibana nous demande un code de vérification qui peut être obtenu en faisant
elastic
car on en avait pas spécifié dans l’installationPour réinitialiser le mot de passe ;
/usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic
Une fois que nous venons de terminer avec kibana nous allons maintenant passer à la configuration de logstash
Logstash utilise une configuration de pipeline pour spécifier les entrées les filtres et la sorties
Dans nous notre cas on a donc :
Entrées
: Logs de suricata, snort et d’une machine windowsFiltres
: Vont permettre le parsing des différents logs en données structurées pour notre instance elasticsearchSorties
: Ce sera notre instance elasticsearch et la sortie standard de la console afin de tester les grok (syntaxe de correspondance de modèle)./etc/logstash/conf.d/
input {
file {
path => "/var/log/snort/snort.alert.fast"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
match => { "message" => "%{MONTHNUM}/%{MONTHDAY}-%{TIME} %{DATA} \[%{DATA}\] \[%{DATA:signature}\] \[%{DATA}\] \[Priority: %{INT:priority}\] \{%{WORD:protocol}\} %{IP:source_address}:%{NUMBER:source_port} -> %{IP:destination_address}:%{NUMBER:destination_port}" }
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "@timestamp"
}
}
output {
elasticsearch {
index => "logstash-%{+YYYY.MM.dd}"
hosts => ["https://localhost:9200"]
user => "elastic"
password => "OctX*DH8WwORBTxnq88G"
ssl => true
cacert => "/etc/elasticsearch/certs/http_ca.crt"
ssl_certificate_verification => true
manage_template => false
}
stdout { codec => rubydebug }
}
/etc/logstash/conf.d/
input {
file {
path => ["/var/log/suricata/eve.json"]
sincedb_path => "/var/lib/logstash/sincedb_suricata"
codec => json
type => "SuricataIDPS"
}
}
filter {
if [type] == "SuricataIDPS" {
date {
match => [ "timestamp", "ISO8601" ]
}
ruby {
code => "if event['event_type'] == 'fileinfo'; event['fileinfo']['type']=event['fileinfo']['magic'].to_s.split(',')[0]; end;"
}
}
if [src_ip] {
geoip {
source => "src_ip"
target => "geoip"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float" ]
}
if ![geoip.ip] {
if [dest_ip] {
geoip {
source => "dest_ip"
target => "geoip"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float" ]
}
}
}
}
}
output {
elasticsearch {
index => "logstash-%{+YYYY.MM.dd}"
hosts => ["https://localhost:9200"]
user => "elastic"
password => "=M3wC=jl_ZsJogcQfTCH"
ssl => true
cacert => "/etc/elasticsearch/certs/http_ca.crt"
ssl_certificate_verification => true
manage_template => false
}
stdout { codec => rubydebug }
}
codec => json
) : Ce filtre est utilisé pour interpréter les données JSON provenant du fichier de logs Suricata (/var/log/suricata/eve.json
). Cela permet à Logstash de comprendre la structure des données JSON et de les traiter correctement.if [type] == "SuricataIDPS"
) : Cette condition vérifie si le type de log est "SuricataIDPS". Si c'est le cas, les filtres à l'intérieur de cette condition seront appliqués aux logs correspondants.date
) : Ce filtre analyse et convertit les horodatages des logs au format ISO8601 en horodatages compatibles avec Elasticsearch. Cela permet une indexation et une recherche efficaces des événements basées sur le temps.ruby
) : Ce filtre utilise du code Ruby pour effectuer une manipulation personnalisée des données. Dans ce cas, il vérifie si le type d'événement est "fileinfo" et si c'est le cas, il extrait le type de fichier à partir de la clé "magic" et le place dans la clé "type" du champ "fileinfo".geoip
) : Ce filtre ajoute des informations de géolocalisation aux adresses IP présentes dans les logs. Il utilise la base de données GeoIP pour déterminer le pays, la ville et les coordonnées géographiques associées à chaque adresse IP. Les coordonnées géographiques sont ensuite ajoutées au champ "geoip.coordinates". Avant cela, le filtre vérifie si l'adresse IP source existe (src_ip
), et si elle n'existe pas, il vérifie l'adresse IP de destination (dest_ip
). Si une des deux adresses IP est trouvée, la géolocalisation est effectuée pour cette adresse.mutate
) : Ce filtre permet de modifier la structure des événements en ajoutant ou en modifiant des champs. Dans ce cas, il est utilisé pour convertir les coordonnées géographiques de chaînes de caractères en nombres décimaux, ce qui est nécessaire pour l'indexation correcte dans Elasticsearch.Pour effectuer les tests, nous allons retirer le plugin elasticsearch de la sortie et juste garder l’affichage en console.
Démarrons logstash avec la configuration de snort
/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/snort-pipeline.conf
Nous voyons donc que les données sont bien structurées et correspondent à ce que l’on attend grâce à notre filtre.
Démarrons logstash avec la configuration de suricata
/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/suricata-pipeline.conf
Nous voyons donc que les données sont bien structurées et correspondent à ce que l’on attend grâce à notre filtre dans la pipeline de suricata.
Après un succès des tests pour vérifier la cohérence et la collecte des logs par logstash, on peut maintenant les envoyer dans notre instance elasticsearch.
## Pour snort
output {
elasticsearch {
index => "snort-%{+YYYY.MM.dd}"
hosts => ["https://localhost:9200"]
user => "elastic"
password => "OctX*DH8WwORBTxnq88G"
ssl => true
cacert => "/etc/elasticsearch/certs/http_ca.crt"
ssl_certificate_verification => true
manage_template => false
}
stdout { codec => rubydebug }
}
## Pour suricata
output {
elasticsearch {
index => "suricata-%{+YYYY.MM.dd}"
hosts => ["https://localhost:9200"]
user => "elastic"
password => "OctX*DH8WwORBTxnq88G"
ssl => true
cacert => "/etc/elasticsearch/certs/http_ca.crt"
ssl_certificate_verification => true
manage_template => false
}
stdout { codec => rubydebug }
}
Kibana.`
de manière conviviale avec Kibana.
Pour créer des dashboards, on se rend dans l’onglet analytics puis Dashboards.
Pour l’occasion nous allons créér un tableau contenant quelques informations et des graphiques pour les logs de snort.
Ensuite nous avons ajouté deux graphes montrant les protocoles les plus utilisés et les adresses de destination les plus contactées sur le réseau.
Maintenant utilisons les données recueillies par logstash pour créer des visuels dans Kibana.
Ici nous avons un résumé des différentes communications sur le réseau. On a une proportion de l’utilisation des interfaces de la machine, l’utilisation des différents protocoles TCP/UDP et un aperçu des USER AGENTS dans un tableau associé à leur adresses IP.
En résumé ces différents graphiques permettent de voir en temps réel ce qui se passe sur le réseau contenant notre machine. Ces informations pourraient servir de base pour une analyse plus poussée dans le cas échéant.
Celà va nous permettre d’avoir à disposition un agent léger qui surveille les journaux Windows en temps réel, extrait les événements spécifiés dans sa configuration (comme les journaux d'applications, de sécurité ou de système), les transforme en JSON et les envoie vers une destination spécifiée (Elasticsearch dans notre cas) pour l'indexation et l'analyse ultérieure, contribuant ainsi à la surveillance et à la gestion centralisées des logs Windows.
Voici la configuration de winlogbeat pour récupérer les logs.
setup.template.name: "winlogbeat"
setup.template.pattern: "winlogbeat-*"
winlogbeat.event_logs:
- name: Application
ignore_older: 72h
- name: System
- name: Security
- name: Microsoft-Windows-Sysmon/Operational
- name: Windows PowerShell
event_id: 400, 403, 600, 800
- name: Microsoft-Windows-PowerShell/Operational
event_id: 4103, 4104, 4105, 4106
- name: ForwardedEvents
tags: [forwarded]
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
hosts: ["localhost:5601"]
output.elasticsearch:
index: "winlogbeat-%{[agent.version]}-%{+yyyy.MM.dd}"
hosts: ["192.168.1.70:9200"]
protocol: "https"
username: "elastic"
password: "OctX*DH8WwORBTxnq88G"
ssl.certificate_authorities: ["C:\\Users\\vagrant\\Downloads\\winlogbeat-8.12.2-windows-x86_64\\ca.cert"]
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
Nous avons défini la sortie sur elasticsearch. Ainsi nous pouvons récupérer les données et créer des dashboards dans kibana. Nous laissons la configuration des évenements pa défaut ou un essai de test.
Exécution de winlogbeats sous windows en invite de commande :
./winlogbeats.exe -c winlogbeats.yml -e
Nous récupérons biens les logs dans ElasticSearch
Configurons maintenant une source de données pour les vues
Stay in the loop with my latest projects and insights! Follow me on Twitter to catch all the updates as they happen. Don't miss out on the journey – let's connect and explore the world of tech together. Click to follow now!