diff --git a/aedes.js b/aedes.js index e47abd2f..d0bbf5ab 100644 --- a/aedes.js +++ b/aedes.js @@ -27,6 +27,7 @@ const defaultOptions = { authorizeForward: defaultAuthorizeForward, published: defaultPublished, trustProxy: false, + sharedTopics: false, trustedProxies: [], queueLimit: 42, maxClientsIdLength: 23 @@ -75,6 +76,8 @@ function Aedes (opts) { this.trustProxy = opts.trustProxy this.trustedProxies = opts.trustedProxies + this.sharedTopics = opts.sharedTopics + this.clients = {} this.brokers = {} diff --git a/lib/client.js b/lib/client.js index 6501e938..faaa7004 100644 --- a/lib/client.js +++ b/lib/client.js @@ -144,6 +144,27 @@ function Client (broker, conn, req) { } } + this.deliverShared = function (_packet, cb) { + const sharedSub = that.subscriptions[_packet.topic] + // this function will return the next client that should send the update based on round robin algorithm + // get first subscription $share// with lower lastUpdate + that.broker.persistence.nextSharedSubscription(_packet.topic, sharedSub.group, function (err, subscription) { + if (err) { + cb(new Error('Error while fetching shared subscription: ' + err.message)) + } else { + if (subscription.clientId === that.id) { // this client should send an update + that.deliverQoS(_packet, function (err) { + if (err) { + cb(new Error('Error while updating shared subscription: ' + err.message)) + } else { + that.broker.persistence.updateSharedSubscription(that, _packet.topic, sharedSub.group, cb) + } + }) + } + } + }) + } + this._keepaliveTimer = null this._keepaliveInterval = -1 diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 886eab1b..ee0335ae 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -3,7 +3,7 @@ const fastfall = require('fastfall') const Packet = require('aedes-packet') const through = require('through2') -const { validateTopic } = require('../utils') +const { validateTopic, sharedTopic } = require('../utils') const write = require('../write') const subscribeTopicActions = fastfall([ @@ -27,6 +27,12 @@ function Subscription (qos, func) { this.func = func } +function SharedSubscription (qos, func, group) { + this.qos = qos + this.func = func + this.group = group +} + function SubscribeState (client, packet, restore, finish, granted) { this.client = client this.packet = packet @@ -115,24 +121,45 @@ function addSubs (sub, done) { const client = this.client const broker = client.broker - const topic = sub.topic + var topic = sub.topic const qos = sub.qos var func = qos > 0 ? client.deliverQoS : client.deliver0 + var group = null // [MQTT-4.7.2-1] if (isStartsWithWildcard(topic)) { func = blockDollarSignTopics(func) } - if (!client.subscriptions[topic]) { - client.subscriptions[topic] = new Subscription(qos, func) - broker.subscribe(topic, func, done) - } else if (client.subscriptions[topic].qos !== qos) { - broker.unsubscribe(topic, client.subscriptions[topic].func) - client.subscriptions[topic] = new Subscription(qos, func) - broker.subscribe(topic, func, done) - } else { - done() + if (broker.sharedTopics) { + const parsedTopic = sharedTopic(topic) + if (parsedTopic) { + topic = parsedTopic.topic + func = client.deliverShared + group = parsedTopic.group + broker.persistence.addSharedSubscription(client, sub, function addSub (err) { + if (err) { + done(err) + } else { + finish() + } + }) + } else { + finish() + } + } else finish() + + function finish () { + if (!client.subscriptions[topic]) { + client.subscriptions[topic] = group ? new SharedSubscription(qos, func, group) : new Subscription(qos, func) + broker.subscribe(topic, func, done) + } else if (client.subscriptions[topic].qos !== qos) { + broker.unsubscribe(topic, client.subscriptions[topic].func) + client.subscriptions[topic] = group ? new SharedSubscription(qos, func, group) : new Subscription(qos, func) + broker.subscribe(topic, func, done) + } else { + done() + } } } diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 16571bd6..79f9674d 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -1,7 +1,7 @@ 'use strict' const write = require('../write') -const { validateTopic } = require('../utils') +const { validateTopic, sharedTopic } = require('../utils') function UnsubscribeState (client, packet, finish) { this.client = client @@ -46,17 +46,36 @@ function actualUnsubscribe (client, packet, done) { function doUnsubscribe (sub, done) { const client = this.client const broker = client.broker - const s = client.subscriptions[sub] - - if (s) { - var func = s.func - delete client.subscriptions[sub] - broker.unsubscribe( - sub, - func, - done) - } else { - done() + + if (broker.sharedTopics) { + const parsedTopic = sharedTopic(sub) + if (parsedTopic) { + sub = parsedTopic.topic + broker.persistence.removeSharedSubscription(client, sub, function removeSub (err) { + if (err) { + done(err) + } else { + finish() + } + }) + } else { + finish() + } + } else finish() + + function finish () { + const s = client.subscriptions[sub] + + if (s) { + var func = s.func + delete client.subscriptions[sub] + broker.unsubscribe( + sub, + func, + done) + } else { + done() + } } } diff --git a/lib/utils.js b/lib/utils.js index 9d3e5b28..80d879f9 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -26,6 +26,18 @@ function validateTopic (topic, message) { } } +function sharedTopic (topic) { + if (!topic || !topic.startsWith('$share/')) return null + + var group = topic.substring(7, topic.indexOf('/', 7)) + + return { + group: group, + topic: topic.substring(8 + group.length) + } +} + module.exports = { - validateTopic + validateTopic, + sharedTopic }