Наш опыт внедрения ClickHouse – аналитической CУБД

Всем привет. Меня зовут Роман Базар. Я работаю в компании MGID на позиции Tech Lead, в основном занимаюсь web-разработкой. Не так давно моя команда занималась внедрением ClickHouse в наш стек технологий. В статье хочу поделиться этим опытом.

Начали мы знакомство с ClickHouse осенью 2016. Вскоре выпустили в продакшн первую фичу с его использованием. С тех пор прошло 2 года, и сейчас 4 ClickHouse ноды принимают на вставку порядка 10-ти миллиардов записей в день, хранят около 4 TB данных, обслуживают выборки с нескольких web-приложений, а также дают возможность отделу аналитики работать с данными еще из одного источника.

Disclaimer. Статья не претендует на тег best practices. Это история о том, как мы улучшили свои инфраструктурные возможности по обработке статистических данных и поработали с ClickHouse. Надеюсь, статья будет полезной.

Для тех, кого напугал объем статьи:

  • Old School Statistics Storage — описание того, как мы хранили статистику раньше и почему больше не можем так делать;
  • Road to ClickHouse — тесты альтернативных решений до ClickHouse;
  • ClickHouse Hello World — основы ClickHouse;
  • ClickHouse First Blood — первая фича, первый взлет и первые ошибки;
  • ClickHouse & Quantiles — самое интересное. Если вы уже знакомы с ClickHouse и не хотите читать все, вам сюда.

Несколько слов о компании. MGID уже более 10-ти лет занимается нативной рекламой. Сотрудничаем с сайтами-площадками, на которых размещаем виджеты показа рекламы. Виджеты отвечают за отображение рекламных объявлений наших клиентов (да-да, та самая реклама с громкими заголовками, на которую многие хотят в тайне кликнуть). Компания продуктовая, пишем для себя. В техническом отделе работает порядка 80-ти человек. Стек технологий: PHP, JavaScript, Go, C++, Lua, Kafka, MySQL, MongoDB, Aerospike и, понятное дело, ClickHouse.

Old School Statistics Storage

Начнем с того, зачем нам вообще понадобился ClickHouse. Как я уже упомянул выше, MGID занимается нативной рекламой. Чтобы четко оценить продуктивность компании и ее ценность для клиентов и партнеров, нужна статистика. Критически важно иметь возможность быстро и удобно ее анализировать.

Долгое время в качестве основного хранилища статистических данных мы использовали MySQL. Сущностей, которые могут получать статистику, в системе достаточно много. Например: сайт, виджет отображения, рекламный блок, рекламная кампания, Список можно продолжать до бесконечности. У всех этих сущностей есть определенные метрики, которые нужно анализировать: клики, показы, траты, заработок, конверсии и т. д. В MySQL хранился ряд таблиц, в которые агрегировались данные по определенным разрезам, достаточно небольшим, чтобы была возможность их компактно хранить и делать по ним быстрые выборки. Когда-то этого было достаточно.

Время не стояло на месте и компания тоже. У бизнеса постоянно появлялись новые идеи, а новые идеи часто означали новую статистику, которую нужно собирать, хранить и анализировать. Часто нам удавалось интегрировать новые метрики в существующие таблицы. Правда, приходилось добавлять новую метрику в таблицы разных сущностей, а их немало. Но в целом ситуация оставалась контролируемой.

Но скоро компания подросла в объемах. Увеличилось число клиентов, сильно выросли объемы трафика и, следовательно, — статистики. Увеличился объем данных в MySQL, и вот таблицы, которые раньше работали хорошо, начали доставлять хлопоты. Выборки уже не были такими быстрыми. Появились проблемы с изменением структуры таблиц, особенно когда нужно было изменить первичный ключ. В общем, наш DBA начал проводить гораздо больше времени наедине с MySQL.

Тем временем, объемы данных, которые нужно обрабатывать, только увеличивались. Мы поняли, что нужно что-то менять.

Road to ClickHouse

Итак, было ясно, что использовать MySQL как единственное хранилище статистических данных не очень перспективно. Нужно двигаться дальше. Но с чего начать?

Первой попыткой стало использование MongoDB. Ставка была на документоориентированный подход: мол, все, что нужно будет, допишем в коллекцию. Может, даже во вложенную коллекцию. В рамках теста реализовали фичу с использованием MongoDB. Все вроде заработало, но мы поняли, что в наших кейсах этого решение не намного лучше подхода с MySQL. Нужно искать что-то другое.

Следующим испытуемым был Druid. По описанию все идеально подходило для наших задач. Даже в статьях с примерами оперируют теми же метриками, что и мы, — кликами и показами. «Нужно пробовать», — подумали мы. Если я не ошибаюсь, дело было весной 2016.

С внедрением возникли затруднения. Учитывая тогдашние объемы данных, проблемы с производительностью возникли уже на предварительных тестах. Особенно запомнился момент, когда нужно было запускать множество демонов вручную. Процесс чем-то напоминал покупку мебели в IKEA: вроде купил шкаф, но его еще нужно собрать из досок, а здесь вроде установил хранилище данных druid, а демон индексации данных запусти сам.

Параллельно соседняя команда занималась тестированием Google BigQuery, но у них все тоже не было гладко.

