Описание подписок на основе селекторов
Предпосылка
Усложнение критериев выборки интересующих подписчика событий, например в задачах трекинга необходима подписка по событиям внутри полигона.
Подписки на селекторах должны обеспечить высокую степень управляемости очередей сообщений обусловленную использованием специальных объектов для конфигурирования очереди, а не компонентов составного имени темы.
Термины
Селектор - объект проверяющий сообщение и возможно его контекст на соответствие некоторому условию, которое и является сутью селектора. Простые селекторы могут проверять например равенство некоторого поля сообщения заданному значению, другие - составные, содержать другие селекторы и производить над ними операции AND, OR, NOT. То есть компоновать другие селекторы в дерево булевого выражения.
Сообщение - объект описывающий изменение параметра базы данных
Контекст - объект или свойство объекта не входящий в объект-сообщение но связанный с ним.
Текущие дата и время, пользователь отправивший изменения, объект сервера, связи объекта - всё это примеры контекстов.
Для эффективной работы с контекстами нужна модель в памяти firefly, другие более просты, например текущие дата и время.
Мутатор - объект трансформирующий сообщение, добавляющий или удаляющий поля в соответствии с некоторым критерием или безусловно. Концепция мутаторов припасена на будущее, в запланированной реализации скорее всего найдёт ограниченное применение. Но вообще, конвейерная обработка сообщений на одном или разных узлах сети firefly открывает новые возможности постепенного добавления в сообщение новых данных и гибкого перераспределения информационных потоков.
Как это работает:
- Обходим текущие темы сообщений;
- Каждой теме соответствует единственный, возможно композитный селектор;
- Каждое сообщение транзакции проходит проверку на соответствие критериями селектора текущей
очереди и в случае успеха добавляется в массив транзакции для текущей темы;
- Когда транзакции для каждой темы сформированы производится отправка данных (publish);
Управление темами
- com.integra.open_topic(selector) где selector это сериализованный в JSON селектор.
- Возвращает имя подписки соответствующей переданному селектору при успехе.
- Имя содержит UUID что гарантирует её недоступность для других пользователей.
- com.integra.close_topic(topic_name) где topic_name это имя подписки которое вернулось при открытии темы.
- Возвращает имя подписки при успехе.
- Закрыть подписку может только владелец.
Автоматическое закрытие тем
Если на тему никто не подписался то она закроется через минуту.
Если у подписки были подписчики, а потом все отписались то она закроется автоматически не позже чем через query_timout секунд.
Наименование типа | Композитный | Параметры |
PropertyEqualitySelector | нет | name - имя свойства
value - значение свойства |
AndCompositeSelector | да | массив вложенных селекторов |
PositionInPolygonSelector | нет | polygon_text - текстовое представление полигона(как в PostgreSQL) |
ValueInListSelector | нет | name - имя свойства-массива
value - значение |
Пример селектора
1 { 2 "$type": "AndCompositeSelector", 3 "selectors": [ 4 { 5 "$type": "PropertyEqualitySelector", 6 "name": "name", 7 "value": "ip" 8 }, 9 { 10 "$type": "PropertyEqualitySelector", 11 "name": "value", 12 "value": "127.0.0.1" 13 }, 14 { 15 "$type": "PositionInPolygonSelector", 16 "polygon_text": "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))" 17 }, 18 { 19 "$type": "ValueInListSelector", 20 "name": "users", 21 "value": 5 22 } 23 ] 24 }
Пример транзакции из одного изменения проходящего проверку селектором из примера
1 [ 2 { 3 "server": "76606d62-a514-11e4-8c37-a7064178f246", 4 "txid": 843037, 5 "version": 51425717, 6 "id": "883c24e2-02cb-4988-8e07-3c8bd24654da", 7 "position": "POINT (0.1 0.2)", 8 "name": "ip", 9 "value": "127.0.0.1", 10 "datetime": "2015-09-02 06:24:06.844", 11 "types": [ 12 "Item", 13 "Computer", 14 "Object", 15 "DuoEthernetDevice", 16 "EthernetDevice", 17 "PowerConsumer", 18 "MssControlable", 19 "SpatialObject", 20 "BaseObject" 21 ], 22 "dict": { "a": "b" }, 23 "users": [ 24 null, 25 13, 26 5 27 ] 28 } 29 ]
Пример работы с подписками на селекторах:
1. Подписались на изменения загрузки процессора:
- 1 firefly.WAMP.p_com_integra_open_topic({"$type": "PropertyEqualitySelector","name": "name","value": "cpu_usage"}).
- 2 then(function(res) { console.log(res); }, function(res) { console.log(res); });
2. Нам вернётся имя темы на которую надо подписаться:
- 1 firefly.session.subscribe('com.integra.selector_subscriptions.027004e8aed04834bf45bcda18ea2ccf', function(res) { console.log('test sub', res); } ).
- 2 then(function(res) { console.log(res); }, function(res) { console.log(res); })
3. Поменяем что нибудь подходящее в базе. Нужно помнить, что изменения из базы больше не вычитываются и для тестов надо использовать apply_changes:
- 1 firefly.WAMP.p_com_integra_apply_changes('76606d62-a514-11e4-8c37-a7064178f246', [{"id": "4b618576-ed74-413a-919d-a3e127cbe98f","name": "cpu_usage","value" : 77,"datetime": null,"dict": null}]).
- 2 then(function(r) { console.log(r); },function(e) { console.log(e); })
4. В консоли должен появиться вывод test sub и далее данные.