Blog mise-en-place-delk
securityMARCH 03, 2024

Mise en place d'ELK : ElasticSearch, Logstash et Kibana

kraaakilo's avatar

Mise en place de la stack ELK

La stack ELK est un ensemble d'outils puissants pour la gestion des logs et l'analyse des données. Composée d'Elasticsearch pour la recherche et l'indexation, Logstash pour la collecte, l'agrégation et le traitement des logs, et Kibana pour la visualisation et l'analyse des données, la stack ELK offre une solution complète pour gérer les informations des systèmes.

Commandes d’installation

Dans la suite nous allons utiliser vagrant pour créer une machine virtuelle et installer les différents outils nécessaires. Cette approche es utilisée afin d’abstraire l’inrastructure et de fonctionner avec une IaaC.

Aperçu du Vagrantfile

Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu/jammy64"

  config.vm.network "private_network", ip: "192.168.56.10"
  config.vm.network "public_network", bridge: "wlan0", ip: "192.168.1.70"
  
  config.vm.synced_folder ".", "/vagrant", disabled: false
  config.vm.synced_folder "./pipelines", "/etc/logstash/conf.d", disabled: false

  config.vm.provider "virtualbox" do |vb|
    vb.gui = false
    vb.memory = "5192"
  end

  config.vm.provision "shell", inline: <<-SHELL

    # Update the system  
    apt-get update

    # Add the Elastic GPG key and repository
    wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
    sudo apt-get install -y apt-transport-https
    echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list
    sudo apt-get update

    # Install Elasticsearch and enable it 
    sudo apt-get install -y elasticsearch
    sudo echo "-Xms1024m" > /etc/elasticsearch/jvm.options
    sudo echo "-Xmx1024m" >> /etc/elasticsearch/jvm.options
    sudo systemctl enable --now elasticsearch
    sudo systemctl restart elasticsearch
    
    # Install Kibana and enable it
    sudo apt-get install -y kibana
    sudo echo "-Xms512m" > /etc/kibana/jvm.options
    sudo echo "-Xmx512m" >> /etc/kibana/jvm.options
    sudo systemctl enable --now kibana
    sudo systemctl restart kibana
    sed -i 's|#server.host: "localhost"|server.host: "0.0.0.0"|g' /etc/kibana/kibana.yml
    sed -i 's|#server.port: 5601|server.port: 5601|g' /etc/kibana/kibana.yml

    # Install Logstash and enable it
    sudo apt-get install -y logstash
    sudo echo "-Xms1g" > /etc/logstash/jvm.options
    sudo echo "-Xmx1g" >> /etc/logstash/jvm.options
    sudo systemctl enable --now logstash
    sudo systemctl restart logstash

  SHELL
end

Ce fichier Vagrantfile va nous aider à provisionner la machine et de faire pointer dans les différents dossiers de configuration de la machine virtuelle nos configurations sur notre système hôte.

On initialise la machine avec vagrant puis on s’y connecte par ssh.

Untitled

Installation et activation au démarrage de snort

Untitled

Maintenant suite à l’installation de snort, nous allons passer à sa configuration dans le shell interactif:

  • On choisit la méthode boot pour démarrer snort à chaque démarrage. Cela évitera de redémarrer le service à chaque fois.

Untitled

  • Ensuite on doit spécifier les interfaces réseaux sur lesquelles snort doit écouter les paquets.

    Ici on choisit les trois interfaces disponibles sur la machine our écouter tous les paquets relatifs.

Untitled

  • Maintenant nous devons configurer l’annotation CIDR de la plage d’adresses pour spécifier à snort les adresses IP concernées par son analyse. ICI on choisit le 192.168.0.0/16 (65534 adresses incluses.)

Untitled

  • Maintenant nous allons activer le mode promiscuous de snort. Cela lui permet de surveiller tout le trafic sur le segment réseau auquel il est connecté, lui permettant d'analyser les paquets et de détecter toute activité malveillante ou anomalie réseau.

Untitled

  • Pour terminer la configuration de snort nous allons désactiver l’envoi de mails contenant les logs des captures de packets étant donné qu’on veut effectuer la gestion avec kibana et elasticsearch

Untitled

Snort est maintenant démarré sur la machine ! on peut le vérifier en inspectant son statut

Untitled

Installation et activation au démarrage de suricata

Untitled

Activation au démarrage de suricata