В то время попадались на глаза статьи от Yandex о том, какие у них были проблемы с хранением статистики для метрики и как они решили их внутренним инструментом — ClickHouse. Вскоре он попал в публичный доступ. ClickHouse выглядел достаточно перспективно, но мы все-таки решили подождать пару месяцев, прежде чем попробовать его использовать.

ClickHouse Hello World

Осенью 2016 мы начали поглядывать в документацию ClickHouse и оценивать, как его можно применить в нашей системе.

ClickHouse — столбцовая СУБД. Отличие от строковых (например, MySQL, PostgreSQL) в том, что столбцовые БД хранят значение одного столбца вместе, а строковые хранят данные одной строки вместе, как ни странно.

Сразу стоит отметить некоторые ограничения, которые есть в ClickHouse:

  • Нет команды UPDATE: данные, записанные в ClickHouse, нельзя изменить. Или почти нельзя. Об этом расскажу дальше.
  • Нет команды DELETE, есть только DROP. Можно удалить таблицу целиком, или ее патрицию. Конкретную строчку удалить нельзя.
  • Нет транзакций.

Данные в ClickHouse хранятся в таблицах. Доступно немалое количество движков таблиц. Зачастую для хранения данных используют семейство движков MergeTree.

Немного об основных движках.

MergeTree

MergeTree — базовый движок семейства. Он обязывает задавать первичный ключ для таблицы. Первичный ключ используется в качестве ключа сортировки для данных. Данные хранятся отсортированными по первичному ключу. Ключ задается структурой ORDER BY при создании таблицы. Первичный ключ может быть неуникальным.

Также в таблице нужно указать поле для партиционирования. Партиция — это минимальная единица данных, которую можно удалить, задетачить, оптимизировать и т. д. Задается поле или выражение структурой PARTITION BY при создании таблицы.

Физически данные хранятся в виде файлов, называются они parts или «куски». Можно всегда заглянуть в таблицу system.parts и посмотреть, что и как. Данные в файле отсортированы по первичному ключу (ключу сортировки). Файл не может содержать данные из разных партиций. При вставке новых данных они записываются в файлы небольшого размера. Через некоторое время ClickHouse запускает процесс слияния (отсюда и название — MergeTree), который объединяет файлы и сортирует данные.

MergeTree — базовый движок семейства. Он не обладает никакими дополнительными возможностями, но хорошо подходит для хранения и накопления любых данных, которые нужно хранить в «сыром» виде. Партиционирование и первичный ключ — это основные инструменты оптимизации выборок.

SummingMergeTree

Основное отличие от MergeTree заключается в том, что движок позволяет задать цифровые столбцы для суммирования (накопления данных). Эти столбцы можно назвать счетчиками. На этапе слияния файлов для строк с одинаковым первичным ключом выполняется суммирование значений счетчиков. После этапа слияния файл имеет только одну строку по каждому первичному ключу. Строка хранит суммарные показатели счетчиков.

Так как таблица может хранить данные в нескольких патрициях, а следовательно, — в нескольких файлах, не стоит полагаться на полное суммирование всех строк одного первичного ключа в рамках таблицы. Для выборок, в которых нужно получить итоговые значение счетчиков, всегда стоит использовать GROUP BY.

SummingMergeTree, по сути, выполняет некую доагрегацию на лету. Отлично подходит для случаев, когда не интересны данные по каждому отдельному событию, а важен только показатель счетчика. Благодаря суммированию значений нескольких строк, такая таблица может сильно сэкономить дисковое пространство и ускорить выборки.

CollapsingMergeTree

Основное отличие от MergeTree заключается в том, что движок позволяет задать столбец-метку, на основании которой он будет выполнять «схлопывание» данных с учетом первичного ключа на этапе слияния. Часто такой столбец называют sign. Столбец должен принимать значение 1 для новых данных, и −1 для старых. На этапе слияния ClickHouse оставит в таблице только самые актуальные данные по каждому первичному ключу, старые данные будут удалены.

CollapsingMergeTree подходит для хранения данных, которые могут изменится в будущем. Движок дает возможность заменить старые данные в таблице на новые. Но учитывайте, что схлопывание происходит не мгновенно, ClickHouse запускает слияния файлов по своему усмотрению.

В целом, это очень базовые описания. Для более детального погружения стоит читать документацию.

ClickHouse First Blood

Первой нашей фичей, где был протестирован ClickHouse, стал интерфейс создания отчетов. Конструктор отчета дает возможность динамически выбрать интересующие разрезы и метрики для подсчета.

Требования предельно просты: нужно иметь возможность произвольно выбирать разрезы и метрики и быстро получать отчет. Интерфейс будет доступен клиентам, поэтому быстродействие и отказоустойчивость критичны. Выборка всегда осуществляется в рамках клиента.

Доступных размерностей для сбора статистики у нас предостаточно. Есть как внутренние: рекламный блок, сайт, различные маркеры трафика так и пользовательские: геолокация, ОС, браузер и т. д.

Среди метрик для сбора можно выделить основные:

  • событие показа рекламы на экране пользователя;
  • событие клика на рекламный блок;
  • итоговые затраты клиента с учетом всех коэффициентов.

Есть еще ряд дополнительных метрик, но они очень специфические для нашей отрасли.

