Система для сбора логов Elasticsearch + Fluentd + Kibana. Как это работает у нас

Привет. Меня зовут Андрей Товстоног, я DevOps инженер в компании Genesis. Эта статья обзорная и не претендует на авторское решение проблем сбора, доставки, хранения и обработки логов.

Выдумывать велосипеды совершенно не нужно, когда есть отличный инструмент, который позволяет жонглировать логами так, как нам удобно, — это Elastic Stack. Решение проверено временем и становится лучше с каждым релизом, к тому же еще и бесплатное. В этой статье речь пойдет о том, как мы собрали систему сбора, хранения и обработки логов, а также о том, с какими проблемами мы столкнулись и как их решали. Если интересно  — добро пожаловать.

Наш стек не уникален, все как у всех, кроме самого коллектора. Выбирали между классическим и зрелым решением Elastic Stack  — Logstash и более новым инструментом — Fluentd. Сейчас вы можете возразить: «Стоп, ты же сам сказал, что выдумывать велосипеды совершенно не нужно». Верно. Поэтому далее объясню, почему мы выбрали Fluentd.

На этапе планирования мы прикинули, что в будущем неплохо было бы на K8s перебираться (А что? Стильно, модно, молодежно). А если так, то нужно смотреть в сторону инструмента, который используется в нем де-факто. И это  Fluentd. А пока нету K8s, но есть проекты на Docker, нужно сделать так, чтобы потом было меньше проблем. Поехали.

Архитектура

Архитектурно сервис собран на одном инстансе. Состоит из клиентской части и коллектора (коллектором будем называть весь EFK-стек). Клиент пересылает, а коллектор, в свою очередь, занимается обработкой логов.

Проект был полностью собран на Docker. Деплоится это все с помощью бережно написанных манифестов Puppet. Вообще Puppet забавно деплоит контейнеры, прописывая их в Systemd как сервис. Поначалу было непривычно, а потом стало даже удобно, так как за состоянием контейнера начинает следить демон Systemd.

Сам сервис выглядит вот так:

# This file is managed by Puppet and local changes
# may be overwritten
[Unit]
Description=Daemon for fluentd_client
After=docker.service
Wants=
Requires=docker.service

[Service]
Restart=on-failure
StartLimitInterval=20
StartLimitBurst=5
TimeoutStartSec=0
RestartSec=5
Environment="HOME=/root"
SyslogIdentifier=docker.fluentd_client
ExecStartPre=-/usr/bin/docker kill fluentd_client
ExecStartPre=-/usr/bin/docker rm  fluentd_client
ExecStartPre=-/usr/bin/docker pull fluent/fluentd:v1.2.5-debian
ExecStart=/usr/bin/docker run \
-h 'host1.example.com' --net bridge -m 0b \
-p 9122:9122 \
-v /opt/fluentd_client/fluent.conf:/fluentd/etc/fluent.conf \
-v /opt/fluentd_client/data:/data \
-v /var/lib/docker/containers:/logs/docker_logs:ro \
-v /var/log/nginx:/logs/nginx_logs:ro \
-v /var/log/auth.log:/logs/auth_logs:ro \
--name fluentd_client \
fluent/fluentd:v1.2.5-debian \
ExecStop=-/usr/bin/docker stop --time=5 fluentd_client

[Install]
WantedBy=multi-user.target

Итак, наш стек состоит из таких элементов:

  1. Elasticsearch  — сердце всего проекта. Именно на нем лежит задача приема, хранения, обработки и поиска всех сущностей.
  2. Fluentd  —  коллектор, который берет на себя прием всех логов, их последующий парсинг и бережно укладывает все это добро в индексы Elasticsearch.
  3. Kibana —  визуализатор, то есть умеет работать с API Elasticsearch, получать и отображать данные.
  4. Cerebro — достаточно интересная штука, позволяющая смотреть состояние и управлять настройками ноды/кластера, управлять индексами (например создание, удаление темплейтов для индексов, шардирование).
  5. Nginx —  используется как прокси-сервер для доступа к Kibana, а также обеспечивает базовую HTTP аутентификацию (HTTP basic authentication).
  6. Curator  — так как в нашем случае нет смысла хранить логи более, чем за 30 дней, мы используем штуку, которая умеет ходить в Elasticsearch и подчищать устаревшие индексы.
  7. cAdvisor (Container Advisor) —  контейнер, позволяющий получить информацию о производительности и потреблении ресурсов по запущенным контейнерам. Также он используется для сбора метрик системой мониторинга Prometheus.

Наглядно весь EFK стек (коллектор) изображен на схеме:

Система сбора логов EFK (коллектор)

Какие логи мы собираем:

  1. Логи Nginx (access и error).
  2. Логи с Docker-контейнеров (stdout и stderr).
  3. Логи с Syslog, а именно ssh auth.log.

Тут возникает вопрос: а как это все работает под капотом?

Клиентская часть (агент)

Рассмотрим пример доставки лога от Docker-контейнера. Есть два варианта (на самом деле их больше), как писать лог Docker-контейнеров. В своем дефолтном состоянии используется Logging Driver  — «JSON file», то есть он пишет классическим методом в файл. Второй способ  — переопределить Logging Driver и посылать лог сразу в Fluentd.

Решили мы попробовать второй вариант, но столкнулись проблемой. Если просто сменить Logging Driver на Fluentd и если вдруг TCP сокет Fluentd, куда должны сыпаться логи, не доступен, то контейнер вообще не запустится. А если TCP сокет Fluentd пропадет, когда контейнер ранится, то логи начинают копиться в буфер.

Происходит это потому, что у Docker есть два режима доставки сообщений от контейнера к Logging Driver:

  • Direct  —  является дефолтным. Приложение падает, когда не может записать лог в stdout и/или stderr.
  • Non-blocking — позволяет записывать логи приложения в буфер при недоступности сокета коллектора. При этом, если буфер полон, новые логи начинают вытеснять старые.

Все эти вещи подтянули дополнительными опциями Logging Driver (—log-opt). Но так как нас не устроил ни первый, ни второй вариант, то мы решили не переопределять Logging Driver и оставить его дефолтным.

Вот тут мы и задумались о том, что необходимо устанавливать агента, который бы старым добрым способом вычитывал с помощью tail лог-файл. Кстати, наверное, не все задумывались над тем, что происходит с tail, когда ротейтится лог-файл. А оказалось все достаточно просто, так как tail снабжен ключем -F (same as —follow="name" —retry), который способен следить за файлом и переоткрывать его в случае необходимости.

Итак, первое, на что обратили внимание  — это продукт Fluent Bit. Его настройка достаточна проста, но мы столкнулись с несколькими неприятными моментами.

Проблема заключалась в том, что если TCP сокет коллектора не доступен, то Fluent Bit начинает активно писать в буфер, который находится в памяти. По мере заполнения буфера он начинает вытеснять (перезаписывать) логи до тех пор, пока не восстановит соединение с коллектором, а следовательно, мы теряем логи. Это не критично, но крайне неприятно.

Решили эту проблему с помощью добавления некоторых параметров конфигурации плагина tail в FluentBit, а именно:

  • DB  —  определяет файл базы данных, где будет вестись учет файлов, которые мониторятся, а также позицию в этих файлах (offsets).
  • DB.Sync  — устанавливает дефолтный метод синхронизации (I/O). Эта опция отвечает за то, как внутренний движок SQLite будет синхронизироваться на диск.

Стало все отлично, логи мы не теряли, но столкнулись с другой проблемой — эта штука очень жутко начала потреблять IOPs. Обратили на это внимание, когда заметили, что инстансы начали тупить и упираться в IO, которого раньше хватало за глаза.

Мы используем EC2 инстансы (AWS) и тип EBS —  gp2 (General Purpose SSD). У них есть такая штука, которая называется «Burst performance». Это когда EBS способен выдерживать некоторые пики возрастающей нагрузки на файловую систему, например запуск какой-нибудь крон задачи, которая требует интенсивного IO. Если ему недостаточно «Baseline performance», инстанс начинает потреблять накопленные кредиты на IO, то есть «Burst performance».

Так вот, наша проблема заключалась в том, что были вычерпаны кредиты IO и EBS начал захлебываться от его недостатка. Ок, пройденный этап. После этого мы решили, а почему бы не использовать тот же Fluentd в качестве агента.

Полная схема теперь выглядит вот так:

Система сбора логов EFK (коллектор + агенты)

Первый запуск Fluentd из-за большого количества файлов логов оказался достаточно волнительным. В логе самого демона говорилось о том, что он видит только часть заданных ему файлов ( тут мы немного напряглись), при том, что когда его запускаешь заново, он подхватывает разные файлы логов. Оказалось, это ожидаемое поведение, так как плагин tail однопоточный, и одновременно он может осилить только 4 файла (видимо следуя здравому смыслу), выделяя какую-то часть времени на каждый файл и переключая контексты по истечении этого времени. Но после вычитки всех файлов   его поведение нормализуется.

Эта ситуация справедлива тогда, когда есть достаточно объемные файлы логов (как было в нашем случае) и с включенной опцией read_from_head (производить вычитку файла с самого начала). Если эту опцию не добавлять, то вычитка будет производиться по стандартному поведению tail, то есть с конца файла.

Маршрутизация логов Fluentd делается на основании тегов. То есть каждый контейнер имеет свой тег, и если он совпадает с меткой на фильтре —  производит его последовательную обработку (сверху вниз). По умолчанию тег присваивается вот такой:

docker.<path_to_docker_logs>.<container_id>.<conteiner_id>-json.log

В соответствии с тегом осуществляется фильтрация (например, если необходимо добавить какое-то поле в JSON лог или направить сразу на output).

Весь процесс выглядит следующим образом.

Fluentd вычитывает лог файл.

<source>
  @type tail
  @id in_tail_container_logs
  path /logs/docker_logs/*/*.log
  pos_file /data/docker.pos
  read_from_head true
  tag docker.**
  skip_refresh_on_startup false
  refresh_interval 10s
  enable_watch_timer true
  enable_stat_watcher false
  <parse>
    @type json
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

Маршрутизирует на основании тегов (фильтрует). В данном случае дописывает имя хостовой системы, на которой запущен контейнер.

<filter docker.**>
  @type record_transformer
  enable_ruby
  <record>
    hostname "#{Socket.gethostname}"
  </record>
</filter>

Выполняет разбивку на порции данных (chunks) и пишет их в буфер (buffer), который находится на диске. Порции данных помещает в очередь. Отправляет коллектору.

<match docker.**>
  @type forward
  @id forward_docker
  <server>
    name main_server
    host collector.example.com
    port 24224
  </server>
  <buffer>
    @type file
    path /data/buffers/containers
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 2
    flush_interval 2s
    retry_forever
    retry_max_interval 30
    chunk_limit_size 4M
    queue_limit_length 512
    overflow_action block
  </buffer>
</match>

Если TCP сокет коллектора не доступен — продолжает читать файл и аккуратно складывает порции данных до установленного предела. Когда достигнет предела — останавливает чтение файла и запоминает позицию. При возобновлении связи начинает передачу собравшихся данных и продолжает чтение файла с места, на котором остановился.

Вот так выглядит сформированный формат JSON лога, который получается на выходе для передачи на коллектор:

2018-10-05 07:31:29.069710307 +0000 docker.logs.docker_logs.93f5b2302f7c91e5a6de695ec29ee125db2835fd29f1f861bbe5153f4ce7878c.93f5b2302f7c91e5a6de695ec29ee125db2835fd29f1f861bbe5153f4ce7878c-json.log:
{"log":"[05/Oct/2018:07:31:25 +0000]
"example.com"
"1.2.3.4"
"304"
"0.002"
"0.002"
"443"
"GET /default.css HTTP/1.1"
"https://example.com/efk"
"Mozilla/5.0",
"stream":"stdout",
"attrs":{"container_name":"example-nginx",
         "service":"nginx"},
"hostname":"host.example.com"}"

Итак разобрались с тем, что происходит на клиенте.

Коллектор

Теперь рассмотрим, что происходит на коллекторе.

Стрим поступает на TCP сокет коллектора, начинает процесс маршрутизации (фильтрации) и проходит следующий воркфлоу.

Он попадает на цепочку docker_before . Эта цепочка предназначена для переопределения тегов входящих логов от Docker-контейнеров. Каждый контейнер имеет дополнительные атрибуты, такие как «attrs.service» и «attrs.container_name». Переопределение тега происходит на основании «attrs.service», например Nginx. При этом, если нам нет необходимости собирать логи с других контейнеров, например MongoDB, все, что нам нужно, — это убрать его из цепочки определения сервиса, и он просто упадет в null. Выглядит вот так:

<match docker.logs.**>
  @type rewrite_tag_filter
  <rule>
    key $[‘attrs’][‘service’]
    pattern ^nginx$
    tag docker.nginx
  </rule>
</match>

<match docker.logs.**>
  @type null
</match>

Далее выполняется маршрутизация (фильтрация) на основании переопределенного тега «docker.nginx» и попадает в следующий конфиг-файл.

Цепочка Nginx_common  выполняет разбивку логов на error и access на основании JSON поля «stream» и переопределяет тэги на «docker.nginx.stdout» и «docker.nginx.stderr». Дальнейшее продвижение лога происходит на основании уже переопределенных тэгов.

<match docker.nginx>
  @type rewrite_tag_filter
  <rule>
    key stream
    pattern /stdout/
    tag docker.nginx.stdout
  </rule>
  <rule>
    key stream
    pattern /stderr/
    tag docker.nginx.stderr
  </rule>
</match>

Далее происходит матчинг по тегу «docker.nginx.stdout». К нему применяется парсер с определенным форматом.

<filter docker.nginx.stdout>
  @type parser
  key_name log
  reserve_data true
  time_parse true
 format  /^\[(?<time>.*)\]\s"(?<request_host>.*)"\s"(?<remote_host>.*)"\s"(?<status>\d{3})"\s"(?<request_time>\d+.\d+)"\s"(?<upstream_response_time>.*)"\s"(?<bytes_sent>.*)"\s"(?<method>\w{3,7})\s(?<request_uri>.*)\s(?<http_version>.*)"\s"(?<http_referer>.*)"\s"(?<http_user_agent>.*)"/
  time_format %d/%b/%Y:%H:%M:%S %z
</filter>

Затем модифицируется поле upstream_response_time (мы переопределяем значения некоторых полей (mapping) в Elasticsearch индексе для корректного поиска и фильтрации), чтобы поле содержало только числа, а не прочерки, если значение отсутствует.

<filter docker.nginx.stdout>
  @type record_transformer
  enable_ruby true
  auto_typecast true
  <record>
   upstream_response_time ${record["upstream_response_time"] == '-' ? -1.000 :  record["upstream_response_time"]}
  </record>
</filter>

Далее добавляем GeoIP данные, такие как «country_code», «country_name», «city» и «geoip_hash».

Вот тут, кстати, заметили, что память «текла» при использовании библиотеки «geoip2_c», поэтому выбрали «geoip2_compat».

1. Использование библиотеки geoip2_c — видно постепенный рост потребления памяти. 2. Использование библиотеки geoip2_compat — видно ровное потребление памяти. Коллега говорит: «Как в книжке пишут» :)

<filter docker.nginx.stdout>
  @type geoip
  geoip_lookup_keys remote_host
  geoip2_database "/opt/GeoLite2-City.mmdb"
  backend_library geoip2_compat
  <record>
    country        ${country_code["remote_host"]}
    country_name   ${country_name["remote_host"]}
    city           ${city["remote_host"]}
    geoip_hash     '[${longitude["remote_host"]},${latitude["remote_host"]}]'
 </record>
  skip_adding_null_record  true
</filter>

Так как поля «stream» и «log» мы распарсили, то смело можем их удалять. Поле «log» удаляем только с логов, тегированных как «docker.nginx.stdout», а поле «stream» удаляем со всех логов (это поле, на основании которого мы делали переопределение тега в начале этой цепочки).

<filter docker.nginx.stdout>
  @type record_modifier
  remove_keys log
</filter>
<filter docker.nginx.*>
  @type record_modifier
  remove_keys stream
</filter>

Парсим логи, тегированные как «nginx.docker.stderr». Тут вообще все просто. Разбираем по минимуму: время, тип (severity type), pid, ну и, собственно, само тело сообщения (оставляем как есть).

<filter nginx.docker.stderr>
  @type parser
  key_name log
  reserve_data true
  format /^(?<time>.*)\s(?<type>\[\w+\])\s(?<pid>\d+#\d+):\s(?<log>[^ ].*)/
</filter>

Далее аккуратно раскладываем по индексам, предварительно закидывая шаблон для индекса. Но, прежде чем отправить в индекс данные, учтите, что они складываются порциями в буфер, который расположен на диске, попадают в очередь и сбрасываются порциями (bulk flush) каждые 5 с.

<match docker.nginx.stdout>
  @type elasticsearch
  logstash_format true
  logstash_prefix docker.nginx.stdout
  host elasticsearch
  port 9200
  template_file /fluentd/etc/index_maps/docker.nginx.stdout
  template_name docker.nginx.stdout
  template_overwrite true
  max_retry_putting_template 15
  request_timeout 15s
  reload_connections false
  <buffer>
     @type file
     path /buffers/docker.nginx.stdout
     flush_mode interval
     retry_type exponential_backoff
     flush_thread_count 2
     flush_interval 5s
     retry_forever
     retry_max_interval 30
     chunk_limit_size 2M
     queue_limit_length 100
     overflow_action block
   </buffer>
</match>
<match docker.nginx.stderr>
  @type elasticsearch
  logstash_format true
  logstash_prefix docker.nginx.stderr
  host elasticsearch
  port 9200
  template_file /fluentd/etc/index_maps/docker.nginx.stderr
  template_name docker.nginx.stderr
  template_overwrite true
  max_retry_putting_template 15
  request_timeout 15s
  reload_connections false
  <buffer>
     @type file
     path /buffers/docker.nginx.stderr
     flush_mode interval
     retry_type exponential_backoff
     flush_thread_count 2
     flush_interval 5s
     retry_forever
     retry_max_interval 30
     chunk_limit_size 2M
     queue_limit_length 100
     overflow_action block
   </buffer>
</match>

На выходе из коллектора лог получается вот таким:

2018-10-05 07:31:29.069710307 +0000 docker.nginx.stdout:
{"attrs":{"container_name":"example-nginx",
            "service":"nginx"},
"hostname":"example.host.com",
"request_host":"example.com",
"remote_host":"1.2.3.4",
"status":"304",
"request_time":"0.002",
"upstream_response_time":"0.002",
"bytes_sent":"443",
"method":"GET",
"request_uri":"/default.css",
"http_version":"HTTP/1.1",
"http_referer":"-",
"http_user_agent":"Mozilla/5.0",
"country":"US",
"country_name":"United States",
"city":"Riverdale",
"geoip_hash":[-84.4522,33.5561]}

А в Kibana это выглядит уже вот так:

Kibana, пример живой системы

Что касается настройки индекса в Elasticsearch, то мы изменяем параметр «number_of_shards: 1», который по дефолту равен 5. Увеличивать количество шард (shard) есть смысл, когда собран кластер, так как при запросах Elasticsearch параллелит поиск по шардам в дата-нодах. Поскольку дата-нода у нас одна, то и количество шард должно быть равным количеству дата-нод кластера.

Еще один параметр, который мы меняем, — это «number_of_replica: 0» . Не держим реплику для Primary Shard, так как реплицировать, собственно говоря, и некуда.

В мапинге (mapping) индекса Elasticsearch мы изменяем типы следующих полей:

  • bytes_sent  -  integer
  • geoip_hash  -  geo_point
  • remote_host  -  ip
  • request_time  - half_float
  • status  -  short
  • upstream_response_time  -  half_float

Итоги

Построить систему сбора, хранения и анализа логов достаточно просто, используя бесплатные проекты, которые активно развиваются. Эту систему, мы используем, в первую очередь, как средство оперативного анализа, локализации и понимая возникающих проблем. Но (just for fun), такая система позволяет делать с логами крутые вещи, включая их визуализацию, строить красивые графики, диаграммы и таблицы.

Похожие статьи:
Хто такий фронтенд-розробник, якою мовою він пише, які фреймворки використовує, де мешкає та скільки заробляє. Проаналізували 1440 анкет...
У новому випуску поговоримо про ринок праці: вакансії, відгуки, зарплати фахівців C-level та обов’язкові тестові завдання. Серед...
Компания BlackBerry продолжает оптимизацию количества занятого персонала, увольняя работников. Последнее сокращение коснулось...
Наука тільки недавно дізналася, що життя після тридцяти не закінчується. Лишилося тільки питання, чи можна це називати...
В Україні завершився освітній проєкт ІТ Generation. Найбільше випускників — Front-end, QA, UI/UX Design. У Мінцифри розповіли про...
Яндекс.Метрика