Не так давно я настроил сбор журналов с Mikrotik в Elasticsearch. Для того, чтобы не забыть всю последовательность шагов я запротоколирую все действия, которые я выполнял. В этом решении есть еще места, где нужно выполнить ряд доработок и оптимизаций – я отдельно выдели эти моменты. Если доберусь до них сам, то обновлю материал публикации.
Это не полностью мое решение. За основу я взял вот эту публикацию.
Схема решения
Логическая схеме решения приведена ниже.
Схема ниже отображает физическую топологию решения.
Подготовка Elasticsearch
Выполним подготовку на стороне Elasticsearch.
1. Сначала я создам отдельную роль через Dev Tools для работы с индексами веб серверов:
https://192.168.1.3:5601/app/dev_tools
2. Для этого я выполню следующий запрос:
POST _security/role/mikrotik_logs_role
{
"cluster": ["manage_index_templates", "monitor", "manage_ilm"],
"indices": [
{
"names": [ "mikrotik-*" ],
"privileges": ["write","create","create_index","manage","manage_ilm"]
}
]
}
3. Теперь я создам отдельного пользователя:
POST _security/user/mikrotik_user
{
"password" : "Qwerty123",
"roles" : [ "mikrotik_logs_role"],
"full_name" : "Logstash User for store logs from Mikrotik devices"
}
4. Заключительным шагом я создам шаблон индекса:
POST _template/netdev
{
"index_patterns" : [
"mikrotik-*"
],
"settings" : {
"index" : {
"codec" : "best_compression",
"refresh_interval" : "5s",
"number_of_shards" : "1",
"number_of_replicas" : "0"
}
},
"mappings" : {
"numeric_detection" : true,
"dynamic_templates" : [
{
"string_fields" : {
"mapping" : {
"type" : "keyword"
},
"match_mapping_type" : "string",
"match" : "*"
}
}
],
"properties" : {
"@version" : {
"type" : "keyword"
},
"@timestamp" : {
"type" : "date"
},
"date" : {
"type" : "keyword"
},
"time" : {
"type" : "keyword"
},
"topic1" : {
"type" : "keyword"
},
"topic2" : {
"type" : "keyword"
},
"topic3" : {
"type" : "keyword"
},
"item" : {
"type" : "text"
},
"action" : {
"type" : "text"
},
"user" : {
"type" : "keyword"
},
"host" : {
"type" : "keyword"
},
"method" : {
"type" : "keyword"
},
"local_address" : {
"type" : "ip"
},
"remote_address" : {
"type" : "ip"
},
"acquired_ip" : {
"type" : "ip"
},
"released_ip" : {
"type" : "ip"
},
"address_pool" : {
"type" : "keyword"
},
"mac_address" : {
"type" : "keyword"
},
"interface" : {
"type" : "keyword"
},
"link_state" : {
"type" : "keyword"
},
"signal_strength" : {
"type" : "byte"
},
"disconnect_reason" : {
"type" : "text"
},
"chain" : {
"type" : "keyword"
},
"in_interface" : {
"type" : "keyword"
},
"out_interface" : {
"type" : "keyword"
},
"protocol" : {
"type" : "keyword"
},
"src_port" : {
"type" : "keyword"
},
"dst_port" : {
"type" : "keyword"
},
"length" : {
"type" : "short"
},
"ap_ssid" : {
"type" : "keyword"
},
"wifi_state" : {
"type" : "keyword"
}
}
},
"aliases" : { }
}
Вместо шаблона индекса рекомендуется использовать composable index templates, а не Legacy index templates как в моем примере выше. Этот момент мне тоже нужно будет доработать.
Предварительная подготовка на стороне Elasticsearch завершена.
Подготовка Logstash
Приступим к подготовке сервера Logstash:
1. Сначала я создам файл с шаблона сообщений Mikrotik:
nano /etc/logstash/custom-patterns/mikrotik
MIKROTIK_DATE \b(?:jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|jul(?:y)?|aug(?:ust)?|sep(?:tember)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)\b\/(?>
MIKROTIK_TIME (?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
MIKROTIK_TOPIC wireless|info|ipsec|interface|error|dhcp|system|account|critical
MIKROTIK_FAILED_PROPOSAL failed\ to\ get\ valid\ proposal|failed\ to\ pre\-process\ ph1\ packet|phase1\ negotiation\ failed|no\ suitable\ proposal\ found
MIKROTIK_PEER_NOT_COMPLIANT Unity\ mode\ config\ request\ but\ the\ peer\ did\ not\ declare\ itself\ as\ \ unity\ compliant
MIKROTIK_PACKET_RETRANSMISSION packet\ is\ retransmitted
MIKROTIK_TRAFFIC_FLOW traffic\ flow\ target\ removed
MIKROTIK_ACQUIRED_IP assigned|acquired
MIKROTIK_RELEASED_IP released|releasing address|deassigned
MIKROTIK_DISCO_REASON extensive\ data\ loss|group\ key\ exchange\ timeout|received\ deauth:\ unspecified\ \(1\)|received\ disassoc:\ sending station\ leaving\ \(8\)|o>
MIKROTIK_WIFI_STATE reassociating
В файле предусмотрены не все шаблоны, которые могут быть в журналах. Часть сообщений может быть не разбита на отдельные составляющие. При необходимости дополните файл нужными вам шаблонами.
2. Теперь я добавлю файл конфигурации для Logstash:
nano /etc/logstash/conf.d/netdev.conf
# Input will be the tcp port specified, mikrotik config will be shown later.
input {
tcp {
port => 5514
tags => ["mikrotik"]
}
udp {
port => 5514
tags => ["mikrotik"]
}
}
# the tag mikrotik-log is added by the input
filter {
if "mikrotik" in [tags] {
grok {
tag_on_failure => "_grokparsefailure_mikrotik_log"
patterns_dir => "/etc/logstash/custom-patterns/"
match => [
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:item} %{DATA:action} by %{DATA:user}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) user %{DATA:user} %{GREEDYDATA:action} from %{IP:[host]} via %{DATA:method}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} for user %{DATA:user} from %{IP:[host]} via %{DATA:method}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} for user: %{DATA:user}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} \(Identity Protection\): %{IP:local_address}%{GREEDYDATA}%{IP:remote_address}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action}%{IP:local_address}\[%{GREEDYDATA}\]-%{IP:remote_address}\[%{GREEDYDATA}\] spi:%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_ACQUIRED_IP:action} %{IP:acquired_ip} address for %{IP:remote_address}\[%{GREEDYDATA}\]$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action}%{IP:local_address}\[%{GREEDYDATA}\]<=>%{IP:remote_address}\[%{GREEDYDATA}\] spi=%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{GREEDYDATA:action} %{IP:released_ip} $",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_FAILED_PROPOSAL:action}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{IP:remote_address} %{MIKROTIK_FAILED_PROPOSAL:action}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_PEER_NOT_COMPLIANT:action}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) the %{MIKROTIK_PACKET_RETRANSMISSION:action} by %{IP:remote_address}\[%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:interface} link %{GREEDYDATA:link_state}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{MIKROTIK_TRAFFIC_FLOW:action} by %{DATA:user}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:address_pool} %{MIKROTIK_ACQUIRED_IP:action} %{IP:acquired_ip} to %{DATA:mac_address}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:address_pool} %{MIKROTIK_RELEASED_IP:action} %{IP:released_ip} from %{DATA:mac_address} $",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{DATA:action}, signal strength %{INT:signal_strength}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{DATA:action}, %{MIKROTIK_DISCO_REASON:disconnect_reason}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:mac_address}@%{DATA:ap_ssid}: %{MIKROTIK_WIFI_STATE:wifi_state} $",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:chain}: in:%{DATA:in_interface} out:%{GREEDYDATA:out_interface}, src-mac %{DATA:mac_address}, proto %{DATA:protocol}, %{IP:local_address}:%{INT:src_port}->%{IP:remote_address}:%{INT:dst_port}, len %{INT:length}%{GREEDYDATA}$",
"message", "(%{MIKROTIK_TOPIC:topic1}(,%{MIKROTIK_TOPIC:topic2}(,%{MIKROTIK_TOPIC:topic3}?)?)?) %{DATA:chain}: in:%{DATA:in_interface} out:%{GREEDYDATA:out_interface}, src-mac %{DATA:mac_address}, proto %{DATA:protocol} \(%GREEDYDATA}\), %{IP:local_address}->%{IP:remote_address}, len %{INT:length}%{GREEDYDATA}$"
]
}
ruby {
code => '
event.get("host").each { |k, v|
event.set("host",v)
}
event.remove("[host][ip]")
'
}
}
}
# output to all elasticsearch hosts
output {
if "mikrotik" in [tags] {
elasticsearch {
hosts => ["https://192.168.1.2:9200"]
user => "mikrotik_user"
password => "Qwerty123"
ssl => true
ssl_verification_mode => "none"
index => "mikrotik-%{+YYYY.MM}"
}
}
}
Не забудьте добавить исключения в брандмауэр, если он используется на сервере.
Настройка Mikrotik
На стороне Mikrotik выполним настройку адреса сервера для отправки сообщений журналов. В качестве адреса сервера необходимо указать адрес сервера с Logstash:
system logging action add name=logstash target=remote remote=192.168.1.3 remote-port=5514 bsd-syslog=no syslog-time-format=bsd-syslog syslog-facility=daemon syslog-severity=auto
Теперь укажем какие события нужно отправлять на удаленный сервер:
system logging add topics=info action=logstash
system logging add topics=warning action=logstash
system logging add topics=error action=logstash
system logging add topics=critical action=logstash
Проверка
Осталось проверить поступление событий в индексы Logstash.
Теперь через Dev Tools я запрошу данные из индекса:
GET netdev-*/_search
{
"query": {
"match_all": {}
},
"track_total_hits": true,
"size": 1,
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
Результат запроса:
{
"_index": "netdev-2024.07",
"_id": "Z_mRhZABf5mKjeHGHhVd",
"_score": null,
"_source": {
"@timestamp": "2024-07-06T01:03:18.519495280Z",
"item": "layer 7 protocol",
"action": "changed",
"@version": "1",
"event": {
"original": "system,info layer 7 protocol changed by admin"
},
"host": "192.168.1.1",
"tags": [
"mikrotik"
],
"topic1": "system",
"topic2": "info",
"message": "system,info layer 7 protocol changed by admin",
"user": "admin"
},
"sort": [
1720227798519
]
}
Если что-то не работает
Сбор журналов с Mikrotik в Elasticsearch заработал у меня не с первого раза. Пришлосб немного провести дебаг. Что я делал:
1. Убедитесь, что сообщения вообще доходят до сервера с Logstash. Я для этого использовал tcpdump (только замените имя интерфейса и порт на ваши значения):
tcpdump -vv -i ens18 port 5514 and udp
2. Еще можно попробовать провести дебаг на стороне сервиса Logstash:
sudo journalctl -fu logstash.service