Сбор журналов с Mikrotik в Elasticsearch

Не так давно я настроил сбор журналов с Mikrotik в Elasticsearch. Для того, чтобы не забыть всю последовательность шагов я запротоколирую все действия, которые я выполнял. В этом решении есть еще места, где нужно выполнить ряд доработок и оптимизаций – я отдельно выдели эти моменты. Если доберусь до них сам, то обновлю материал публикации.

Это не полностью мое решение. За основу я взял вот эту публикацию.

Схема решения

Логическая схеме решения приведена ниже.

Схема ниже отображает физическую топологию решения.

Подготовка Elasticsearch

Выполним подготовку на стороне Elasticsearch.

1. Сначала я создам отдельную роль через Dev Tools для работы с индексами веб серверов:

https://192.168.1.3:5601/app/dev_tools

2. Для этого я выполню следующий запрос:

POST _security/role/mikrotik_logs_role
{
  "cluster": ["manage_index_templates", "monitor", "manage_ilm"], 
  "indices": [
    {
      "names": [ "mikrotik-*" ], 
      "privileges": ["write","create","create_index","manage","manage_ilm"]  
    }
  ]
}

3. Теперь я создам отдельного пользователя:

POST _security/user/mikrotik_user
{
  "password" : "Qwerty123",
  "roles" : [ "mikrotik_logs_role"],
  "full_name" : "Logstash User for store logs from Mikrotik devices"
}

4. Заключительным шагом я создам шаблон индекса:

POST _template/netdev
{  
      "index_patterns" : [  
        "mikrotik-*"  
      ],  
      "settings" : {  
        "index" : {  
        "codec" : "best_compression",  
        "refresh_interval" : "5s",  
        "number_of_shards" : "1",  
        "number_of_replicas" : "0"  
        }  
      },  
      "mappings" : {  
        "numeric_detection" : true,  
        "dynamic_templates" : [  
          {  
          "string_fields" : {  
            "mapping" : {  
            "type" : "keyword"  
            },  
            "match_mapping_type" : "string",  
            "match" : "*"  
          }  
          }  
        ],  
        "properties" : {  
          "@version" : {  
          "type" : "keyword"  
          },  
          "@timestamp" : {  
          "type" : "date"  
          },  
          "date" : {  
          "type" : "keyword"  
          },  
          "time" : {  
          "type" : "keyword"  
          },  
          "topic1" : {  
          "type" : "keyword"  
          },  
          "topic2" : {  
          "type" : "keyword"  
          },  
          "topic3" : {  
          "type" : "keyword"  
          },  
          "item" : {  
          "type" : "text"  
          },  
          "action" : {  
          "type" : "text"  
          },  
          "user" : {  
          "type" : "keyword"  
          },  
          "host" : {  
          "type" : "keyword"  
          },  
          "method" : {  
          "type" : "keyword"  
          },  
          "local_address" : {  
          "type" : "ip"  
          },  
          "remote_address" : {  
          "type" : "ip"  
          },  
          "acquired_ip" : {  
          "type" : "ip"  
          },  
          "released_ip" : {  
          "type" : "ip"  
          },  
          "address_pool" : {  
          "type" : "keyword"  
          },  
          "mac_address" : {  
          "type" : "keyword"  
          },  
          "interface" : {  
          "type" : "keyword"  
          },  
          "link_state" : {  
          "type" : "keyword"  
          },  
          "signal_strength" : {  
          "type" : "byte"  
          },  
          "disconnect_reason" : {  
          "type" : "text"  
          },  
          "chain" : {  
          "type" : "keyword"  
          },  
          "in_interface" : {  
          "type" : "keyword"  
          },  
          "out_interface" : {  
          "type" : "keyword"  
          },  
          "protocol" : {  
          "type" : "keyword"  
          },  
          "src_port" : {  
          "type" : "keyword"  
          },  
          "dst_port" : {  
          "type" : "keyword"  
          },  
          "length" : {  
          "type" : "short"  
          },  
          "ap_ssid" : {  
          "type" : "keyword"  
          },  
          "wifi_state" : {  
          "type" : "keyword"  
          }  
        }  
      },  
      "aliases" : { }  
      }  

