Skip to content

Commit f6a09bf

Browse files
authored
feat!: migrate to async persistence (#1034)
This PR migrates Aedes to use the async persistence interface. It contains the following sub items: - [X] replace all callback calls to persistence by `.then()` calls - [X] move all side effects (setting up persistence, attaching event handlers etc) to a `listen()` method of Aedes - [X] update `createBroker()` to be async and make it await `listen()` as well - [X] update exports so that they produce a warning if people use old style calling - [X] update Typescript typing - [X] update documentation including examples - [X] add a `migration.md` to help users migrate. - [X] do some benchmark tests to see performances are kept - [X] update node engines version to >=20 - [X] updates all dependencies to current except for Tap and @sinonjs/fake-timers BREAKING CHANGES: See [docs](https://github.com/moscajs/aedes/blob/main/docs/MIGRATION.md)
1 parent 04e391f commit f6a09bf

37 files changed

+5967
-5555
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ Check the [Docker docs](https://github.com/moscajs/aedes-cli#docker)
5656

5757
## API
5858

59-
- [Aedes object](./docs/Aedes.md)
60-
- [Client object](./docs/Client.md)
59+
- [Aedes class](./docs/Aedes.md)
60+
- [Client class](./docs/Client.md)
6161

6262
## Features
6363

aedes.d.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,3 @@
1-
import Aedes, { AedesOptions } from './types/instance'
2-
3-
export declare function createBroker (options?: AedesOptions): Aedes
4-
51
export * from './types/instance'
62
export * from './types/packet'
73
export * from './types/client'
8-
9-
export { default } from './types/instance'

aedes.js

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class Aedes extends EventEmitter {
3838
const that = this
3939

4040
opts = Object.assign({}, defaultOptions, opts)
41+
this.opts = opts
4142
this.id = opts.id || uuidv4()
4243
// +1 when construct a new aedes-packet
4344
// internal track for last brokerCounter
@@ -56,8 +57,7 @@ class Aedes extends EventEmitter {
5657
// return, just to please standard
5758
return new Client(that, conn, req)
5859
}
59-
this.persistence = opts.persistence || memory()
60-
this.persistence.broker = this
60+
6161
this._parallel = parallel()
6262
this._series = series()
6363
this._enqueuers = reusify(DoEnqueues)
@@ -75,6 +75,22 @@ class Aedes extends EventEmitter {
7575

7676
this.clients = {}
7777
this.brokers = {}
78+
this.closed = true
79+
}
80+
81+
async listen () {
82+
const opts = this.opts
83+
const that = this
84+
85+
// metadata
86+
this.connectedClients = 0
87+
this.closed = false
88+
89+
this.persistence = opts.persistence || memory()
90+
if (this.persistence.setup.constructor.name !== 'AsyncFunction') {
91+
throw new Error('persistence.setup() must be an async function')
92+
}
93+
await this.persistence.setup(this)
7894

7995
const heartbeatTopic = $SYS_PREFIX + that.id + '/heartbeat'
8096
const birthTopic = $SYS_PREFIX + that.id + '/birth'
@@ -137,7 +153,7 @@ class Aedes extends EventEmitter {
137153
that.persistence.delWill({
138154
id: will.clientId,
139155
brokerId: will.brokerId
140-
}, done)
156+
}).then(will => done(undefined, will), done)
141157
}
142158
})
143159
}
@@ -176,18 +192,16 @@ class Aedes extends EventEmitter {
176192
done()
177193
}
178194
})
179-
180-
// metadata
181-
this.connectedClients = 0
182-
this.closed = false
183195
}
184196

185197
get version () {
186198
return version
187199
}
188200