Метрики можно разделить на две группы: неизменяемые и те, которые могут изменить свой статус в будущем. К неизменяемым можно отнести показы: после отображения рекламы на экране пользователя мы считаем, что показ состоялся и изменению это не подлежит. А статусы клика и траты могут быть пересчитаны потом (отсеивание ботов и т. д.).

Было решено хранить постоянные и изменяемые данные отдельно. Показы — в MergeTree, а клики и траты — в CollapsingMergeTree.

Первый этап сбора неизменяемых данных показал, что для хранения одного дня в MergeTree нам понадобится примерно 35 GB. Показов в нашей системе достаточно много.

Философия ClickHouse, по крайней мере его документации, гласит о том, что нужно хранить каждое событие отдельно. Но в рамках фичи интересен был именно счетчик, поэтому мы сменили движок — MergeTree на SummingMergeTree. Переход на новый движок сильно ужал таблицу.

Точных цифр на момент реализации не осталось, но могу привести пример того, сколько данные занимают сейчас:

SELECT SUM(shows)
FROM advertisement_statistics
WHERE date = yesterday()

┌─SUM(shows)─┐
│  725171805 │
└────────────┘

1 rows in set. Elapsed: 0.417 sec. Processed 99.99 million rows, 999.88 MB (239.89 million rows/s., 2.40 GB/s.)

Нужно учитывать, что объемы обрабатываемых данных растут постоянно и за 2 года существенно увеличились. Тем не менее, полные партиции за месяц занимают:

SELECT
    partition,
    formatReadableSize(SUM(bytes)) AS tableSize
FROM system.parts
WHERE table = 'advertisement_statistics'
GROUP BY partition
ORDER BY partition DESC
LIMIT 1, 4

┌─partition─┬─tableSize──┐
│ 201808    │ 158.06 GiB │
│ 201807    │ 147.63 GiB │
│ 201806    │ 142.91 GiB │
└───────────┴────────────┘

Явно меньше чем 35 GB в день.

Данные, которые могут быть изменены, начали хранить в CollapsingMergeTree, 1 строка = 1 ивент. Событий не так много, как показов. Первичный ключ позволяет идентифицировать конкретную строку. При изменении статуса события происходит повторная вставка строки с обновленными данными. При слиянии файлов ClickHouse оставит в таблице только последнюю вставленную строку.

Таблицы имеют схожую структуру, финальный отчет создается при помощи UNION ALL — двух сгруппированных к необходимому разрезу выборок. ClickHouse выполняет выборки параллельно.

После пары недель работы в режиме «для внутреннего использования и тестирования», в котором не было обнаружено проблем, фича стала доступна клиентам.

Что мы получили

В первую очередь мы получили фичу, которую хотели реализовать давно, но упирались в технические преграды. Фича была реализована достаточно быстро, учитывая то, что мы пробовали новую технологию.

Мы получили хранение основных статистических данных системы в едином хранилище с возможностью быстрых выборок. Настолько быстрых, что их можно использовать в пользовательских интерфейсах, даже с учетом UNION ALL. Из-за этого мы и начали использовать ClickHouse в различных частях системы. При налаженном механизме сбора данных оставалось только делать выборки. В таблицы со временем добавились новые данные, но суть не поменялась. Мы получили единое хранилище, которое позволяет работать с данным так, как нам удобно. А доступных фич так много, что мы не успели еще все использовать.

Редактирование данных

Со временем нам таки понадобилось откорректировать данные в ClickHouse. Но это не составило особого труда.

CollapsingMergeTree — движок, который позволяет обойти ограничение ClickHouse на редактирование записей. Вся его логика строится на использовании поля sign(1, −1) и порядка вставки для определения строк, которые нужно оставить в таблице. Но даже если что-то пошло не так и были записаны строки с одинаковым sign подряд, ClickHouse продолжит работу и просто оставит последние вставленные строки в таблице. При необходимости можно сделать дополнительную вставку и, по сути, заменить строки в таблице.

SummingMergeTree также позволяет обойти ограничение на редактирование строк, но немного в другом ключе. Для счетчиков в таблице лучше использовать типы полей Int* вместо UInt*. Это дает возможность откорректировать итоговые показатели счетчиков путем вставки новых записей с корректирующими показателями, в том числе и отрицательными.

Недавно в ClickHouse появилась возможность редактировать данные, но это ресурсоемкая операция и фича пока находится в beta.

Использование внешних словарей

ClickHouse позволяет использовать внешние словари. Очень пригодилась возможность подключать словари именно с MySQL. Необходимые настройки задаются в XML-формате на уровне сервера. В настройках можно указать, какие таблицы нужно подключить с MySQL и как часто их нужно перечитывать. Дальше можно ими пользоваться через семейство функций dictGet*('dict_name', 'attr_name', id).

Реальная жизнь

В процессе работы мы столкнулись с некоторыми сложностями.

В CollapsingMergeTree мы начали хранить данные, которые могут изменить свой статус, и это поведение ожидаемое. Мы слишком положились на механизм схлопывания данных. На крупных объемах это происходит достаточно долго. Можно вызывать этот процесс вручную:

OPTIMIZE TABLE very_data_table;
OPTIMIZE TABLE very_data_table PARTITION 201808;

Иногда вызов тригерит слияние и выполняется за несколько минут, а иногда — за доли секунды, а по факту не делает ничего. На него тоже полагаться не стоит, только как на дополнительный механизм в экстренных случаях.