Вместо шаблона индекса рекомендуется использовать composable index templates, а не Legacy index templates как в моем примере выше. Этот момент мне тоже нужно будет доработать.

Предварительная подготовка на стороне Elasticsearch завершена.

Подготовка Logstash

Приступим к подготовке сервера Logstash:

1. Сначала я создам файл с шаблона сообщений Mikrotik:

nano /etc/logstash/custom-patterns/mikrotik
MIKROTIK_DATE \b(?:jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|jul(?:y)?|aug(?:ust)?|sep(?:tember)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)\b\/(?>
MIKROTIK_TIME (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
MIKROTIK_TOPIC wireless|info|ipsec|interface|error|dhcp|system|account|critical
MIKROTIK_FAILED_PROPOSAL failed\ to\ get\ valid\ proposal|failed\ to\ pre\-process\ ph1\ packet|phase1\ negotiation\ failed|no\ suitable\ proposal\ found
MIKROTIK_PEER_NOT_COMPLIANT Unity\ mode\ config\ request\ but\ the\ peer\ did\ not\ declare\ itself\ as\ \ unity\ compliant
MIKROTIK_PACKET_RETRANSMISSION packet\ is\ retransmitted
MIKROTIK_TRAFFIC_FLOW traffic\ flow\ target\ removed
MIKROTIK_ACQUIRED_IP assigned|acquired
MIKROTIK_RELEASED_IP released|releasing address|deassigned
MIKROTIK_DISCO_REASON extensive\ data\ loss|group\ key\ exchange\ timeout|received\ deauth:\ unspecified\ \(1\)|received\ disassoc:\ sending station\ leaving\ \(8\)|o>
MIKROTIK_WIFI_STATE reassociating

В файле предусмотрены не все шаблоны, которые могут быть в журналах. Часть сообщений может быть не разбита на отдельные составляющие. При необходимости дополните файл нужными вам шаблонами.

2. Теперь я добавлю файл конфигурации для Logstash:

nano /etc/logstash/conf.d/netdev.conf
# Input will be the tcp port specified, mikrotik config will be shown later.  
input {  
  tcp {  
   port => 5514  
   tags => ["mikrotik"]  
  }  
  udp {  
   port => 5514  
   tags => ["mikrotik"]  
  }  
}  

# the tag mikrotik-log is added by the input  
filter {  
  if "mikrotik" in [tags] {  
   grok {  
    tag_on_failure => "_grokparsefailure_mikrotik_log"  
    patterns_dir => "/etc/logstash/custom-patterns/" 
    match => [  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:item} %{DATA:action} by %{DATA:user}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) user %{DATA:user} %{GREEDYDATA:action} from %{IP:[host]} via %{DATA:method}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} for user %{DATA:user} from %{IP:[host]} via %{DATA:method}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} for user: %{DATA:user}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} \(Identity Protection\): %{IP:local_address}%{GREEDYDATA}%{IP:remote_address}%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action}%{IP:local_address}\[%{GREEDYDATA}\]-%{IP:remote_address}\[%{GREEDYDATA}\] spi:%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_ACQUIRED_IP:action} %{IP:acquired_ip} address for %{IP:remote_address}\[%{GREEDYDATA}\]$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action}%{IP:local_address}\[%{GREEDYDATA}\]<=>%{IP:remote_address}\[%{GREEDYDATA}\] spi=%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} %{IP:released_ip} $",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_FAILED_PROPOSAL:action}%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{IP:remote_address} %{MIKROTIK_FAILED_PROPOSAL:action}%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_PEER_NOT_COMPLIANT:action}%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) the %{MIKROTIK_PACKET_RETRANSMISSION:action} by %{IP:remote_address}\[%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:interface} link %{GREEDYDATA:link_state}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_TRAFFIC_FLOW:action} by %{DATA:user}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:address_pool} %{MIKROTIK_ACQUIRED_IP:action} %{IP:acquired_ip} to %{DATA:mac_address}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:address_pool} %{MIKROTIK_RELEASED_IP:action} %{IP:released_ip} from %{DATA:mac_address} $",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{DATA:action}, signal strength %{INT:signal_strength}%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{DATA:action}, %{MIKROTIK_DISCO_REASON:disconnect_reason}%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{MIKROTIK_WIFI_STATE:wifi_state} $",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:chain}: in:%{DATA:in_interface} out:%{GREEDYDATA:out_interface}, src-mac %{DATA:mac_address}, proto %{DATA:protocol}, %{IP:local_address}:%{INT:src_port}->%{IP:remote_address}:%{INT:dst_port}, len %{INT:length}%{GREEDYDATA}$",  
      "message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:chain}: in:%{DATA:in_interface} out:%{GREEDYDATA:out_interface}, src-mac %{DATA:mac_address}, proto %{DATA:protocol} \(%GREEDYDATA}\), %{IP:local_address}->%{IP:remote_address}, len %{INT:length}%{GREEDYDATA}$"  
    ]  
   }
   ruby {
        code => '
            event.get("host").each { |k, v|
                event.set("host",v)
            }
            event.remove("[host][ip]")
        '
    }
  }  
 }  
