Skip to content

Commit 9bfc913

Browse files
uladzimir-dankoUladzimir Danko
authored andcommitted
fix: memory leak in connected clients counter (moscajs#979)
Co-authored-by: Uladzimir Danko <uladzimirdanko@uladzimirsmbp14.home>
1 parent 90d35cd commit 9bfc913

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

aedes.js

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ function Aedes (opts) {
172172
if (that.clients[clientId] && serverId !== that.id) {
173173
if (that.clients[clientId].closed) {
174174
// remove the client from the list if it is already closed
175-
delete that.clients[clientId]
175+
that.deleteClient(clientId)
176176
done()
177177
} else {
178178
that.clients[clientId].close(done)
@@ -316,15 +316,19 @@ Aedes.prototype._finishRegisterClient = function (client) {
316316
}
317317

318318
Aedes.prototype.unregisterClient = function (client) {
319-
this.connectedClients--
320-
delete this.clients[client.id]
319+
this.deleteClient(client.id)
321320
this.emit('clientDisconnect', client)
322321
this.publish({
323322
topic: $SYS_PREFIX + this.id + '/disconnect/clients',
324323
payload: Buffer.from(client.id, 'utf8')
325324
}, noop)
326325
}
327326

327+
Aedes.prototype.deleteClient = function (clientId) {
328+
this.connectedClients--
329+
delete this.clients[clientId]
330+
}
331+
328332
function closeClient (client, cb) {
329333
this.clients[client].close(cb)
330334
}

lib/handlers/subscribe.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ function addSubs (sub, done) {
155155
func = blockDollarSignTopics(func)
156156
}
157157

158+
if (client.closed || client.broker.closed) {
159+
// a hack, sometimes client.close() or broker.close() happened
160+
// before authenticate() comes back
161+
// we don't continue subscription here
162+
return
163+
}
164+
158165
if (!client.subscriptions[topic]) {
159166
client.subscriptions[topic] = new Subscription(qos, func, rh, rap, nl)
160167
broker.subscribe(topic, func, done)

test/events.js

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,18 +223,20 @@ test('Test backpressure aedes published function', function (t) {
223223
})
224224

225225
test('clear closed clients when the same clientId is managed by another broker', function (t) {
226-
t.plan(1)
226+
t.plan(2)
227227

228228
const clientId = 'closed-client'
229-
const broker = aedes()
229+
const aedesBroker = aedes()
230230

231231
// simulate a closed client on the broker
232-
broker.clients[clientId] = { closed: true }
232+
aedesBroker.clients[clientId] = { closed: true, broker: aedesBroker }
233+
aedesBroker.connectedClients = 1
233234

234235
// simulate the creation of the same client on another broker of the cluster
235-
broker.publish({ topic: '$SYS/anotherbroker/new/clients', payload: clientId }, () => {
236-
t.equal(broker.clients[clientId], undefined) // check that the closed client was removed
236+
aedesBroker.publish({ topic: '$SYS/anotherbroker/new/clients', payload: clientId }, () => {
237+
t.equal(aedesBroker.clients[clientId], undefined) // check that the closed client was removed
238+
t.equal(aedesBroker.connectedClients, 0)
237239
})
238240

239-
t.teardown(broker.close.bind(broker))
241+
t.teardown(aedesBroker.close.bind(aedesBroker))
240242
})

0 commit comments

Comments
 (0)