Skip to content

Commit 75a9fa6

Browse files
authored
feat: addition of callback persistence (#59)
* feature: add callBackPersistence * update examples and links in README.md * lint fix * remove dependency on multistream
1 parent 47bcbdc commit 75a9fa6

File tree

9 files changed

+345
-29
lines changed

9 files changed

+345
-29
lines changed

README.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ A persistence needs to pass all tests defined in
7373
in the following manner:
7474

7575
```js
76-
var test = require('node:test')
77-
var myperst = require('./')
78-
var abs = require('aedes-cached-persistence/abstract')
76+
const test = require('node:test')
77+
const myperst = require('./')
78+
const abs = require('aedes-cached-persistence/abstract')
7979

8080
abs({
8181
test: test,
@@ -87,10 +87,10 @@ If you require some async stuff before returning, a callback is also
8787
supported:
8888

8989
```js
90-
var test = require('node:test')
91-
var myperst = require('./')
92-
var abs = require('aedes-persistence/abstract')
93-
var clean = require('./clean') // invented module
90+
const test = require('node:test')
91+
const myperst = require('./')
92+
const abs = require('aedes-persistence/abstract')
93+
const clean = require('./clean') // invented module
9494

9595
abs({
9696
test: test,
@@ -107,5 +107,5 @@ abs({
107107

108108
MIT
109109

110-
[aedes]: http://npm.im/aedes
111-
[aedes-persistence]: http://npm.im/aedes-persistence
110+
[aedes]: http://npmjs.com/package/aedes
111+
[aedes-persistence]: http://npmjs.com/package/aedes-persistence

callBackPersistence.js

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
'use strict'
2+
3+
/* This module provides a callback layer for async persistence implementations */
4+
const { Readable } = require('node:stream')
5+
const CachedPersistence = require('./index.js')
6+
7+
function toValue (obj, prop) {
8+
if (typeof obj === 'object' && obj !== null && prop in obj) {
9+
return obj[prop]
10+
}
11+
return obj
12+
}
13+
14+
class CallBackPersistence extends CachedPersistence {
15+
constructor (asyncInstanceFactory, opts = {}) {
16+
super(opts)
17+
this.asyncPersistence = asyncInstanceFactory(opts)
18+
}
19+
20+
_setup () {
21+
if (this.ready) {
22+
return
23+
}
24+
this.asyncPersistence.broker = this.broker
25+
this.asyncPersistence._trie = this._trie
26+
this.asyncPersistence.setup()
27+
.then(() => {
28+
this.emit('ready')
29+
})
30+
.catch(err => {
31+
this.emit('error', err)
32+
})
33+
}
34+
35+
storeRetained (packet, cb) {
36+
if (!this.ready) {
37+
this.once('ready', this.storeRetained.bind(this, packet, cb))
38+
return
39+
}
40+
this.asyncPersistence.storeRetained(packet).then(() => {
41+
cb(null)
42+
}).catch(cb)
43+
}
44+
45+
createRetainedStream (pattern) {
46+
return Readable.from(this.asyncPersistence.createRetainedStream(pattern))
47+
}
48+
49+
createRetainedStreamCombi (patterns) {
50+
return Readable.from(this.asyncPersistence.createRetainedStreamCombi(patterns))
51+
}
52+
53+
addSubscriptions (client, subs, cb) {
54+
if (!this.ready) {
55+
this.once('ready', this.addSubscriptions.bind(this, client, subs, cb))
56+
return
57+
}
58+
59+
const addSubs1 = this.asyncPersistence.addSubscriptions(client, subs)
60+
// promisify
61+
const addSubs2 = new Promise((resolve, reject) => {
62+
this._addedSubscriptions(client, subs, (err) => {
63+
if (err) {
64+
reject(err)
65+
} else {
66+
resolve()
67+
}
68+
})
69+
})
70+
Promise.all([addSubs1, addSubs2])
71+
.then(() => cb(null, client))
72+
.catch(err => cb(err, client))
73+
}
74+
75+
removeSubscriptions (client, subs, cb) {
76+
if (!this.ready) {
77+
this.once('ready', this.removeSubscriptions.bind(this, client, subs, cb))
78+
return
79+
}
80+
81+
const remSubs1 = this.asyncPersistence.removeSubscriptions(client, subs)
82+
// promisify
83+
const mappedSubs = subs.map(sub => { return { topic: sub } })
84+
const remSubs2 = new Promise((resolve, reject) => {
85+
this._removedSubscriptions(client, mappedSubs, (err) => {
86+
if (err) {
87+
reject(err)
88+
} else {
89+
resolve()
90+
}
91+
})
92+
})
93+
Promise.all([remSubs1, remSubs2])
94+
.then(() => process.nextTick(cb, null, client))
95+
.catch(err => cb(err, client))
96+
}
97+
98+
subscriptionsByClient (client, cb) {
99+
if (!this.ready) {
100+
this.once('ready', this.subscriptionsByClient.bind(this, client, cb))
101+
return
102+
}
103+
104+
this.asyncPersistence.subscriptionsByClient(client)
105+
.then(results => {
106+
// promisified shim returns an object, true async only the resubs
107+
const resubs = results?.resubs || results
108+
process.nextTick(cb, null, resubs.length > 0 ? resubs : null, client)
109+
})
110+
.catch(cb)
111+
}
112+
113+
countOffline (cb) {
114+
this.asyncPersistence.countOffline()
115+
.then(res => process.nextTick(cb, null, res.subsCount, res.clientsCount))
116+
.catch(cb)
117+
}
118+
119+
destroy (cb = noop) {
120+
if (!this.ready) {
121+
this.once('ready', this.destroy.bind(this, cb))
122+
return
123+
}
124+
125+
if (this._destroyed) {
126+
throw new Error('destroyed called twice!')
127+
}
128+
129+
this._destroyed = true
130+
131+
this.asyncPersistence.destroy()
132+
.finally(cb) // swallow err in case of failure
133+
}
134+
135+
outgoingEnqueue (sub, packet, cb) {
136+
if (!this.ready) {
137+
this.once('ready', this.outgoingEnqueue.bind(this, sub, packet, cb))
138+
return
139+
}
140+
this.asyncPersistence.outgoingEnqueue(sub, packet)
141+
.then(() => process.nextTick(cb, null, packet))
142+
.catch(cb)
143+
}
144+
145+
outgoingEnqueueCombi (subs, packet, cb) {
146+
if (!this.ready) {
147+
this.once('ready', this.outgoingEnqueueCombi.bind(this, subs, packet, cb))
148+
return
149+
}
150+
this.asyncPersistence.outgoingEnqueueCombi(subs, packet)
151+
.then(() => process.nextTick(cb, null, packet))
152+
.catch(cb)
153+
}
154+
155+
outgoingStream (client) {
156+
return Readable.from(this.asyncPersistence.outgoingStream(client))
157+
}
158+
159+
outgoingUpdate (client, packet, cb) {
160+
if (!this.ready) {
161+
this.once('ready', this.outgoingUpdate.bind(this, client, packet, cb))
162+
return
163+
}
164+
this.asyncPersistence.outgoingUpdate(client, packet)
165+
.then(() => cb(null, client, packet))
166+
.catch(cb)
167+
}
168+
169+
outgoingClearMessageId (client, packet, cb) {
170+
if (!this.ready) {
171+
this.once('ready', this.outgoingClearMessageId.bind(this, client, packet, cb))
172+
return
173+
}
174+
this.asyncPersistence.outgoingClearMessageId(client, packet)
175+
.then((packet) => cb(null, packet))
176+
.catch(cb)
177+
}
178+
179+
incomingStorePacket (client, packet, cb) {
180+
if (!this.ready) {
181+
this.once('ready', this.incomingStorePacket.bind(this, client, packet, cb))
182+
return
183+
}
184+
this.asyncPersistence.incomingStorePacket(client, packet)
185+
.then(() => cb(null))
186+
.catch(cb)
187+
}
188+
189+
incomingGetPacket (client, packet, cb) {
190+
if (!this.ready) {
191+
this.once('ready', this.incomingGetPacket.bind(this, client, packet, cb))
192+
return
193+
}
194+
this.asyncPersistence.incomingGetPacket(client, packet)
195+
.then((packet) => cb(null, packet, client))
196+
.catch(cb)
197+
}
198+
199+
incomingDelPacket (client, packet, cb) {
200+
if (!this.ready) {
201+
this.once('ready', this.incomingDelPacket.bind(this, client, packet, cb))
202+
return
203+
}
204+
this.asyncPersistence.incomingDelPacket(client, packet)
205+
.then(() => cb(null))
206+
.catch(cb)
207+
}
208+
209+
putWill (client, packet, cb) {
210+
if (!this.ready) {
211+
this.once('ready', this.putWill.bind(this, client, packet, cb))
212+
return
213+
}
214+
this.asyncPersistence.putWill(client, packet)
215+
.then(() => cb(null, client))
216+
.catch(cb)
217+
}
218+
219+
getWill (client, cb) {
220+
this.asyncPersistence.getWill(client)
221+
.then((result) => {
222+
// promisified shim returns an object, true async only the resubs
223+
const packet = toValue(result, 'packet')
224+
cb(null, packet, client)
225+
})
226+
.catch(cb)
227+
}
228+
229+
delWill (client, cb) {
230+
this.asyncPersistence.delWill(client)
231+
.then(result => {
232+
// promisified shim returns an object, true async only the resubs
233+
const packet = toValue(result, 'packet')
234+
cb(null, packet, client)
235+
})
236+
.catch(cb)
237+
}
238+
239+
streamWill (brokers) {
240+
return Readable.from(this.asyncPersistence.streamWill(brokers))
241+
}
242+
243+
getClientList (topic) {
244+
return Readable.from(this.asyncPersistence.getClientList(topic))
245+
}
246+
}
247+
248+
function noop () {}
249+
250+
module.exports = { CallBackPersistence }

index.js

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const QlobberSub = require('qlobber/aedes/qlobber-sub')
22
const { Packet } = require('aedes-persistence')
3-
const MultiStream = require('multistream')
3+
const { Readable } = require('node:stream')
44
const parallel = require('fastparallel')
55
const { EventEmitter } = require('node:events')
66
const QlobberOpts = {
@@ -13,6 +13,14 @@ const newSubTopic = '$SYS/sub/add'
1313
const rmSubTopic = '$SYS/sub/rm'
1414
const subTopic = '$SYS/sub/+'
1515

16+
async function * multiStream (streams) {
17+
for (const stream of streams) {
18+
for await (const chunk of stream) {
19+
yield chunk
20+
}
21+
}
22+
}
23+
1624
class CachedPersistence extends EventEmitter {
1725
constructor (opts) {
1826
super()
@@ -167,10 +175,8 @@ class CachedPersistence extends EventEmitter {
167175
}
168176

169177
createRetainedStreamCombi (patterns) {
170-
const streams = patterns.map((p) => {
171-
return this.createRetainedStream(p)
172-
})
173-
return MultiStream.obj(streams)
178+
const streams = patterns.map(p => this.createRetainedStream(p))
179+
return Readable.from(multiStream(streams), { objectMode: true })
174180
}
175181

176182
destroy (cb) {

package.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
"scripts": {
88
"lint": "eslint",
99
"lint:fix": "eslint --fix",
10-
"unit": "node --test test.js",
10+
"unit": "node --test test/*.js",
1111
"test": "npm run lint && npm run unit && tsd",
1212
"test:typescript": "tsd",
13-
"coverage": "c8 --reporter=lcov node --test test.js",
13+
"coverage": "c8 --reporter=lcov npm run unit",
14+
"coverage:report": "c8 report",
1415
"test:ci": "npm run lint && npm run coverage && npm run test:typescript",
1516
"license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause'",
1617
"release": "read -p 'GITHUB_TOKEN: ' GITHUB_TOKEN && export GITHUB_TOKEN=$GITHUB_TOKEN && release-it --disable-metrics"
@@ -63,9 +64,8 @@
6364
"tsd": "^0.32.0"
6465
},
6566
"dependencies": {
66-
"aedes-persistence": "^10.0.2",
67+
"aedes-persistence": "^10.1.0",
6768
"fastparallel": "^2.4.1",
68-
"multistream": "^4.1.0",
6969
"qlobber": "^8.0.1"
7070
}
7171
}

promisified.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
const parent = require('aedes-persistence/promisified')
2+
3+
module.exports = parent

test.js renamed to test/helpers/testPersistence.js

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
const test = require('node:test')
2-
const CachedPersistence = require('./')
3-
const Memory = require('aedes-persistence')
4-
const abs = require('./abstract')
1+
'use strict'
2+
const CachedPersistence = require('../..')
53

6-
class MyPersistence extends CachedPersistence {
4+
class TestPersistence extends CachedPersistence {
75
constructor (opts) {
86
super(opts)
97
this.backend = opts.backend
@@ -19,6 +17,7 @@ class MyPersistence extends CachedPersistence {
1917
for (const key of methods) {
2018
this[key] = this.backend[key].bind(this.backend)
2119
}
20+
2221
// putWill is a special because it needs this.broker.id
2322
this.putWill = (client, packet, cb) => {
2423
this.backend.broker = this.broker
@@ -48,9 +47,4 @@ class MyPersistence extends CachedPersistence {
4847
}
4948
}
5049

51-
const persistence = () => new MyPersistence({ backend: Memory() })
52-
53-
abs({
54-
test,
55-
persistence
56-
})
50+
module.exports = { TestPersistence }

0 commit comments

Comments
 (0)