Untitled

  • Par défaut suricata utilise la méthode de traffic af-packet pour capturer les packets.

    Modifions la configuration pour mettre l’interface réseau physique de notre machine vituelle et permettre à suricata de capturer les packets.

    Nous allons configurer deux interfaces :

    • Le réeau interne de la machine hôte : enp0s8
    • Le réseau local principal : enp0s9

    Untitled

  • Nous vérifions que suricata aussi tourne correctement sur la machine après l’avoir redémarré.

Untitled

À cette étape, nous avons snort de configuré sur l’interface toutes les interfaces de notre machine et suricata de configuré sur les interfaces enp0s9 et enp0s8 pour capturer les packets du réseau interne à la machine hôte et du réseau externe local.

Configuration et démarrage de elasticsearch

Nous avons défini la mémoire minimale et maximale de elasticsearch sur 1go

Untitled

Cela peut entrainer des pertes de performances car nous sommes limités par la mémoire.

  • Elasticsearch est bien en marche sur la machine

Untitled

Configuration de Kibana

  • Mémoire maximale configurée à 512m

Untitled

  • Redémarrage du service et vérification du status

Untitled

NB: Dans le vagrantfile la configurationa remplacé le wildcard des adresses ip pour permettre l’accès depuis l’extérieur à kibana; Cette option fut ici:

    sed -i 's|#server.host: "localhost"|server.host: "0.0.0.0"|g' /etc/kibana/kibana.yml
    sed -i 's|#server.port: 5601|server.port: 5601|g' /etc/kibana/kibana.yml

Untitled

  • Génération du token d’enrollement depuis elasticsearch
/usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token --scope kibana

Untitled

  • Entrons maintenant cette clé dans l’entrée de kibana

    Lorsque la clé st entrée, kibana nous demande un code de vérification qui peut être obtenu en faisant

Untitled

Untitled

  • Le code de vérification marche et kibana commence sa configuration automatique

Untitled

  • Maintenant il faut se connecter à l’instance de kibana. Pour cela nous allons réinitialiser le mot de passe du superuser elastic car on en avait pas spécifié dans l’installation

Untitled

Pour réinitialiser le mot de passe ;

/usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic

Untitled

  • Connexion à kibana avec les identifiants

Untitled

Untitled

  • Une fois que nous venons de terminer avec kibana nous allons maintenant passer à la configuration de logstash

    Logstash utilise une configuration de pipeline pour spécifier les entrées les filtres et la sorties

    Dans nous notre cas on a donc :

    • Entrées : Logs de suricata, snort et d’une machine windows
    • Filtres : Vont permettre le parsing des différents logs en données structurées pour notre instance elasticsearch
    • Sorties : Ce sera notre instance elasticsearch et la sortie standard de la console afin de tester les grok (syntaxe de correspondance de modèle).

Pipeline Logstash de snort

  • Contenu de snort-pipeline.conf dans /etc/logstash/conf.d/
input {
    file {
        path => "/var/log/snort/snort.alert.fast"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    grok {
        match => { "message" => "%{MONTHNUM}/%{MONTHDAY}-%{TIME} %{DATA} \[%{DATA}\] \[%{DATA:signature}\] \[%{DATA}\] \[Priority: %{INT:priority}\] \{%{WORD:protocol}\} %{IP:source_address}:%{NUMBER:source_port} -> %{IP:destination_address}:%{NUMBER:destination_port}" }
    }
    
    date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
        target => "@timestamp"
    }
}

output {
    elasticsearch {
        index => "logstash-%{+YYYY.MM.dd}"
        hosts => ["https://localhost:9200"]
        user => "elastic"
        password => "OctX*DH8WwORBTxnq88G"
        ssl => true
        cacert => "/etc/elasticsearch/certs/http_ca.crt"
        ssl_certificate_verification => true
        manage_template => false
    }
    stdout { codec => rubydebug }
}

  1. input: Définit la source des données à traiter, dans ce cas, un fichier log Snort.
  2. filter: Spécifie les étapes de transformation des données avant de les envoyer à la sortie. Dans ce cas, il utilise le plugin Grok pour extraire des champs de données et le plugin date pour analyser les horodatages.
  3. output: Indique où envoyer les données traitées, ici à Elasticsearch pour l'indexation. Il envoie également une copie des données vers la sortie standard (stdout) pour le débogage.

Pipeline Logstash de suricata

  • Contenu de suricata-pipeline.conf dans /etc/logstash/conf.d/