Изменение структуры SummingMergeTree. Таблица сильно завязана на первичном ключе, в котором перечислены все возможные разрезы, а ClickHouse не позволяет изменять первичный ключ. Поэтому каждое его изменение заключается в том, чтобы:

  • создать новую таблицу;
  • перелить в новую таблицу данные, желательно по временной метке, например: все данные до сегодня или до начала текущего часа;
  • выполнить подмену таблицы с продовой, путем RENAME TO. Можно переименовывать несколько таблиц сразу;
  • долить в уже новую продовую таблицу недостающие данные. Переливание всей таблицы может быть долгим процессом, да и нужно иметь свободное дисковое пространство для таких операций.

В какой-то момент мы решили использовать ReplicatedMergeTree семейство таблиц для удобной master-master репликации, но увы. После перехода на Replicated ноды начали периодически зависать без причин, причем запись была направлена только в одну ноду. Через неделю поисков решения проблемы админы вернули все на обычные MergeTree.

Документация ClickHouse часто не успевает за новыми фичами, которые доступны в релизе, и нужно смотреть дополнительные источники информации (статьи/каналы).

ClickHouse & Quantiles

Кроме использования в роли хранилища долгосрочной статистики, ClickHouse можно применять как промежуточное хранилище в более быстротекущих фичах.

Одна из последних фич, где был использован ClickHouse, — это оценка производительности рекламных блоков. Суть фичи в том, что нужно провести анализ «успешности» рекламного объявления (тизера). Анализ проводится по коэффициентам, которые просчитываются внутренним сервисом отдельно. С помощью ClickHouse проводится финальный срез результатов. Срезы запрашивает интерфейс web-приложения, которым пользуются наши клиенты, поэтому быстродействие критично.

Чтобы не выкладывать сразу всю структуру финального решения, в которой может быть сложно быстро разобраться, опишу путь, пройденный при разработке фичи. Спойлер: очень выручила функция quantileExactWeighted — просчет нескольких квантилей на лету, с учетом веса значений. Подробности — ниже.

Начнем с того, что мы имеем: сервис, который вычисляет оценку «успешности» рекламных блоков. Внутри системы есть множество событий, в которых могут поучаствовать рекламные блоки. Сервис просчитывает оценку рекламного блока, добавляет служебную информацию (геолокацию, время, информацию о пользователе и т. д.) и отправляет это сообщение в Kafka. Сообщение имеет формат TSV (tab-separated values), удобный для передачи данных, если, конечно, в данных нет символа TAB.

Команда, ответственная за сервис, еще на этапе запуска поняла, что ивентов, которые продуцируют сообщения, крайне много, порядка 107 миллионов в час. Проанализировав поток данных за небольшой период, аналитики пришли к выводу, что можно применить сэмплирование еще на этапе продюсинга оценок в Kafka. Они решили собирать данные по каждому N-му событию, а значение N дописывать в сообщение, чтобы понимать, с каким сэмплингом были собраны данные. У оценок разных рекламных тизеров часто общие служебные данные (геолокация, время, информация о пользователе и т. д.). Для экономии трафика начали передавать в одном сообщении данные по нескольким рекламным блокам.

Пример сообщения в Kafka:

{timestamp}TAB{userInfo}TAB{locationId}TAB{advertisementPerfomanceIndexesJSON}TAB{samplingRate}

KafkaEgnine

Для взаимодействия с Kafka в ClickHouse есть специальный движок таблицы. Пример из документации:

 CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
  ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

Он обладает неким набором настроек и даже умеет работать с TSV форматом. Идея в том, что при выборке данных из таблицы те подтягиваются из Kafka-топика. Как у этого движка обстоят дела с производительностью, сказать ничего не могу, так как использовать нам его не удалось. На момент разработки в движке был критический баг именно при работе с TSV, обидно. Пришлось таки делать консьюмер.

CreateTable

С характером данных вроде все понятно, теперь нужно создать таблицу для хранения данных. Важные моменты, на которые стоит обращать внимание при создании таблицы, — это поле партиционирования и первичный ключ. При выборе этих настроек следует всегда обращать внимание на то, как будут построены выборки: по каким разрезам, за какой период нужны данные и т. д.

В нашем случае, Web-интерфейс будет запрашивать оценки по конкретным рекламным блокам, иногда с учетом геолокации и временных диапазонов. Максимальная требуемая точность временных выборок — 1 час. Данные актуальны для выборки 24 часа, после этого их можно удалять.

Нам нужно хранить:

  • timestamp — с точностью до часа;
  • advertisementId — id рекламного блока, целое число;
  • locationId — id геолокации, целое число;
  • performanceRatio — оценочный коэффициент продуктивности блока. Дробное значение с точностью до 3-го знака, часто имеет значение около единицы;
  • samplingRate — показатель семплирования, целое число.

Для базового примера этого достаточно. Данные пользователя затрагивать не будем, чтобы не нагружать пример.

Выбор партиционирования

Как показывает мой личный опыт, партиционирование лучше делать по полям, хранящим временную метку: час, день, месяц и т. д. В зависимости от характера данных и выборок. Например: если вам нужно часто выбирать данные за день, а не, скажем, за месяц, стоит делать партиционирование по дням. При выборке ClickHouse поймет, что вам нужна конкретная партиция, и не пойдет сканировать другие. Данные разных патриций лежат в отдельных файлах. Один файл не может хранить данные нескольких партиций. Сканирование только одной патриции положительно скажется на производительности выборки.

Не стоит делать партиционирование по полю, которое имеет большой диапазон значений. Это может плохо сказаться как на выборках, так и на количестве места, занимаемого таблицей.

Для фич, которые хранят большие объемы данных с коротким сроком актуальности, например, 24 часа, мы практикуем партиционирование по timestamp. Предварительно timestamp округляем до желаемой временной засечки, например, к целому часу, отбрасывая минуты и секунды выше. При острой необходимости можно на лету поменять логику округления timestamp. Например, можно увеличить засечки до 2-х часов, и новые данные будут собираться в патриции по 2 часа, без дополнительных манипуляций со структурой таблицы.

Для выборок советую использовать такой формат:

hour >= toUnixTimestamp('2018-08-27 16:00:00')

Запросы более читаемые чем, скажем:

hour >= 1535374800

В этой фиче интересны данные за 24 часа. Более старые данные не актуальны. ClickHouse позволяет удалять данные с таблиц по партициям, поэтому почасовое партиционирование на основе округленного timestamp оптимально для этой задачи.

Выбор первичного ключа

Помимо партиционирования нужно определить первичный ключ. Первичный ключ не требует уникальности данных в пределах таблицы. Ключ используется как указатель сортировки данных при хранении. При создании таблицы он задается через структуру ORDER BY, но в документации упоминается как первичный ключ.

Порядок сортировки данных в файлах прямо сказывается на выборках. По сути, первичный ключ — это индекс. При его создании нужно придерживаться тех же правил, что и при создании обычного индекса.

В фиче срез данных интересен только для конкретного рекламного блока, иногда с уточняющим разрезом на геолокациию и время. Итого, первичный ключ:

ORDER BY (advertisementId, locationId, hour)

Хранение дробного значения

Оценка рекламного блока — это дробное значение. Точность — 3 знака после запятой. В ClickHouse есть такие типы полей: Float32 и Float64, но они зачастую слишком «float» для большинства реальных задач. Дробные значение лучше хранить в формате Int. В нашем случае перед записью нужно умножить на 1000 и стоит не забывать поделить на 1000 при выборке.

Таблица данных

Учитывая все вышесказанное, структура таблицы выглядит так:

CREATE TABLE advertisements_performance_data
(
    hour UInt32,
    advertisementId UInt64,
    locationId UInt64,
    performanceRatio UInt8,
    samplingRate UInt8
)
ENGINE = MergeTree()
PARTITION BY hour
ORDER BY (advertisementId, locationId, hour)
SETTINGS index_granularity = 8192

Данные из Kafka топика забирает консьюмер. Распаковывает сообщение, так как оно хранит данные сразу по нескольким рекламным блокам.

Оптимизация вставок

Уже на этом этапе можно оптимизировать вставки. Записывать в ClickHouse не отдельными строками по каждому блоку, а по примеру записи в Kafka топик: общие служебные данные + данные по нескольким блокам.

Идея вкратце:

  • пишем в таблицу данные по нескольким блокам одной строкой;
  • распаковываем данные, получаем оценки по блокам;
  • записываем оценки по блокам в целевую таблицу (в ту что, мы уже создали, — advertisements_performance_data).

В этом нам помогут:

  • ARRAY JOIN — распаковка данных по разным блокам внутри ClickHouse;
  • MaterializedView — перекладывание данных в целевую табличку;
  • Engine Null.

ARRAY JOIN

Крайне полезный инструмент ClickHouse. Позволяет выполнять JOIN как с внешним массивом, так и с локальным, находящимся в таблице. Проще объяснить примером.

Внешний массив:

CREATE TABLE gotham
(
    hero String
)
ENGINE = MergeTree()
ORDER BY tuple()

INSERT INTO gotham VALUES
('batman'),
('robin');

SELECT hero
FROM gotham

┌─hero───┐
│ batman │
│ robin  │
└────────┘

SELECT
    hero,
    task
FROM gotham
ARRAY JOIN ['find bad guys', 'do violence', 'save the day'] AS task

┌─hero───┬─task──────────┐
│ batman │ find bad guys │
│ batman │ do violence   │
│ batman │ save the day  │
│ robin  │ find bad guys │
│ robin  │ do violence   │
│ robin  │ save the day  │
└────────┴───────────────┘

Локальный массив:

CREATE TABLE gotham
(
    hero String,
    tasks Array(String)
)
ENGINE = MergeTree()
ORDER BY tuple()


INSERT INTO gotham VALUES
('batman', ['be a billionaire', 'ride a batmobile']),
('robin', ['wash a batmobile']);

SELECT *
FROM gotham

┌─hero───┬─tasks───────────────────────────────────┐
│ batman │ ['be a billionaire','ride a batmobile'] │
│ robin  │ ['wash a batmobile']                    │
└────────┴─────────────────────────────────────────┘

SELECT
    hero,
    task
FROM gotham
ARRAY JOIN tasks AS task

┌─hero───┬─task─────────────┐
│ batman │ be a billionaire │
│ batman │ ride a batmobile │
│ robin  │ wash a batmobile │
└────────┴──────────────────┘

С помощью ARRAY JOIN удобно «распаковывать» входящие данные.

Вернемся к рекламным блокам. Напомню, фича предполагает выборку по конкретному рекламному блоку, а вот хранить данные массивами не очень удобно. Поэтому будет использовано 2 таблицы: одна — для приема данных, вторая — для их хранения. Перекладываться данные будут автоматически с помощью MaterializedView.

MaterializedView

У ClickHouse есть встроенный инструмент под названием view. View делятся на два типа:

1. Обычные: ничего не хранят, по сути, являются сохраненными запросами.

CREATE VIEW gotham_tasks AS
SELECT
    hero,
    task
FROM gotham
ARRAY JOIN tasks AS task

SELECT *
FROM gotham_tasks

┌─hero───┬─task─────────────┐
│ batman │ be a billionaire │
│ batman │ ride a batmobile │
│ robin  │ wash a batmobile │
└────────┴──────────────────┘

2. MaterializedView: могут хранить данные или перекладывать данные на лету в другую таблицу. View собирает данные при вставке строк в таблицу, указанную в SELECT. По сути, MaterializedView работает как триггер на вставку, применяя SELECT к свежим строкам.

CREATE TABLE gotham_tasks_list
(
    hero String,
    task String
)
ENGINE = MergeTree()
ORDER BY tuple()

CREATE MATERIALIZED VIEW gotham_tasks_handler TO gotham_tasks_list
(
    hero String,
    task String
) AS
SELECT
    hero,
    task
FROM gotham
ARRAY JOIN tasks AS task

INSERT INTO gotham VALUES
('batman', ['be a billionaire', 'ride a batmobile']),
('robin', ['wash a batmobile']);

SELECT *
FROM gotham_tasks_list

┌─hero───┬─task─────────────┐
│ batman │ be a billionaire │
│ batman │ ride a batmobile │
│ robin  │ wash a batmobile │
└────────┴──────────────────┘

Данные хранятся в новой таблице gotham_tasks_list. Если удалить view, данные не исчезнут.

DROP TABLE gotham_tasks_handler

SELECT *
FROM gotham_tasks_list

┌─hero───┬─task─────────────┐
│ batman │ be a billionaire │
│ batman │ ride a batmobile │
│ robin  │ wash a batmobile │
└────────┴──────────────────┘

В этом заключается преимущество перекладывания данных в другую таблицу, вместо хранения их в самом view. Скажем, если понадобится остановить запись, нужно будет детачить view. Пока view находится в состоянии детача, нельзя посмотреть данные в нем. Процесс удаления партиций тоже не очень прозрачный, хоть и возможный.

NullEngien

Данные в таблице, которая принимает вставки, никак не используются, кроме как для MaterializedView. Следовательно, их хранить не стоит. Можно использовать движок Null, данные в нем не сохраняются, но MaterializedView успешно забирает данные при вставке.

Структура БД с учетом распаковки данных

CREATE TABLE advertisements_performance_data_stream
(
    timestamp UInt32,
    locationId UInt64,
    samplingRate UInt8,
    `advertisements.id` Array(UInt64),
    `advertisements.performance` Array(UInt8)
)
ENGINE = Null()


CREATE TABLE advertisements_performance_data
(
    hour UInt32,
    advertisementId UInt64,
    locationId UInt64,
    performanceRatio UInt8,
    samplingRate UInt8
)
ENGINE = MergeTree()
PARTITION BY hour
ORDER BY (advertisementId, locationId)
SETTINGS index_granularity = 8192


CREATE MATERIALIZED VIEW advertisements_performance_data_handler TO advertisements_performance_data
(
    hour UInt32,
    advertisementId UInt64,
    locationId UInt64,
    performanceRatio UInt8,
    samplingRate UInt8
) AS
SELECT
    toUnixTimestamp(toStartOfHour(toDateTime(timestamp))) as hour,
    block.id as advertisementId,
    locationId,
    block.performance as performanceRatio,
    samplingRate
FROM advertisements_performance_data_stream
ARRAY JOIN advertisements as block

Итого, получаем сборку таблиц:

  • данные принимает advertisements_performance_data_stream. Таблица не хранит данные, просто принимает их;
  • к строкам, которые приходят на вставку, применяется выборка из advertisements_performance_data_handler;
  • результат выборки сохраняется в advertisements_performance_data.

Как можно заметить, округлением timestamp до прошлого полного часа занимается ClickHouse.

toUnixTimestamp(toStartOfHour(toDateTime(timestamp))) as hour

Пример вставки:

INSERT INTO advertisements_performance_data_stream VALUES
(1535655600, 1, 30, [101,102,103], [1100, 600, 1000])
(1535655600, 1, 30, [201,202,203], [1200, 700, 1150])
(1535655600, 2, 30, [101,102,103], [1300, 800, 1250])

SELECT *
FROM advertisements_performance_data

┌───────hour─┬─advertisementId─┬─locationId─┬─performanceRatio─┬─samplingRate─┐
│ 1535655600 │             101 │          1 │               76 │           30 │
│ 1535655600 │             101 │          2 │               20 │           30 │
│ 1535655600 │             102 │          1 │               88 │           30 │
│ 1535655600 │             102 │          2 │               32 │           30 │
│ 1535655600 │             103 │          1 │              232 │           30 │
│ 1535655600 │             103 │          2 │              226 │           30 │
│ 1535655600 │             201 │          1 │              176 │           30 │
│ 1535655600 │             202 │          1 │              188 │           30 │
│ 1535655600 │             203 │          1 │              126 │           30 │
└────────────┴─────────────────┴────────────┴──────────────────┴──────────────┘

В чем плюсы такого подхода:

  • Данные можно преобразовать на этапе выборки в MaterializedView. В ClickHouse есть огромный набор встроенных функций обработки данных.
  • На единый поток данных можно подвязать несколько MaterializedView. Иногда из общего пула данных нужно выбрать что-то для разных фич или, скажем, собирать в разные таблицы, в силу разности партиционирования.

Выборки

На этапе внедрения данные за 24 часа занимали примерно 1 TB. Размер данных большой, но проводить выборки по всем данным нет необходимости, срезы нужны именно по конкретному рекламному блоку. Ключ сортировки начинается именно с id рекламного блока, что сильно снижает количество строк, которые ClickHouse будет читать при выборке.

Нам нужно провести анализ производительности/успешности рекламного блока, выставить ему оценку. Примерная шкала оценивания:

  • успешный рекламный блок — advertisements_performance_data. performanceRatio выше единицы в 70% и более случаев. Это значит, что в 70% обработанных событий блок показал себя хорошо;
  • хороший блок — performanceRatio выше единицы в 50%;
  • нормальный — 30%;
  • ниже 30% — считаем таковым, который не оправдал надежд.

Нужные данные можно рассчитать через квантиль/персентиль. Описание с Wikipedia:

«Квантиль в математической статистике — значение, которое заданная случайная величина не превышает с фиксированной вероятностью. Если вероятность задана в процентах, то квантиль называется процентилем или перцентилем. Например, фраза „для развитых стран 95-процентиль продолжительности жизни составляет 100 лет“ означает, что ожидается, что 95 % людей не доживут до 100 лет».

Пример о возрасте достаточно исчерпывающий. Также квантили часто используют для замеров ответа приложения.

Что касается оценки рекламных блоков, это выглядит так:

Нужно понять, какова оценка блока на 30-м, 50-м и 70-м квантилях. В ClickHouse есть целое семейство функций, посвященных расчету квантилей — quantile(), а также копия этого семейства для расчета нескольких квантилей сразу — quantiles(). Если нужно вычислить несколько значений, всегда лучше использовать quantiles. Эта функция использует общие обработанные данные для расчета нескольких квантилей. Под этот кейс подходит quantilesExact(level1, level2, ...)(x). level — какой квантиль считать, задается значением 0 < level < 1. x — по какому полю считать. По сравнению с quantiles, quantilesExact вычисляет квантили более точно, хоть и более затратно.

SELECT
    advertisementId,
    quantilesExact(0.3, 0.5, 0.7)(performanceRatio) AS quantiles
FROM advertisements_performance_data
GROUP BY advertisementId

┌─advertisementId─┬─quantiles─────┐
│             203 │ [126,126,126] │
│             101 │ [20,76,76]    │
│             202 │ [188,188,188] │
│             103 │ [226,232,232] │
│             201 │ [176,176,176] │
│             102 │ [32,88,88]    │
└─────────────────┴───────────────┘

Результат квантилей возвращается в виде массива. Напомню, что ClickHouse использует ключи массивов, начиная с 1.

Просчет сразу всех квантилей нужен для потребностей web-интерфейса. На основании этих оценок он сможет дать совет клиенту о том, что нужно сделать для повышения производительности рекламного блока.

Также можно как угодно варьировать группировку и условие выборки. Например, указав несколько геолокаций, получить отчет по конкретному городу или стране за последние 3 часа и т. д.

Оптимизация хранения места

Остался всего один момент, который стоит оптимизировать — данные занимают 1TB.

Улучшить ситуацию нам помогут:

  • SummingMergeTree — движок таблицы для автоматического схлопывания данных;
  • quantilesExactWeighted(level1, level2, ...)(x, weight) — тот же метод расчета квантилей, но с учетом веса записи.

Структура:

CREATE TABLE advertisements_performance_data
(
    hour UInt32,
    advertisementId UInt64,
    locationId UInt64,
    performanceRatio UInt8,
    eventsCount UInt64
)
ENGINE = SummingMergeTree(eventsCount)
PARTITION BY hour
ORDER BY (advertisementId, locationId, performanceRatio)
SETTINGS index_granularity = 8192

CREATE MATERIALIZED VIEW advertisements_performance_data_handler TO advertisements_performance_data
(
    hour UInt32,
    advertisementId UInt64,
    locationId UInt64,
    performanceRatio UInt8,
    eventsCount UInt64
) AS
SELECT
    toUnixTimestamp(toStartOfHour(toDateTime(timestamp))) as hour,
    block.id as advertisementId,
    locationId,
    block.performance as performanceRatio,
    toUInt64(samplingRate) as eventsCount
FROM advertisements_performance_data_stream
ARRAY JOIN advertisements as block

Основные изменения:

  • поле advertisements_performance_data. samplingRate превратилось в eventsCount с типом UInt64. Для хранения количества всех событий нужна более высокая размерность поля.
  • таблица advertisements_performance_data поменяла движок на SummingMergeTree, с суммированием eventsCount. В первичный ключ, он же ключ сортировки, теперь входит фактическое значение оценки рекламного блока. По сути, теперь мы храним все уникальные оценки блока + их количество. Строки будут автоматически суммироваться, а подсчет счетчика eventsCount — это забота ClickHouse.

Заполним таблицу:

# можно выполнить несколько раз
INSERT INTO advertisements_performance_data_stream VALUES
(1535655600, 1, 30, [101,102,103], [1100, 600, 1000])
(1535655600, 1, 30, [201,202,203], [1200, 700, 1150])
(1535655600, 2, 30, [101,102,103], [1300, 800, 1250])

SELECT *
FROM advertisements_performance_data

┌───────hour─┬─advertisementId─┬─locationId─┬─performanceRatio─┬─eventsCount─┐
│ 1535655600 │             101 │          1 │               76 │         150 │
│ 1535655600 │             101 │          2 │               20 │         150 │
│ 1535655600 │             102 │          1 │               88 │         150 │
│ 1535655600 │             102 │          2 │               32 │         150 │
│ 1535655600 │             103 │          1 │              232 │         150 │
│ 1535655600 │             103 │          2 │              226 │         150 │
│ 1535655600 │             201 │          1 │              176 │         150 │
│ 1535655600 │             202 │          1 │              188 │         150 │
│ 1535655600 │             203 │          1 │              126 │         150 │
└────────────┴─────────────────┴────────────┴──────────────────┴─────────────┘

Здесь уже видно, что данные схлопнулись, вставляем мы с samplingRate 30, а eventsCount уже 150, да и лишних записей нет.

Пример выборки:

SELECT
    advertisementId,
    quantilesExactWeighted(0.3, 0.5, 0.7)(performanceRatio, eventsCount) AS quantiles
FROM advertisements_performance_data
GROUP BY advertisementId

┌─advertisementId─┬─quantiles─────┐
│             203 │ [126,126,126] │
│             101 │ [20,20,76]    │
│             202 │ [188,188,188] │
│             103 │ [226,226,232] │
│             201 │ [176,176,176] │
│             102 │ [32,32,88]    │
└─────────────────┴───────────────┘

Положительный момент также заключается в том, что если резко изменится samplingRate на входе, то запрос поведет себя корректно: он учтет тот факт, что новые записи имеют другой вес.

Благодаря тому, что значения оценок колеблются вокруг единицы, такой подход уменьшает размер данных с 1 TB до 30 GB, что также положительно сказалось на выборках. Сканировать, как не крути, теперь нужно меньше.

Коротко о консьюмере

Консьюмер принимает данные из Kafka и пишет в ClickHouse. Пишем данные через GitHub — roistat/go-clickhouse: Golang ClickHouse connector пакетами по 100k, параллельными потоками. Потоков может быть до 12-ти, стартуют потоки не чаще, чем раз в секунду. Такие настройки выведены вручную опытным путем. Увеличение и уменьшение размера пакета негативно сказывались на скорости вставки. ¯_(ツ)_/¯ В самую крупную ноду пишем порядка 2kkk вставок за 24 часа, что примерно — 71kk в час. Визуализация из Datadog:

Production

Теория — это хорошо но, что мы имеем на проде.

Конфигурация сервера в Google Cloud:

  • n1-standard-16 (16 виртуальных ЦП, 60 ГБ памяти) — для менее нагруженных нод;
  • n1-standard-32 (32 виртуальных ЦП, 120 ГБ памяти) — для основных.

Занимаемое место:

SELECT
    table,
    formatReadableSize(SUM(bytes)) AS tableSize
FROM system.parts
WHERE active
GROUP BY table
ORDER BY tableSize DESC

┌─table────────────────┬─tableSize─┐
│ auction_data_teasers │ 36.19 GiB │
└──────────────────────┴───────────┘

Количество строк и хранимых ивентов в них:

SELECT
    COUNT(),
    SUM(auctions)
FROM auction_data_teasers

┌─────COUNT()─┬─SUM(auctions)─┐
│ 11533653287 │ 3883532374770 │
└─────────────┴───────────────┘

1 rows in set. Elapsed: 313.706 sec. Processed 11.53 billion rows, 92.27 GB (36.77 million rows/s., 294.13 MB/s.)

11kkk строк, которые хранят почти 3.8kkkk ивентов, c учетом семплирования.

Показатели выборки:

11 rows in set. Elapsed: 0.752 sec. Processed 4.51 million rows, 108.33 MB (6.00 million rows/s., 144.03 MB/s.)

Заключение

За 2 года использования ClickHouse на продакшине у нас были как стремительные взлеты, так и взлеты пониже. Но все же мы склоняемся к тому, чтобы использовать его активнее. ClickHouse — хороший инструмент для ряда задач.

И напоследок — несколько полезных ссылок:

Похожие статьи:
Начинаем подводить итоги года. Любой, кто следит за украинской новостной лентой, твердо знает, что вокруг все плохо. Но только...
Вітаю! Я Петро Грабовський, Software Engineer у компанії EPAM. Декілька років тому розробники переважно обговорювали доцільність...
У свіжому дайджесті DOU News обговорюємо, що буде з законом про підвищення податків. А ще — скорочення в компанії RIA.com,...
Міністерство юстиції продовжує роботу над відновленням державних реєстрів після кібератаки росії. Наразі працюють...
Ілля Пелих, Senior Java Developer у Ciklum, загинув під час захисту України від російських окупантів. 40-річний уродженець міста...
Яндекс.Метрика