RabbitMQ
Движок работает с RabbitMQ.
RabbitMQ позволяет:
- Публиковать/подписываться на потоки данных.
- Обрабатывать потоки по мере их появления.
Создание таблицы
Обязательные параметры:
rabbitmq_host_port– адрес сервера (хост:порт). Например:localhost:5672.rabbitmq_exchange_name– имя точки обмена в RabbitMQ.rabbitmq_format– формат сообщения. Используется такое же обозначение, как и в функцииFORMATв SQL, например,JSONEachRow. Подробнее см. в разделе Форматы входных и выходных данных.
Дополнительные параметры:
rabbitmq_exchange_type– тип точки обмена в RabbitMQ:direct,fanout,topic,headers,consistent_hash. По умолчанию:fanout.rabbitmq_routing_key_list– список ключей маршрутизации, через запятую.rabbitmq_row_delimiter– символ-разделитель, который завершает сообщение.rabbitmq_schema– опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Cap’n Proto требует путь к файлу со схемой и название корневого объектаschema.capnp:Message.rabbitmq_num_consumers– количество потребителей на таблицу. По умолчанию:1. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна.rabbitmq_num_queues– количество очередей. По умолчанию:1. Большее число очередей может сильно увеличить пропускную способность.rabbitmq_queue_base- настройка для имен очередей. Сценарии использования описаны ниже.rabbitmq_persistent- флаг, от которого зависит настройка 'durable' для сообщений при запросахINSERT. По умолчанию:0.rabbitmq_skip_broken_messages– максимальное количество некорректных сообщений в блоке. Еслиrabbitmq_skip_broken_messages = N, то движок отбрасываетNсообщений, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0.rabbitmq_max_block_sizerabbitmq_flush_interval_msrabbitmq_queue_settings_list- позволяет самостоятельно установить настройки RabbitMQ при создании очереди. Доступные настройки:x-max-length,x-max-length-bytes,x-message-ttl,x-expires,x-priority,x-max-priority,x-overflow,x-dead-letter-exchange,x-queue-type. Настрокаdurableдля очереди ставится автоматически.
Настройки форматов данных также могут быть добавлены в списке RabbitMQ настроек.
Example:
Конфигурация сервера RabbitMQ добавляется с помощью конфигурационного файла ClickHouse.
Требуемая конфигурация:
Дополнительная конфигурация:
Описание
Запрос SELECT не очень полезен для чтения сообщений (за исключением отладки), поскольку каждое сообщение может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого:
- Создайте потребителя RabbitMQ с помощью движка и рассматривайте его как поток данных.
- Создайте таблицу с необходимой структурой.
- Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу.
Когда к движку присоединяется материализованное представление, оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от RabbitMQ и преобразовывать их в необходимый формат с помощью SELECT.
У одной таблицы RabbitMQ может быть неограниченное количество материализованных представлений.
Данные передаются с помощью параметров rabbitmq_exchange_type и rabbitmq_routing_key_list.
Может быть не более одной точки обмена на таблицу. Одна точка обмена может использоваться несколькими таблицами: это позволяет выполнять маршрутизацию по нескольким таблицам одновременно.
Параметры точек обмена:
direct- маршрутизация основана на точном совпадении ключей. Пример списка ключей:key1,key2,key3,key4,key5. Ключ сообщения может совпадать с одним из них.fanout- маршрутизация по всем таблицам, где имя точки обмена совпадает, независимо от ключей.topic- маршрутизация основана на правилах с ключами, разделенными точками. Например:*.logs,records.*.*.2020,*.2018,*.2019,*.2020.headers- маршрутизация основана на совпаденииkey=valueс настройкойx-match=allилиx-match=any. Пример списка ключей таблицы:x-match=all,format=logs,type=report,year=2020.consistent_hash- данные равномерно распределяются между всеми связанными таблицами, где имя точки обмена совпадает. Обратите внимание, что этот тип обмена должен быть включен с помощью плагина RabbitMQ:rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
Настройка rabbitmq_queue_base может быть использована в следующих случаях:
- чтобы восстановить чтение из ранее созданных очередей, если оно прекратилось по какой-либо причине, но очереди остались непустыми. Для восстановления чтения из одной конкретной очереди, нужно написать ее имя в
rabbitmq_queue_baseнастройку и не указывать настройкиrabbitmq_num_consumersиrabbitmq_num_queues. Чтобы восстановить чтение из всех очередей, которые были созданы для конкретной таблицы, необходимо совпадение следующих настроек:rabbitmq_queue_base,rabbitmq_num_consumers,rabbitmq_num_queues. По умолчанию, если настройкаrabbitmq_queue_baseне указана, будут использованы уникальные для каждой таблицы имена очередей. - чтобы объявить одни и те же очереди для разных таблиц, что позволяет создавать несколько параллельных подписчиков на каждую из очередей. То есть обеспечивается лучшая производительность. В данном случае, для таких таблиц также необходимо совпадение настроек:
rabbitmq_num_consumers,rabbitmq_num_queues. - чтобы повторно использовать созданные c
durableнастройкой очереди, так как они не удаляются автоматически (но могут быть удалены с помощью любого RabbitMQ CLI).
Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.
Если параметрыrabbitmq_num_consumers и/или rabbitmq_num_queues заданы вместе с параметром rabbitmq_exchange_type:
- плагин
rabbitmq-consistent-hash-exchangeдолжен быть включен. - свойство
message_idдолжно быть определено (уникальное для каждого сообщения/пакета).
При запросах INSERT отправляемым сообщениям добавляются метаданные: messageID и флаг republished - доступны через заголовки сообщений (headers).
Для запросов чтения и вставки не должна использоваться одна и та же таблица.
Пример:
Virtual Columns
_exchange_name- имя точки обмена RabbitMQ._channel_id- идентификатор каналаChannelID, на котором было получено сообщение._delivery_tag- значениеDeliveryTagполученного сообщения. Уникально в рамках одного канала._redelivered- флагredelivered. (Не равно нулю, если есть возможность, что сообщение было получено более, чем одним каналом.)_message_id- значение поляmessageIDполученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения._timestamp- значение поляtimestampполученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.