input {
  file { 
    path => ["/var/log/suricata/eve.json"]
    sincedb_path => "/var/lib/logstash/sincedb_suricata"
    codec =>   json 
    type => "SuricataIDPS" 
  }

}

filter {
  if [type] == "SuricataIDPS" {
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    ruby {
      code => "if event['event_type'] == 'fileinfo'; event['fileinfo']['type']=event['fileinfo']['magic'].to_s.split(',')[0]; end;" 
    }
  }

  if [src_ip]  {
    geoip {
      source => "src_ip" 
      target => "geoip" 
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
    if ![geoip.ip] {
      if [dest_ip]  {
        geoip {
          source => "dest_ip" 
          target => "geoip" 
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        mutate {
          convert => [ "[geoip][coordinates]", "float" ]
        }
      }
    }
  }
}

output {
  elasticsearch {
    index => "logstash-%{+YYYY.MM.dd}"
    hosts => ["https://localhost:9200"]
    user => "elastic"
    password => "=M3wC=jl_ZsJogcQfTCH"
    ssl => true
    cacert => "/etc/elasticsearch/certs/http_ca.crt"
    ssl_certificate_verification => true
    manage_template => false
  }
  stdout { codec => rubydebug }
}

  1. Filtre JSON (codec => json) : Ce filtre est utilisé pour interpréter les données JSON provenant du fichier de logs Suricata (/var/log/suricata/eve.json). Cela permet à Logstash de comprendre la structure des données JSON et de les traiter correctement.
  2. Condition de type (if [type] == "SuricataIDPS") : Cette condition vérifie si le type de log est "SuricataIDPS". Si c'est le cas, les filtres à l'intérieur de cette condition seront appliqués aux logs correspondants.
  3. Filtre Date (date) : Ce filtre analyse et convertit les horodatages des logs au format ISO8601 en horodatages compatibles avec Elasticsearch. Cela permet une indexation et une recherche efficaces des événements basées sur le temps.
  4. Filtre Ruby (ruby) : Ce filtre utilise du code Ruby pour effectuer une manipulation personnalisée des données. Dans ce cas, il vérifie si le type d'événement est "fileinfo" et si c'est le cas, il extrait le type de fichier à partir de la clé "magic" et le place dans la clé "type" du champ "fileinfo".
  5. Filtre GeoIP (geoip) : Ce filtre ajoute des informations de géolocalisation aux adresses IP présentes dans les logs. Il utilise la base de données GeoIP pour déterminer le pays, la ville et les coordonnées géographiques associées à chaque adresse IP. Les coordonnées géographiques sont ensuite ajoutées au champ "geoip.coordinates". Avant cela, le filtre vérifie si l'adresse IP source existe (src_ip), et si elle n'existe pas, il vérifie l'adresse IP de destination (dest_ip). Si une des deux adresses IP est trouvée, la géolocalisation est effectuée pour cette adresse.
  6. Filtre Mutate (mutate) : Ce filtre permet de modifier la structure des événements en ajoutant ou en modifiant des champs. Dans ce cas, il est utilisé pour convertir les coordonnées géographiques de chaînes de caractères en nombres décimaux, ce qui est nécessaire pour l'indexation correcte dans Elasticsearch.

Tests pour vérifier la collecte, le stockage et la visualisation des logs.

Pour effectuer les tests, nous allons retirer le plugin elasticsearch de la sortie et juste garder l’affichage en console.

Tests avec snort

Démarrons logstash avec la configuration de snort

/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/snort-pipeline.conf

Untitled

Nous voyons donc que les données sont bien structurées et correspondent à ce que l’on attend grâce à notre filtre.

Tests avec suricata

Démarrons logstash avec la configuration de suricata

/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/suricata-pipeline.conf
  • Exemple de paquets capturés par suricata

Untitled

Nous voyons donc que les données sont bien structurées et correspondent à ce que l’on attend grâce à notre filtre dans la pipeline de suricata.

Indexation des données dans elasticsearch.

Après un succès des tests pour vérifier la cohérence et la collecte des logs par logstash, on peut maintenant les envoyer dans notre instance elasticsearch.

  • Modifions l’output de chacun des logs afin de s’y retrouver rapidement lors de la réalisation des dashboards kibana
## Pour snort 
output {
    elasticsearch {
        index => "snort-%{+YYYY.MM.dd}"
        hosts => ["https://localhost:9200"]
        user => "elastic"
        password => "OctX*DH8WwORBTxnq88G"
        ssl => true
        cacert => "/etc/elasticsearch/certs/http_ca.crt"
        ssl_certificate_verification => true
        manage_template => false
    }
    stdout { codec => rubydebug }
}

## Pour suricata
output {
    elasticsearch {
        index => "suricata-%{+YYYY.MM.dd}"
        hosts => ["https://localhost:9200"]
        user => "elastic"
        password => "OctX*DH8WwORBTxnq88G"
        ssl => true
        cacert => "/etc/elasticsearch/certs/http_ca.crt"
        ssl_certificate_verification => true
        manage_template => false
    }
    stdout { codec => rubydebug }
}

Index créés par logstash dans elasticsearch

Untitled

`Les logs collectés sont indexés et disponibles pour la visualisation dans

Kibana.`

Création des tableaux de bord et de visualisations pour afficher les données de logs

de manière conviviale avec Kibana.

Pour créer des dashboards, on se rend dans l’onglet analytics puis Dashboards.

Untitled

  • Ensuite on entre un pattern (dans notre cas : snort-*); Cela va alors associer à notre dashboard tous les index correspondants.

Untitled

  • Ensuite on pourra configurer les différentes visualisations à notre guise pour afficher les données.

CRÉATION DES VISUALISATIONS POUR SNORT

Untitled

Pour l’occasion nous allons créér un tableau contenant quelques informations et des graphiques pour les logs de snort.

Untitled

Ensuite nous avons ajouté deux graphes montrant les protocoles les plus utilisés et les adresses de destination les plus contactées sur le réseau.

Untitled

CRÉATION DES VISUALISATIONS POUR SURICATA

Maintenant utilisons les données recueillies par logstash pour créer des visuels dans Kibana.

Untitled

Ici nous avons un résumé des différentes communications sur le réseau. On a une proportion de l’utilisation des interfaces de la machine, l’utilisation des différents protocoles TCP/UDP et un aperçu des USER AGENTS dans un tableau associé à leur adresses IP.

En résumé ces différents graphiques permettent de voir en temps réel ce qui se passe sur le réseau contenant notre machine. Ces informations pourraient servir de base pour une analyse plus poussée dans le cas échéant.

Récupération des logs d’une machine Windows.

Dans cette section nous allons utiliser l’outil Winlogbeat de la suite Elastic.

Celà va nous permettre d’avoir à disposition un agent léger qui surveille les journaux Windows en temps réel, extrait les événements spécifiés dans sa configuration (comme les journaux d'applications, de sécurité ou de système), les transforme en JSON et les envoie vers une destination spécifiée (Elasticsearch dans notre cas) pour l'indexation et l'analyse ultérieure, contribuant ainsi à la surveillance et à la gestion centralisées des logs Windows.

Installation de Winlogbeat

Untitled

Voici la configuration de winlogbeat pour récupérer les logs.

setup.template.name: "winlogbeat"
setup.template.pattern: "winlogbeat-*"

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
  - name: System
  - name: Security
  - name: Microsoft-Windows-Sysmon/Operational
  - name: Windows PowerShell
    event_id: 400, 403, 600, 800
  - name: Microsoft-Windows-PowerShell/Operational
    event_id: 4103, 4104, 4105, 4106
  - name: ForwardedEvents
    tags: [forwarded]

setup.template.settings:
  index.number_of_shards: 1

setup.kibana:
  hosts: ["localhost:5601"]

output.elasticsearch:
  index: "winlogbeat-%{[agent.version]}-%{+yyyy.MM.dd}"
  hosts: ["192.168.1.70:9200"]
  protocol: "https"
  username: "elastic"
  password: "OctX*DH8WwORBTxnq88G"  
  ssl.certificate_authorities: ["C:\\Users\\vagrant\\Downloads\\winlogbeat-8.12.2-windows-x86_64\\ca.cert"]

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~

Nous avons défini la sortie sur elasticsearch. Ainsi nous pouvons récupérer les données et créer des dashboards dans kibana. Nous laissons la configuration des évenements pa défaut ou un essai de test.

Untitled

Exécution de winlogbeats sous windows en invite de commande :

./winlogbeats.exe -c winlogbeats.yml -e

Untitled

Nous récupérons biens les logs dans ElasticSearch

Création d’un dashboard avec kibana

Configurons maintenant une source de données pour les vues

Untitled

Let's connect

Stay in the loop with my latest projects and insights! Follow me on Twitter to catch all the updates as they happen. Don't miss out on the journey – let's connect and explore the world of tech together. Click to follow now!