# output to all elasticsearch hosts  
output {  
  if "mikrotik" in [tags] {  
  elasticsearch {  
   hosts => ["https://192.168.1.2:9200"]
   user => "mikrotik_user"
   password => "Qwerty123"
   ssl => true
   ssl_verification_mode => "none"
   index => "mikrotik-%{+YYYY.MM}"
   }  
}  
}

Не забудьте добавить исключения в брандмауэр, если он используется на сервере.

Настройка Mikrotik

На стороне Mikrotik выполним настройку адреса сервера для отправки сообщений журналов. В качестве адреса сервера необходимо указать адрес сервера с Logstash:

system logging action add name=logstash target=remote remote=192.168.1.3 remote-port=5514 bsd-syslog=no syslog-time-format=bsd-syslog syslog-facility=daemon syslog-severity=auto

Теперь укажем какие события нужно отправлять на удаленный сервер:

system logging add topics=info action=logstash
system logging add topics=warning action=logstash
system logging add topics=error action=logstash
system logging add topics=critical action=logstash

Проверка

Осталось проверить поступление событий в индексы Logstash.

Теперь через Dev Tools я запрошу данные из индекса:

GET netdev-*/_search
{
  "query": {
    "match_all": {}
  },
  "track_total_hits": true,
  "size": 1, 
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ]
}

Результат запроса:

      {
        "_index": "netdev-2024.07",
        "_id": "Z_mRhZABf5mKjeHGHhVd",
        "_score": null,
        "_source": {
          "@timestamp": "2024-07-06T01:03:18.519495280Z",
          "item": "layer 7 protocol",
          "action": "changed",
          "@version": "1",
          "event": {
            "original": "system,info layer 7 protocol changed by admin"
          },
          "host": "192.168.1.1",
          "tags": [
            "mikrotik"
          ],
          "topic1": "system",
          "topic2": "info",
          "message": "system,info layer 7 protocol changed by admin",
          "user": "admin"
        },
        "sort": [
          1720227798519
        ]
      }

Если что-то не работает

Сбор журналов с Mikrotik в Elasticsearch заработал у меня не с первого раза. Пришлосб немного провести дебаг. Что я делал:

1. Убедитесь, что сообщения вообще доходят до сервера с Logstash. Я для этого использовал tcpdump (только замените имя интерфейса и порт на ваши значения):

tcpdump -vv -i ens18 port 5514 and udp

2. Еще можно попробовать провести дебаг на стороне сервиса Logstash:

sudo journalctl -fu logstash.service

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *