Система для сбора логов Elasticsearch + Fluentd + Kibana. Как это работает у нас

Привет. Меня зовут Андрей Товстоног, я DevOps инженер в компании Genesis. Эта статья обзорная и не претендует на авторское решение проблем сбора, доставки, хранения и обработки логов.

Выдумывать велосипеды совершенно не нужно, когда есть отличный инструмент, который позволяет жонглировать логами так, как нам удобно, — это Elastic Stack. Решение проверено временем и становится лучше с каждым релизом, к тому же еще и бесплатное. В этой статье речь пойдет о том, как мы собрали систему сбора, хранения и обработки логов, а также о том, с какими проблемами мы столкнулись и как их решали. Если интересно  — добро пожаловать.

Наш стек не уникален, все как у всех, кроме самого коллектора. Выбирали между классическим и зрелым решением Elastic Stack  — Logstash и более новым инструментом — Fluentd. Сейчас вы можете возразить: «Стоп, ты же сам сказал, что выдумывать велосипеды совершенно не нужно». Верно. Поэтому далее объясню, почему мы выбрали Fluentd.

На этапе планирования мы прикинули, что в будущем неплохо было бы на K8s перебираться (А что? Стильно, модно, молодежно). А если так, то нужно смотреть в сторону инструмента, который используется в нем де-факто. И это  Fluentd. А пока нету K8s, но есть проекты на Docker, нужно сделать так, чтобы потом было меньше проблем. Поехали.

Архитектура

Архитектурно сервис собран на одном инстансе. Состоит из клиентской части и коллектора (коллектором будем называть весь EFK-стек). Клиент пересылает, а коллектор, в свою очередь, занимается обработкой логов.

Проект был полностью собран на Docker. Деплоится это все с помощью бережно написанных манифестов Puppet. Вообще Puppet забавно деплоит контейнеры, прописывая их в Systemd как сервис. Поначалу было непривычно, а потом стало даже удобно, так как за состоянием контейнера начинает следить демон Systemd.

Сам сервис выглядит вот так:

# This file is managed by Puppet and local changes
# may be overwritten
[Unit]
Description=Daemon for fluentd_client
After=docker.service
Wants=
Requires=docker.service

[Service]
Restart=on-failure
StartLimitInterval=20
StartLimitBurst=5
TimeoutStartSec=0
RestartSec=5
Environment="HOME=/root"
SyslogIdentifier=docker.fluentd_client
ExecStartPre=-/usr/bin/docker kill fluentd_client
ExecStartPre=-/usr/bin/docker rm  fluentd_client
ExecStartPre=-/usr/bin/docker pull fluent/fluentd:v1.2.5-debian
ExecStart=/usr/bin/docker run \
-h 'host1.example.com' --net bridge -m 0b \
-p 9122:9122 \
-v /opt/fluentd_client/fluent.conf:/fluentd/etc/fluent.conf \
-v /opt/fluentd_client/data:/data \
-v /var/lib/docker/containers:/logs/docker_logs:ro \
-v /var/log/nginx:/logs/nginx_logs:ro \
-v /var/log/auth.log:/logs/auth_logs:ro \
--name fluentd_client \
fluent/fluentd:v1.2.5-debian \
ExecStop=-/usr/bin/docker stop --time=5 fluentd_client

[Install]
WantedBy=multi-user.target

Итак, наш стек состоит из таких элементов:

  1. Elasticsearch  — сердце всего проекта. Именно на нем лежит задача приема, хранения, обработки и поиска всех сущностей.
  2. Fluentd  —  коллектор, который берет на себя прием всех логов, их последующий парсинг и бережно укладывает все это добро в индексы Elasticsearch.
  3. Kibana —  визуализатор, то есть умеет работать с API Elasticsearch, получать и отображать данные.
  4. Cerebro — достаточно интересная штука, позволяющая смотреть состояние и управлять настройками ноды/кластера, управлять индексами (например создание, удаление темплейтов для индексов, шардирование).
  5. Nginx —  используется как прокси-сервер для доступа к Kibana, а также обеспечивает базовую HTTP аутентификацию (HTTP basic authentication).
  6. Curator  — так как в нашем случае нет смысла хранить логи более, чем за 30 дней, мы используем штуку, которая умеет ходить в Elasticsearch и подчищать устаревшие индексы.
  7. cAdvisor (Container Advisor) —  контейнер, позволяющий получить информацию о производительности и потреблении ресурсов по запущенным контейнерам. Также он используется для сбора метрик системой мониторинга Prometheus.

Наглядно весь EFK стек (коллектор) изображен на схеме:

Система сбора логов EFK (коллектор)

Какие логи мы собираем:

  1. Логи Nginx (access и error).
  2. Логи с Docker-контейнеров (stdout и stderr).
  3. Логи с Syslog, а именно ssh auth.log.

Тут возникает вопрос: а как это все работает под капотом?

Клиентская часть (агент)

Рассмотрим пример доставки лога от Docker-контейнера. Есть два варианта (на самом деле их больше), как писать лог Docker-контейнеров. В своем дефолтном состоянии используется Logging Driver  — «JSON file», то есть он пишет классическим методом в файл. Второй способ  — переопределить Logging Driver и посылать лог сразу в Fluentd.

Решили мы попробовать второй вариант, но столкнулись проблемой. Если просто сменить Logging Driver на Fluentd и если вдруг TCP сокет Fluentd, куда должны сыпаться логи, не доступен, то контейнер вообще не запустится. А если TCP сокет Fluentd пропадет, когда контейнер ранится, то логи начинают копиться в буфер.

Происходит это потому, что у Docker есть два режима доставки сообщений от контейнера к Logging Driver:

  • Direct  —  является дефолтным. Приложение падает, когда не может записать лог в stdout и/или stderr.
  • Non-blocking — позволяет записывать логи приложения в буфер при недоступности сокета коллектора. При этом, если буфер полон, новые логи начинают вытеснять старые.

Все эти вещи подтянули дополнительными опциями Logging Driver (—log-opt). Но так как нас не устроил ни первый, ни второй вариант, то мы решили не переопределять Logging Driver и оставить его дефолтным.

Вот тут мы и задумались о том, что необходимо устанавливать агента, который бы старым добрым способом вычитывал с помощью tail лог-файл. Кстати, наверное, не все задумывались над тем, что происходит с tail, когда ротейтится лог-файл. А оказалось все достаточно просто, так как tail снабжен ключем -F (same as —follow="name" —retry), который способен следить за файлом и переоткрывать его в случае необходимости.

Итак, первое, на что обратили внимание  — это продукт Fluent Bit. Его настройка достаточна проста, но мы столкнулись с несколькими неприятными моментами.

Проблема заключалась в том, что если TCP сокет коллектора не доступен, то Fluent Bit начинает активно писать в буфер, который находится в памяти. По мере заполнения буфера он начинает вытеснять (перезаписывать) логи до тех пор, пока не восстановит соединение с коллектором, а следовательно, мы теряем логи. Это не критично, но крайне неприятно.

Решили эту проблему с помощью добавления некоторых параметров конфигурации плагина tail в FluentBit, а именно:

  • DB  —  определяет файл базы данных, где будет вестись учет файлов, которые мониторятся, а также позицию в этих файлах (offsets).
  • DB.Sync  — устанавливает дефолтный метод синхронизации (I/O). Эта опция отвечает за то, как внутренний движок SQLite будет синхронизироваться на диск.

Стало все отлично, логи мы не теряли, но столкнулись с другой проблемой — эта штука очень жутко начала потреблять IOPs. Обратили на это внимание, когда заметили, что инстансы начали тупить и упираться в IO, которого раньше хватало за глаза.

Мы используем EC2 инстансы (AWS) и тип EBS —  gp2 (General Purpose SSD). У них есть такая штука, которая называется «Burst performance». Это когда EBS способен выдерживать некоторые пики возрастающей нагрузки на файловую систему, например запуск какой-нибудь крон задачи, которая требует интенсивного IO. Если ему недостаточно «Baseline performance», инстанс начинает потреблять накопленные кредиты на IO, то есть «Burst performance».

Так вот, наша проблема заключалась в том, что были вычерпаны кредиты IO и EBS начал захлебываться от его недостатка. Ок, пройденный этап. После этого мы решили, а почему бы не использовать тот же Fluentd в качестве агента.

Полная схема теперь выглядит вот так:

Система сбора логов EFK (коллектор + агенты)

Первый запуск Fluentd из-за большого количества файлов логов оказался достаточно волнительным. В логе самого демона говорилось о том, что он видит только часть заданных ему файлов ( тут мы немного напряглись), при том, что когда его запускаешь заново, он подхватывает разные файлы логов. Оказалось, это ожидаемое поведение, так как плагин tail однопоточный, и одновременно он может осилить только 4 файла (видимо следуя здравому смыслу), выделяя какую-то часть времени на каждый файл и переключая контексты по истечении этого времени. Но после вычитки всех файлов   его поведение нормализуется.

Эта ситуация справедлива тогда, когда есть достаточно объемные файлы логов (как было в нашем случае) и с включенной опцией read_from_head (производить вычитку файла с самого начала). Если эту опцию не добавлять, то вычитка будет производиться по стандартному поведению tail, то есть с конца файла.

Маршрутизация логов Fluentd делается на основании тегов. То есть каждый контейнер имеет свой тег, и если он совпадает с меткой на фильтре —  производит его последовательную обработку (сверху вниз). По умолчанию тег присваивается вот такой:

docker.<path_to_docker_logs>.<container_id>.<conteiner_id>-json.log

В соответствии с тегом осуществляется фильтрация (например, если необходимо добавить какое-то поле в JSON лог или направить сразу на output).

Весь процесс выглядит следующим образом.

Fluentd вычитывает лог файл.

<source>
  @type tail
  @id in_tail_container_logs
  path /logs/docker_logs/*/*.log
  pos_file /data/docker.pos
  read_from_head true
  tag docker.**
  skip_refresh_on_startup false
  refresh_interval 10s
  enable_watch_timer true
  enable_stat_watcher false
  <parse>
    @type json
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

Маршрутизирует на основании тегов (фильтрует). В данном случае дописывает имя хостовой системы, на которой запущен контейнер.

<filter docker.**>
  @type record_transformer
  enable_ruby
  <record>
    hostname "#{Socket.gethostname}"
  </record>
</filter>

Выполняет разбивку на порции данных (chunks) и пишет их в буфер (buffer), который находится на диске. Порции данных помещает в очередь. Отправляет коллектору.

<match docker.**>
  @type forward
  @id forward_docker
  <server>
    name main_server
    host collector.example.com
    port 24224
  </server>
  <buffer>
    @type file
    path /data/buffers/containers
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 2
    flush_interval 2s
    retry_forever
    retry_max_interval 30
    chunk_limit_size 4M
    queue_limit_length 512
    overflow_action block
  </buffer>
</match>

Если TCP сокет коллектора не доступен — продолжает читать файл и аккуратно складывает порции данных до установленного предела. Когда достигнет предела — останавливает чтение файла и запоминает позицию. При возобновлении связи начинает передачу собравшихся данных и продолжает чтение файла с места, на котором остановился.

Вот так выглядит сформированный формат JSON лога, который получается на выходе для передачи на коллектор:

2018-10-05 07:31:29.069710307 +0000 docker.logs.docker_logs.93f5b2302f7c91e5a6de695ec29ee125db2835fd29f1f861bbe5153f4ce7878c.93f5b2302f7c91e5a6de695ec29ee125db2835fd29f1f861bbe5153f4ce7878c-json.log:
{"log":"[05/Oct/2018:07:31:25 +0000]
"example.com"
"1.2.3.4"
"304"
"0.002"
"0.002"
"443"
"GET /default.css HTTP/1.1"
"https://example.com/efk"
"Mozilla/5.0",
"stream":"stdout",
"attrs":{"container_name":"example-nginx",
         "service":"nginx"},
"hostname":"host.example.com"}"

Итак разобрались с тем, что происходит на клиенте.

Коллектор

Теперь рассмотрим, что происходит на коллекторе.

Стрим поступает на TCP сокет коллектора, начинает процесс маршрутизации (фильтрации) и проходит следующий воркфлоу.

Он попадает на цепочку docker_before . Эта цепочка предназначена для переопределения тегов входящих логов от Docker-контейнеров. Каждый контейнер имеет дополнительные атрибуты, такие как «attrs.service» и «attrs.container_name». Переопределение тега происходит на основании «attrs.service», например Nginx. При этом, если нам нет необходимости собирать логи с других контейнеров, например MongoDB, все, что нам нужно, — это убрать его из цепочки определения сервиса, и он просто упадет в null. Выглядит вот так:

<match docker.logs.**>
  @type rewrite_tag_filter
  <rule>
    key $[‘attrs’][‘service’]
    pattern ^nginx$
    tag docker.nginx
  </rule>
</match>

<match docker.logs.**>
  @type null
</match>

Далее выполняется маршрутизация (фильтрация) на основании переопределенного тега «docker.nginx» и попадает в следующий конфиг-файл.

Цепочка Nginx_common  выполняет разбивку логов на error и access на основании JSON поля «stream» и переопределяет тэги на «docker.nginx.stdout» и «docker.nginx.stderr». Дальнейшее продвижение лога происходит на основании уже переопределенных тэгов.

<match docker.nginx>
  @type rewrite_tag_filter
  <rule>
    key stream
    pattern /stdout/
    tag docker.nginx.stdout
  </rule>
  <rule>
    key stream
    pattern /stderr/
    tag docker.nginx.stderr
  </rule>
</match>

Далее происходит матчинг по тегу «docker.nginx.stdout». К нему применяется парсер с определенным форматом.

<filter docker.nginx.stdout>
  @type parser
  key_name log
  reserve_data true
  time_parse true
 format  /^\[(?<time>.*)\]\s"(?<request_host>.*)"\s"(?<remote_host>.*)"\s"(?<status>\d{3})"\s"(?<request_time>\d+.\d+)"\s"(?<upstream_response_time>.*)"\s"(?<bytes_sent>.*)"\s"(?<method>\w{3,7})\s(?<request_uri>.*)\s(?<http_version>.*)"\s"(?<http_referer>.*)"\s"(?<http_user_agent>.*)"/
  time_format %d/%b/%Y:%H:%M:%S %z
</filter>

Затем модифицируется поле upstream_response_time (мы переопределяем значения некоторых полей (mapping) в Elasticsearch индексе для корректного поиска и фильтрации), чтобы поле содержало только числа, а не прочерки, если значение отсутствует.

<filter docker.nginx.stdout>
  @type record_transformer
  enable_ruby true
  auto_typecast true
  <record>
   upstream_response_time ${record["upstream_response_time"] == '-' ? -1.000 :  record["upstream_response_time"]}
  </record>
</filter>

Далее добавляем GeoIP данные, такие как «country_code», «country_name», «city» и «geoip_hash».

Вот тут, кстати, заметили, что память «текла» при использовании библиотеки «geoip2_c», поэтому выбрали «geoip2_compat».

1. Использование библиотеки geoip2_c — видно постепенный рост потребления памяти. 2. Использование библиотеки geoip2_compat — видно ровное потребление памяти. Коллега говорит: «Как в книжке пишут» :)

<filter docker.nginx.stdout>
  @type geoip
  geoip_lookup_keys remote_host
  geoip2_database "/opt/GeoLite2-City.mmdb"
  backend_library geoip2_compat
  <record>
    country        ${country_code["remote_host"]}
    country_name   ${country_name["remote_host"]}
    city           ${city["remote_host"]}
    geoip_hash     '[${longitude["remote_host"]},${latitude["remote_host"]}]'
 </record>
  skip_adding_null_record  true
</filter>

Так как поля «stream» и «log» мы распарсили, то смело можем их удалять. Поле «log» удаляем только с логов, тегированных как «docker.nginx.stdout», а поле «stream» удаляем со всех логов (это поле, на основании которого мы делали переопределение тега в начале этой цепочки).

<filter docker.nginx.stdout>
  @type record_modifier
  remove_keys log
</filter>
<filter docker.nginx.*>
  @type record_modifier
  remove_keys stream
</filter>

Парсим логи, тегированные как «nginx.docker.stderr». Тут вообще все просто. Разбираем по минимуму: время, тип (severity type), pid, ну и, собственно, само тело сообщения (оставляем как есть).

<filter nginx.docker.stderr>
  @type parser
  key_name log
  reserve_data true
  format /^(?<time>.*)\s(?<type>\[\w+\])\s(?<pid>\d+#\d+):\s(?<log>[^ ].*)/
</filter>

Далее аккуратно раскладываем по индексам, предварительно закидывая шаблон для индекса. Но, прежде чем отправить в индекс данные, учтите, что они складываются порциями в буфер, который расположен на диске, попадают в очередь и сбрасываются порциями (bulk flush) каждые 5 с.

<match docker.nginx.stdout>
  @type elasticsearch
  logstash_format true
  logstash_prefix docker.nginx.stdout
  host elasticsearch
  port 9200
  template_file /fluentd/etc/index_maps/docker.nginx.stdout
  template_name docker.nginx.stdout
  template_overwrite true
  max_retry_putting_template 15
  request_timeout 15s
  reload_connections false
  <buffer>
     @type file
     path /buffers/docker.nginx.stdout
     flush_mode interval
     retry_type exponential_backoff
     flush_thread_count 2
     flush_interval 5s
     retry_forever
     retry_max_interval 30
     chunk_limit_size 2M
     queue_limit_length 100
     overflow_action block
   </buffer>
</match>
<match docker.nginx.stderr>
  @type elasticsearch
  logstash_format true
  logstash_prefix docker.nginx.stderr
  host elasticsearch
  port 9200
  template_file /fluentd/etc/index_maps/docker.nginx.stderr
  template_name docker.nginx.stderr
  template_overwrite true
  max_retry_putting_template 15
  request_timeout 15s
  reload_connections false
  <buffer>
     @type file
     path /buffers/docker.nginx.stderr
     flush_mode interval
     retry_type exponential_backoff
     flush_thread_count 2
     flush_interval 5s
     retry_forever
     retry_max_interval 30
     chunk_limit_size 2M
     queue_limit_length 100
     overflow_action block
   </buffer>
</match>

На выходе из коллектора лог получается вот таким:

2018-10-05 07:31:29.069710307 +0000 docker.nginx.stdout:
{"attrs":{"container_name":"example-nginx",
            "service":"nginx"},
"hostname":"example.host.com",
"request_host":"example.com",
"remote_host":"1.2.3.4",
"status":"304",
"request_time":"0.002",
"upstream_response_time":"0.002",
"bytes_sent":"443",
"method":"GET",
"request_uri":"/default.css",
"http_version":"HTTP/1.1",
"http_referer":"-",
"http_user_agent":"Mozilla/5.0",
"country":"US",
"country_name":"United States",
"city":"Riverdale",
"geoip_hash":[-84.4522,33.5561]}

А в Kibana это выглядит уже вот так:

Kibana, пример живой системы

Что касается настройки индекса в Elasticsearch, то мы изменяем параметр «number_of_shards: 1», который по дефолту равен 5. Увеличивать количество шард (shard) есть смысл, когда собран кластер, так как при запросах Elasticsearch параллелит поиск по шардам в дата-нодах. Поскольку дата-нода у нас одна, то и количество шард должно быть равным количеству дата-нод кластера.

Еще один параметр, который мы меняем, — это «number_of_replica: 0» . Не держим реплику для Primary Shard, так как реплицировать, собственно говоря, и некуда.

В мапинге (mapping) индекса Elasticsearch мы изменяем типы следующих полей:

  • bytes_sent  -  integer
  • geoip_hash  -  geo_point
  • remote_host  -  ip
  • request_time  - half_float
  • status  -  short
  • upstream_response_time  -  half_float

Итоги

Построить систему сбора, хранения и анализа логов достаточно просто, используя бесплатные проекты, которые активно развиваются. Эту систему, мы используем, в первую очередь, как средство оперативного анализа, локализации и понимая возникающих проблем. Но (just for fun), такая система позволяет делать с логами крутые вещи, включая их визуализацию, строить красивые графики, диаграммы и таблицы.

Похожие статьи:
Всем привет! Меня зовут Саша Емельянов, я продакт-менеджер. Свой путь в менеджменте я начал с того, что открыл собственный стартап,...
Близько шести років у Брюховичах неподалік Львова працює Hebron IT Academy — соціальний освітній заклад, в якому підлітки-сироти...
Компания MediaTek представляет ультраэнегоэффективный восьмиядерный чипсет helio P20, который дополнит флагманскую линейку...
IT Academy CONTACT открывает набор на 8-недельный курс «Java START». В процессе обучения на курсах Java START...
Приглашаем наших читателей принять участие в опросе об ИТ-литературе. По результатам...
Яндекс.Метрика