189-
static createBroker (opts) {
190-
return new Aedes(opts)
201+
static async createBroker (opts) {
202+
const aedes = new Aedes(opts)
203+
await aedes.listen()
204+
return aedes
191205
}
192206

193207
publish (packet, client, done) {
@@ -263,7 +277,8 @@ class Aedes extends EventEmitter {
263277

264278
function storeRetained (packet, done) {
265279
if (packet.retain) {
266-
this.broker.persistence.storeRetained(packet, done)
280+
this.broker.persistence.storeRetained(packet)
281+
.then(() => done(null), done)
267282
} else {
268283
done()
269284
}
@@ -281,10 +296,8 @@ function enqueueOffline (packet, done) {
281296
enqueuer.packet = packet
282297
enqueuer.topic = packet.topic
283298
enqueuer.broker = this.broker
284-
this.broker.persistence.subscriptionsByTopic(
285-
packet.topic,
286-
enqueuer.done
287-
)
299+
this.broker.persistence.subscriptionsByTopic(packet.topic)
300+
.then(subs => enqueuer.done(null, subs), enqueuer.done)
288301
}
289302

290303
class DoEnqueues {
@@ -319,7 +332,8 @@ class DoEnqueues {
319332
that.complete = null
320333
that.topic = null
321334

322-
broker.persistence.outgoingEnqueueCombi(subs, packet, complete)
335+
broker.persistence.outgoingEnqueueCombi(subs, packet)
336+
.then(() => complete(null), complete)
323337
broker._enqueuers.release(that)
324338
}
325339
}
@@ -390,6 +404,14 @@ class PublishState {
390404

391405
function noop () {}
392406

393-
module.exports = Aedes.createBroker
407+
function warnMigrate () {
408+
throw new Error(
409+
` Aedes default export has been removed.
410+
Use 'const aedes = await Aedes.createBroker()' instead.
411+
See: https://github.com/moscajs/aedes/docs/MIGRATION.MD
412+
`)
413+
}
414+
415+
module.exports = warnMigrate
394416
module.exports.createBroker = Aedes.createBroker
395417
module.exports.Aedes = Aedes

benchmarks/server.js

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,27 @@
22

33
// To be used with cpuprofilify http://npm.im/cpuprofilify
44

5-
const aedes = require('../')()
6-
const server = require('net').createServer(aedes.handle)
7-
const port = 1883
5+
const { Aedes } = require('../')
86

9-
server.listen(port, function () {
10-
console.error('server listening on port', port, 'pid', process.pid)
11-
})
7+
Aedes.createBroker().then(aedes => {
8+
const server = require('net').createServer(aedes.handle)
9+
const port = 1883
1210

13-
aedes.on('clientError', function (client, err) {
14-
console.error('client error', client.id, err.message)
15-
})
11+
server.listen(port, function () {
12+
console.error('server listening on port', port, 'pid', process.pid)
13+
})
1614

17-
// Cleanly shut down process on SIGTERM to ensure that perf-<pid>.map gets flushed
18-
process.on('SIGINT', onSIGINT)
15+
aedes.on('clientError', function (client, err) {
16+
console.error('client error', client.id, err.message)
17+
})
1918

20-
function onSIGINT () {
19+
// Cleanly shut down process on SIGTERM to ensure that perf-<pid>.map gets flushed
20+
process.on('SIGINT', onSIGINT)
21+
22+
function onSIGINT () {
2123
// IMPORTANT to log on stderr, to not clutter stdout which is purely for data, i.e. dtrace stacks
22-
console.error('Caught SIGTERM, shutting down.')
23-
server.close()
24-
process.exit(0)
25-
}
24+
console.error('Caught SIGTERM, shutting down.')
25+
server.close()
26+
process.exit(0)
27+
}
28+
})

docs/Aedes.md

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
# Aedes
33

44
- [Aedes](#aedes)
5-
- [new Aedes([options]) / new Aedes.Server([options])](#new-aedesoptions--new-aedesserveroptions)
5+
- [new Aedes([options])](#new-aedesoptions)
6+
- [Aedes.createBroker([options])](#aedescreatebrokeroptions)
7+
- [aedes.listen()](#aedeslisten)
68
- [aedes.id](#aedesid)
79
- [aedes.connectedClients](#aedesconnectedclients)
810
- [aedes.closed](#aedesclosed)
@@ -31,12 +33,13 @@
3133
- [Handler: authorizeForward (client, packet)](#handler-authorizeforward-client-packet)
3234
- [Handler: published (packet, client, callback)](#handler-published-packet-client-callback)
3335

34-
## new Aedes([options]) / new Aedes.Server([options])
36+
## new Aedes([options])
3537

3638
- options `<object>`
3739
- `mq` [`<MQEmitter>`](../README.md#mqemitter) middleware used to deliver messages to subscribed clients. In a cluster environment it is used also to share messages between brokers instances. __Default__: `mqemitter`
3840
- `concurrency` `<number>` maximum number of concurrent messages delivered by `mq`. __Default__: `100`
3941
- `persistence` [`<Persistence>`](../README.md#persistence) middleware that stores _QoS > 0, retained, will_ packets and _subscriptions_. __Default__: `aedes-persistence` (_in memory_)
42+
Versions 1.x and above require persistence to support async access,see [MIGRATION.md][MIGRATION] for details.
4043
- `queueLimit` `<number>` maximum number of queued messages before client session is established. If number of queued items exceeds, `connectionError` throws an error `Client queue limit reached`. __Default__: `42`
4144
- `maxClientsIdLength` option to override MQTT 3.1.0 clients Id length limit. __Default__: `23`
4245
- `heartbeatInterval` `<number>` an interval in millisconds at which server beats its health signal in `$SYS/<aedes.id>/heartbeat` topic. __Default__: `60000`
@@ -45,9 +48,36 @@
4548
- `keepaliveLimit` `<number>` maximum client keep alive time allowed, 0 means no limit. __Default__: `0`
4649
- Returns `<Aedes>`
4750

48-
Create a new Aedes server.
51+
Create a new Aedes server instance.
4952

50-
Aedes is the class and function exposed by this module. It can be created by `Aedes()` or using `new Aedes()`. An variant `aedes.Server` is for TypeScript or ES modules.
53+
Aedes is the class exported by this module.
54+
The instance will only start listening after [aedes.listen()](#aedeslisten) is called.
55+
The recommended way to start an Aedes server is to use [Aedes.createBroker([options])](#aedescreatebrokeroptions) instead.
56+
57+
## Aedes.createBroker([options])
58+
59+
An async static method in the Aedes class which creates the instance and automatically awaits `listen()`.
60+
61+
Using `Aedes.createBroker([options])` is the recommended way to start Aedes, example:
62+
63+
```js
64+
const aedes = await Aedes.createBroker([options])
65+
```
66+
67+
It uses the same options as [new Aedes([options])](#new-aedesoptions)
68+
69+
## aedes.listen()
70+
71+
Async method to make the aedes instance start listening.
72+
Example:
73+
74+
```js
75+
const aedes = new Aedes([options])
76+
await aedes.listen()
77+
```
78+
79+
You should typically not need to use this as it is more compact to use
80+
[Aedes.createBroker([options])](#aedescreatebrokeroptions) instead.
5181

5282
## aedes.id
5383

@@ -173,7 +203,9 @@ Emitted when server is closed.
173203
A connection listener that pipe stream to aedes.
174204

175205
```js
176-
const aedes = require('./aedes')()
206+
const { Aedes } = require('/aedes')
207+
const aedes = await Aedes.createBroker()
208+
177209
const server = require('net').createServer(aedes.handle)
178210
```
179211

@@ -414,3 +446,4 @@ same as [`Event: publish`](#event-publish), but provides a backpressure function
414446
[PINGREQ]: https://github.com/mqttjs/mqtt-packet#pingreq
415447
[PUBLISH]: https://github.com/mqttjs/mqtt-packet#publish
416448
[PUBREL]: https://github.com/mqttjs/mqtt-packet#pubrel
449+
[MIGRATION]: MIGRATION.md

docs/Examples.md

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,13 @@
44
## Simple plain MQTT server
55

66
```js
7-
const aedes = require('aedes')()
8-
const server = require('net').createServer(aedes.handle)
7+
8+
const { Aedes } = require('aedes')
9+
const { createServer } = require('net')
10+
11+
const aedes = await Aedes.createBroker()
912
const port = 1883
13+
const server = createServer(aedes.handle)
1014

1115
server.listen(port, function () {
1216
console.log('server started and listening on port ', port)
@@ -21,7 +25,7 @@ import { createServer } from 'net'
2125

2226
const port = 1883
2327

24-
const aedes = new Aedes()
28+
const aedes = await Aedes.createBroker()
2529
const server = createServer(aedes.handle)
2630

2731
server.listen(port, function () {
@@ -32,10 +36,11 @@ server.listen(port, function () {
3236
## Simple plain MQTT server using server-factory
3337

3438
```js
35-
const aedes = require('aedes')()
39+
const { Aedes } = require('aedes')
3640
const { createServer } = require('aedes-server-factory')
3741
const port = 1883
3842

43+
const aedes = await Aedes.createBroker()
3944
const server = createServer(aedes)
4045

4146
server.listen(port, function () {
@@ -47,14 +52,14 @@ server.listen(port, function () {
4752

4853
```js
4954
const fs = require('fs')
50-
const aedes = require('aedes')()
55+
const { Aedes } = require('aedes')
5156
const port = 8883
5257

5358
const options = {
5459
key: fs.readFileSync('YOUR_PRIVATE_KEY_FILE.pem'),
5560
cert: fs.readFileSync('YOUR_PUBLIC_CERT_FILE.pem')
5661
}
57-
62+
const aedes = await Aedes.createBroker()
5863
const server = require('tls').createServer(options, aedes.handle)
5964

6065
server.listen(port, function () {
@@ -65,11 +70,13 @@ server.listen(port, function () {
6570
## MQTT server over WebSocket
6671

6772
```js
68-
const aedes = require('aedes')()
69-
const httpServer = require('http').createServer()
73+
const { Aedes } = require('aedes')
74+
const { createServer } require('http')
7075
const ws = require('ws')
7176
const port = 8888
7277

78+
const aedes = await Aedes.createBroker()
79+
const httpServer = createServer()
7380
const wss = new ws.WebSocketServer({
7481
server:httpServer
7582
})
@@ -87,10 +94,11 @@ httpServer.listen(port, function () {
8794
## MQTT server over WebSocket using server-factory
8895

8996
```js
90-
const aedes = require('aedes')()
97+
const { Aedes } = require('aedes')
9198
const { createServer } = require('aedes-server-factory')
9299
const port = 8888
93100

101+
const aedes = await Aedes.createBroker()
94102
const httpServer = createServer(aedes, { ws: true })
95103

96104
httpServer.listen(port, function () {

0 commit comments

Comments
 (0)