diff --git a/abstract.js b/abstract.js index 4fe72a4..c848887 100644 --- a/abstract.js +++ b/abstract.js @@ -1,162 +1,374 @@ -const assert = require('node:assert/strict') -const looseAssert = require('node:assert') const { Readable } = require('node:stream') const Packet = require('aedes-packet') -function abstractPersistence (opts) { - const test = opts.test - let _persistence = opts.persistence - const waitForReady = opts.waitForReady +// promisified versions of the instance methods +// to avoid deep callbacks while testing +function storeRetained (instance, packet) { + return new Promise((resolve, reject) => { + instance.storeRetained(packet, err => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) +} - // requiring it here so it will not error for modules - // not using the default emitter - const buildEmitter = opts.buildEmitter || require('mqemitter') +async function addSubscriptions (instance, client, subs) { + return new Promise((resolve, reject) => { + instance.addSubscriptions(client, subs, (err, reClient) => { + if (err) { + reject(err) + } else { + resolve(reClient) + } + }) + }) +} - if (_persistence.length === 0) { - _persistence = function asyncify (cb) { - cb(null, opts.persistence()) - } - } +async function removeSubscriptions (instance, client, subs) { + return new Promise((resolve, reject) => { + instance.removeSubscriptions(client, subs, (err, reClient) => { + if (err) { + reject(err) + } else { + resolve(reClient) + } + }) + }) +} - function persistence (cb) { - const mq = buildEmitter() - const broker = { - id: 'broker-42', - mq, - publish: mq.emit.bind(mq), - subscribe: mq.on.bind(mq), - unsubscribe: mq.removeListener.bind(mq), - counter: 0 - } +async function subscriptionsByClient (instance, client) { + return new Promise((resolve, reject) => { + instance.subscriptionsByClient(client, (err, resubs, reClient) => { + if (err) { + reject(err) + } else { + resolve({ resubs, reClient }) + } + }) + }) +} - _persistence((err, instance) => { - if (instance) { - // Wait for ready event, if applicable, to ensure the persistence isn't - // destroyed while it's still being set up. - // https://github.com/mcollina/aedes-persistence-redis/issues/41 - if (waitForReady) { - // We have to listen to 'ready' before setting broker because that - // can result in 'ready' being emitted. - instance.on('ready', () => { - instance.removeListener('error', cb) - cb(null, instance) - }) - instance.on('error', cb) - } - instance.broker = broker - if (waitForReady) { - // 'ready' event will call back. - return - } +async function subscriptionsByTopic (instance, topic) { + return new Promise((resolve, reject) => { + instance.subscriptionsByTopic(topic, (err, resubs) => { + if (err) { + reject(err) + } else { + resolve(resubs) } - cb(err, instance) }) - } + }) +} - // legacy third party streams are typically not iterable - function iterableStream (stream) { - if (typeof stream[Symbol.asyncIterator] !== 'function') { - return new Readable({ objectMode: true }).wrap(stream) - } - return stream - } - // end of legacy third party streams support +async function cleanSubscriptions (instance, client) { + return new Promise((resolve, reject) => { + instance.cleanSubscriptions(client, (err) => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) +} - async function getArrayFromStream (stream) { - const list = [] - for await (const item of iterableStream(stream)) { - list.push(item) - } - return list - } +async function countOffline (instance) { + return new Promise((resolve, reject) => { + instance.countOffline((err, subsCount, clientsCount) => { + if (err) { + reject(err) + } else { + resolve({ subsCount, clientsCount }) + } + }) + }) +} - async function streamForEach (stream, fn) { - for await (const item of iterableStream(stream)) { - await fn(item) - } - } +async function outgoingEnqueue (instance, sub, packet) { + return new Promise((resolve, reject) => { + instance.outgoingEnqueue(sub, packet, err => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) +} - function storeRetained (instance, opts, cb) { - opts = opts || {} +async function outgoingEnqueueCombi (instance, subs, packet) { + return new Promise((resolve, reject) => { + instance.outgoingEnqueueCombi(subs, packet, err => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) +} - const packet = { - cmd: 'publish', - id: instance.broker.id, - topic: opts.topic || 'hello/world', - payload: opts.payload || Buffer.from('muahah'), - qos: 0, - retain: true - } +async function outgoingClearMessageId (instance, client, packet) { + return new Promise((resolve, reject) => { + instance.outgoingClearMessageId(client, packet, (err, repacket) => { + if (err) { + reject(err) + } else { + resolve(repacket) + } + }) + }) +} - instance.storeRetained(packet, err => { - cb(err, packet) +async function outgoingUpdate (instance, client, packet) { + return new Promise((resolve, reject) => { + instance.outgoingUpdate(client, packet, (err, reclient, repacket) => { + if (err) { + reject(err) + } else { + resolve({ reclient, repacket }) + } }) - } + }) +} - function matchRetainedWithPattern (t, pattern, opts) { - persistence((err, instance) => { - if (err) { throw err } - - storeRetained(instance, opts, (err, packet) => { - assert.ok(!err, 'no error') - let stream - if (Array.isArray(pattern)) { - stream = instance.createRetainedStreamCombi(pattern) - } else { - stream = instance.createRetainedStream(pattern) - } +async function incomingStorePacket (instance, client, packet) { + return new Promise((resolve, reject) => { + instance.incomingStorePacket(client, packet, err => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) +} +async function incomingGetPacket (instance, client, packet) { + return new Promise((resolve, reject) => { + instance.incomingGetPacket(client, packet, (err, retrieved) => { + if (err) { + reject(err) + } else { + resolve(retrieved) + } + }) + }) +} - getArrayFromStream(stream).then(list => { - assert.deepEqual(list, [packet], 'must return the packet') - instance.destroy() - }) - }) +async function incomingDelPacket (instance, client, packet) { + return new Promise((resolve, reject) => { + instance.incomingDelPacket(client, packet, err => { + if (err) { + reject(err) + } else { + resolve() + } }) - } + }) +} - function testInstance (title, cb) { - test(title, t => { - persistence((err, instance) => { - if (err) { throw err } - cb(t, instance) - }) +async function putWill (instance, client, packet) { + return new Promise((resolve, reject) => { + instance.putWill(client, packet, (err, reClient) => { + if (err) { + reject(err) + } else { + resolve(reClient) + } }) + }) +} + +async function getWill (instance, client) { + return new Promise((resolve, reject) => { + instance.getWill(client, (err, packet, reClient) => { + if (err) { + reject(err) + } else { + resolve({ packet, reClient }) + } + }) + }) +} + +async function delWill (instance, client) { + return new Promise((resolve, reject) => { + instance.delWill(client, (err, packet, reClient) => { + if (err) { + reject(err) + } else { + resolve({ packet, reClient }) + } + }) + }) +} +// end of promisified versions of instance methods + +// helper functions +function waitForEvent (obj, resolveEvt) { + return new Promise((resolve, reject) => { + obj.once(resolveEvt, () => { + resolve() + }) + obj.once('error', reject) + }) +} + +async function doCleanup (t, instance) { + const instanceDestroy = new Promise((resolve, reject) => { + instance.destroy((err) => { + if (err) { + reject(err) + return + } + resolve() + }) + }) + await instanceDestroy + t.diagnostic('instance cleaned up') +} + +// legacy third party streams are typically not iterable +function iterableStream (stream) { + if (typeof stream[Symbol.asyncIterator] !== 'function') { + return new Readable({ objectMode: true }).wrap(stream) } + return stream +} +// end of legacy third party streams support + +function outgoingStream (instance, client) { + return iterableStream(instance.outgoingStream(client)) +} - function testPacket (t, packet, expected) { - if (packet.messageId === null) packet.messageId = undefined - assert.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue') - // deepLooseEqual? - looseAssert.deepEqual(structuredClone(packet), expected, 'must return the packet') +async function getArrayFromStream (stream) { + const list = [] + for await (const item of iterableStream(stream)) { + list.push(item) } + return list +} - function deClassed (obj) { - return Object.assign({}, obj) +async function storeRetainedPacket (instance, opts = {}) { + const packet = { + cmd: 'publish', + id: instance.broker.id, + topic: opts.topic || 'hello/world', + payload: opts.payload || Buffer.from('muahah'), + qos: 0, + retain: true } + await storeRetained(instance, packet) + return packet +} + +async function enqueueAndUpdate (t, instance, client, sub, packet, messageId) { + await outgoingEnqueueCombi(instance, [sub], packet) + const updated = new Packet(packet) + updated.messageId = messageId + + const { reclient, repacket } = await outgoingUpdate(instance, client, updated) + t.assert.equal(reclient, client, 'client matches') + t.assert.equal(repacket, updated, 'packet matches') + return repacket +} - test('store and look up retained messages', t => { - matchRetainedWithPattern(t, 'hello/world') +function testPacket (t, packet, expected) { + if (packet.messageId === null) packet.messageId = undefined + t.assert.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue') + // deepLooseEqual? + t.assert.deepEqual(structuredClone(packet), expected, 'must return the packet') +} + +function deClassed (obj) { + return Object.assign({}, obj) +} + +// start of abstractPersistence +function abstractPersistence (opts) { + const test = opts.test + const _persistence = opts.persistence + const waitForReady = opts.waitForReady + + // requiring it here so it will not error for modules + // not using the default emitter + const buildEmitter = opts.buildEmitter || require('mqemitter') + + async function persistence (t) { + const mq = buildEmitter() + const broker = { + id: 'broker-42', + mq, + publish: mq.emit.bind(mq), + subscribe: mq.on.bind(mq), + unsubscribe: mq.removeListener.bind(mq), + counter: 0 + } + + const instance = await _persistence() + if (instance) { + // Wait for ready event, if applicable, to ensure the persistence isn't + // destroyed while it's still being set up. + // https://github.com/mcollina/aedes-persistence-redis/issues/41 + if (waitForReady) { + await waitForEvent(instance, 'ready') + } + instance.broker = broker + t.diagnostic('instance created') + return instance + } + throw new Error('no instance') + } + + async function matchRetainedWithPattern (t, pattern) { + const instance = await persistence(t) + const packet = await storeRetainedPacket(instance) + let stream + if (Array.isArray(pattern)) { + stream = instance.createRetainedStreamCombi(pattern) + } else { + stream = instance.createRetainedStream(pattern) + } + t.diagnostic('created stream') + const list = await getArrayFromStream(stream) + t.assert.deepEqual(list, [packet], 'must return the packet') + t.diagnostic('stream was ok') + await doCleanup(t, instance) + } + + // testing starts here + test('store and look up retained messages', async t => { + t.plan(1) + await matchRetainedWithPattern(t, 'hello/world') }) - test('look up retained messages with a # pattern', t => { - matchRetainedWithPattern(t, '#') + test('look up retained messages with a # pattern', async t => { + t.plan(1) + await matchRetainedWithPattern(t, '#') }) - test('look up retained messages with a hello/world/# pattern', t => { - matchRetainedWithPattern(t, 'hello/world/#') + test('look up retained messages with a hello/world/# pattern', async t => { + t.plan(1) + await matchRetainedWithPattern(t, 'hello/world/#') }) - test('look up retained messages with a + pattern', t => { - matchRetainedWithPattern(t, 'hello/+') + test('look up retained messages with a + pattern', async t => { + t.plan(1) + await matchRetainedWithPattern(t, 'hello/+') }) - test('look up retained messages with multiple patterns', t => { - matchRetainedWithPattern(t, ['hello/+', 'other/hello']) + test('look up retained messages with multiple patterns', async t => { + t.plan(1) + await matchRetainedWithPattern(t, ['hello/+', 'other/hello']) }) - testInstance('store multiple retained messages in order', (t, instance) => { + test('store multiple retained messages in order', async (t) => { + t.plan(1000) + const instance = await persistence(t) const totalMessages = 1000 - let done = 0 const retained = { cmd: 'publish', @@ -166,59 +378,43 @@ function abstractPersistence (opts) { retain: true } - function checkIndex (index) { - const packet = new Packet(retained, instance.broker) - - instance.storeRetained(packet, err => { - assert.ok(!err, 'no error') - assert.equal(packet.brokerCounter, index + 1, 'packet stored in order') - if (++done === totalMessages) { - instance.destroy() - } - }) - } - for (let i = 0; i < totalMessages; i++) { - checkIndex(i) + const packet = new Packet(retained, instance.broker) + await storeRetainedPacket(instance, packet) + t.assert.equal(packet.brokerCounter, i + 1, 'packet stored in order') } + await doCleanup(t, instance) }) - testInstance('remove retained message', (t, instance) => { - storeRetained(instance, {}, (err, packet) => { - assert.ok(!err, 'no error') - storeRetained(instance, { - payload: Buffer.alloc(0) - }, err => { - assert.ok(!err, 'no error') - - const stream = instance.createRetainedStream('#') - getArrayFromStream(stream).then(list => { - assert.deepEqual(list, [], 'must return an empty list') - instance.destroy() - }) - }) + test('remove retained message', async (t) => { + t.plan(1) + const instance = await persistence(t) + await storeRetainedPacket(instance, {}) + await storeRetainedPacket(instance, { + payload: Buffer.alloc(0) }) + const stream = instance.createRetainedStream('#') + const list = await getArrayFromStream(stream) + t.assert.deepEqual(list, [], 'must return an empty list') + await doCleanup(t, instance) }) - testInstance('storing twice a retained message should keep only the last', (t, instance) => { - storeRetained(instance, {}, (err, packet) => { - assert.ok(!err, 'no error') - storeRetained(instance, { - payload: Buffer.from('ahah') - }, (err, packet) => { - assert.ok(!err, 'no error') - - const stream = instance.createRetainedStream('#') - - getArrayFromStream(stream).then(list => { - assert.deepEqual(list, [packet], 'must return the last packet') - instance.destroy() - }) - }) + test('storing twice a retained message should keep only the last', async (t) => { + t.plan(1) + const instance = await persistence(t) + await storeRetainedPacket(instance, {}) + const packet = await storeRetainedPacket(instance, { + payload: Buffer.from('ahah') }) + const stream = instance.createRetainedStream('#') + const list = await getArrayFromStream(stream) + t.assert.deepEqual(list, [packet], 'must return the last packet') + await doCleanup(t, instance) }) - testInstance('Create a new packet while storing a retained message', (t, instance) => { + test('Create a new packet while storing a retained message', async (t) => { + t.plan(1) + const instance = await persistence(t) const packet = { cmd: 'publish', id: instance.broker.id, @@ -229,20 +425,18 @@ function abstractPersistence (opts) { } const newPacket = Object.assign({}, packet) - instance.storeRetained(packet, err => { - assert.ok(!err, 'no error') - // packet reference change to check if a new packet is stored always - packet.retain = false - const stream = instance.createRetainedStream('#') - - getArrayFromStream(stream).then(list => { - assert.deepEqual(list, [newPacket], 'must return the last packet') - instance.destroy() - }) - }) + await storeRetained(instance, packet) + // packet reference change to check if a new packet is stored always + packet.retain = false + const stream = instance.createRetainedStream('#') + const list = await getArrayFromStream(stream) + t.assert.deepEqual(list, [newPacket], 'must return the last packet') + await doCleanup(t, instance) }) - testInstance('store and look up subscriptions by client', (t, instance) => { + test('store and look up subscriptions by client', async (t) => { + t.plan(3) + const instance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -264,19 +458,17 @@ function abstractPersistence (opts) { nl: false }] - instance.addSubscriptions(client, subs, (err, reClient) => { - assert.equal(reClient, client, 'client must be the same') - assert.ok(!err, 'no error') - instance.subscriptionsByClient(client, (err, resubs, reReClient) => { - assert.equal(reReClient, client, 'client must be the same') - assert.ok(!err, 'no error') - assert.deepEqual(resubs, subs) - instance.destroy() - }) - }) + const reClient = await addSubscriptions(instance, client, subs) + t.assert.equal(reClient, client, 'client must be the same') + const { resubs, reClient: reClient2 } = await subscriptionsByClient(instance, client) + t.assert.equal(reClient2, client, 'client must be the same') + t.assert.deepEqual(resubs, subs) + await doCleanup(t, instance) }) - testInstance('remove subscriptions by client', (t, instance) => { + test('remove subscriptions by client', async (t) => { + t.plan(4) + const instance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -292,28 +484,25 @@ function abstractPersistence (opts) { nl: false }] - instance.addSubscriptions(client, subs, (err, reClient) => { - assert.ok(!err, 'no error') - instance.removeSubscriptions(client, ['hello'], (err, reClient) => { - assert.ok(!err, 'no error') - assert.equal(reClient, client, 'client must be the same') - instance.subscriptionsByClient(client, (err, resubs, reClient) => { - assert.equal(reClient, client, 'client must be the same') - assert.ok(!err, 'no error') - assert.deepEqual(resubs, [{ - topic: 'matteo', - qos: 1, - rh: 0, - rap: true, - nl: false - }]) - instance.destroy() - }) - }) - }) + const reclient1 = await addSubscriptions(instance, client, subs) + t.assert.equal(reclient1, client, 'client must be the same') + const reClient2 = await removeSubscriptions(instance, client, ['hello']) + t.assert.equal(reClient2, client, 'client must be the same') + const { resubs, reClient } = await subscriptionsByClient(instance, client) + t.assert.equal(reClient, client, 'client must be the same') + t.assert.deepEqual(resubs, [{ + topic: 'matteo', + qos: 1, + rh: 0, + rap: true, + nl: false + }]) + await doCleanup(t, instance) }) - testInstance('store and look up subscriptions by topic', (t, instance) => { + test('store and look up subscriptions by topic', async (t) => { + t.plan(2) + const instance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -335,31 +524,30 @@ function abstractPersistence (opts) { nl: false }] - instance.addSubscriptions(client, subs, err => { - assert.ok(!err, 'no error') - instance.subscriptionsByTopic('hello', (err, resubs) => { - assert.ok(!err, 'no error') - assert.deepEqual(resubs, [{ - clientId: client.id, - topic: 'hello/#', - qos: 1, - rh: 0, - rap: true, - nl: false - }, { - clientId: client.id, - topic: 'hello', - qos: 1, - rh: 0, - rap: true, - nl: false - }]) - instance.destroy() - }) - }) + const reclient = await addSubscriptions(instance, client, subs) + t.assert.equal(reclient, client, 'client must be the same') + const resubs = await subscriptionsByTopic(instance, 'hello') + t.assert.deepEqual(resubs, [{ + clientId: client.id, + topic: 'hello/#', + qos: 1, + rh: 0, + rap: true, + nl: false + }, { + clientId: client.id, + topic: 'hello', + qos: 1, + rh: 0, + rap: true, + nl: false + }]) + await doCleanup(t, instance) }) - testInstance('get client list after subscriptions', (t, instance) => { + test('get client list after subscriptions', async (t) => { + t.plan(1) + const instance = await persistence(t) const client1 = { id: 'abcde' } const client2 = { id: 'efghi' } const subs = [{ @@ -367,68 +555,52 @@ function abstractPersistence (opts) { qos: 1 }] - instance.addSubscriptions(client1, subs, err => { - assert.ok(!err, 'no error for client 1') - instance.addSubscriptions(client2, subs, err => { - assert.ok(!err, 'no error for client 2') - const stream = instance.getClientList(subs[0].topic) - getArrayFromStream(stream).then(out => { - assert.deepEqual(out, [client1.id, client2.id]) - instance.destroy() - }) - }) - }) + await addSubscriptions(instance, client1, subs) + await addSubscriptions(instance, client2, subs) + const stream = instance.getClientList(subs[0].topic) + const list = await getArrayFromStream(stream) + t.assert.deepEqual(list, [client1.id, client2.id]) + await doCleanup(t, instance) }) - testInstance('get client list after an unsubscribe', (t, instance) => { + test('get client list after an unsubscribe', async (t) => { + t.plan(1) + const instance = await persistence(t) const client1 = { id: 'abcde' } const client2 = { id: 'efghi' } const subs = [{ topic: 'helloagain', qos: 1 }] - - instance.addSubscriptions(client1, subs, err => { - assert.ok(!err, 'no error for client 1') - instance.addSubscriptions(client2, subs, err => { - assert.ok(!err, 'no error for client 2') - instance.removeSubscriptions(client2, [subs[0].topic], (err, reClient) => { - assert.ok(!err, 'no error for removeSubscriptions') - const stream = instance.getClientList(subs[0].topic) - getArrayFromStream(stream).then(out => { - assert.deepEqual(out, [client1.id]) - instance.destroy() - }) - }) - }) - }) + await addSubscriptions(instance, client1, subs) + await addSubscriptions(instance, client2, subs) + await removeSubscriptions(instance, client2, [subs[0].topic]) + const stream = instance.getClientList(subs[0].topic) + const list = await getArrayFromStream(stream) + t.assert.deepEqual(list, [client1.id]) + await doCleanup(t, instance) }) - testInstance('get subscriptions list after an unsubscribe', (t, instance) => { + test('get subscriptions list after an unsubscribe', async (t) => { + t.plan(1) + const instance = await persistence(t) const client1 = { id: 'abcde' } const client2 = { id: 'efghi' } const subs = [{ topic: 'helloagain', qos: 1 }] - - instance.addSubscriptions(client1, subs, err => { - assert.ok(!err, 'no error for client 1') - instance.addSubscriptions(client2, subs, err => { - assert.ok(!err, 'no error for client 2') - instance.removeSubscriptions(client2, [subs[0].topic], (err, reClient) => { - assert.ok(!err, 'no error for removeSubscriptions') - instance.subscriptionsByTopic(subs[0].topic, (err, clients) => { - assert.ok(!err, 'no error getting subscriptions by topic') - assert.deepEqual(clients[0].clientId, client1.id) - instance.destroy() - }) - }) - }) - }) + await addSubscriptions(instance, client1, subs) + await addSubscriptions(instance, client2, subs) + await removeSubscriptions(instance, client2, [subs[0].topic]) + const clients = await subscriptionsByTopic(instance, subs[0].topic) + t.assert.deepEqual(clients[0].clientId, client1.id) + await doCleanup(t, instance) }) - testInstance('QoS 0 subscriptions, restored but not matched', (t, instance) => { + test('QoS 0 subscriptions, restored but not matched', async (t) => { + t.plan(2) + const instance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -450,28 +622,24 @@ function abstractPersistence (opts) { nl: false }] - instance.addSubscriptions(client, subs, err => { - assert.ok(!err, 'no error') - instance.subscriptionsByClient(client, (err, resubs) => { - assert.ok(!err, 'no error') - assert.deepEqual(resubs, subs) - instance.subscriptionsByTopic('hello', (err, resubs2) => { - assert.ok(!err, 'no error') - assert.deepEqual(resubs2, [{ - clientId: client.id, - topic: 'hello/#', - qos: 1, - rh: 0, - rap: true, - nl: false - }]) - instance.destroy() - }) - }) - }) + await addSubscriptions(instance, client, subs) + const { resubs } = await subscriptionsByClient(instance, client) + t.assert.deepEqual(resubs, subs) + const resubs2 = await subscriptionsByTopic(instance, 'hello') + t.assert.deepEqual(resubs2, [{ + clientId: client.id, + topic: 'hello/#', + qos: 1, + rh: 0, + rap: true, + nl: false + }]) + await doCleanup(t, instance) }) - testInstance('clean subscriptions', (t, instance) => { + test('clean subscriptions', async (t) => { + t.plan(4) + const instance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -481,57 +649,37 @@ function abstractPersistence (opts) { qos: 1 }] - instance.addSubscriptions(client, subs, err => { - assert.ok(!err, 'no error') - instance.cleanSubscriptions(client, err => { - assert.ok(!err, 'no error') - instance.subscriptionsByTopic('hello', (err, resubs) => { - assert.ok(!err, 'no error') - assert.deepEqual(resubs, [], 'no subscriptions') - - instance.subscriptionsByClient(client, (err, resubs) => { - assert.ifError(err) - assert.deepEqual(resubs, null, 'no subscriptions') - - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, 0, 'no subscriptions added') - assert.equal(clientsCount, 0, 'no clients added') - - instance.destroy() - }) - }) - }) - }) - }) + await addSubscriptions(instance, client, subs) + await cleanSubscriptions(instance, client) + const resubs = await subscriptionsByTopic(instance, 'hello') + t.assert.deepEqual(resubs, [], 'no subscriptions') + const { resubs: resubs2 } = await subscriptionsByClient(instance, client) + t.assert.deepEqual(resubs2, null, 'no subscriptions') + const { subsCount, clientsCount } = await countOffline(instance) + t.assert.equal(subsCount, 0, 'no subscriptions added') + t.assert.equal(clientsCount, 0, 'no clients added') + await doCleanup(t, instance) }) - testInstance('clean subscriptions with no active subscriptions', (t, instance) => { + test('clean subscriptions with no active subscriptions', async (t) => { + t.plan(4) + const instance = await persistence(t) const client = { id: 'abcde' } - instance.cleanSubscriptions(client, err => { - assert.ok(!err, 'no error') - instance.subscriptionsByTopic('hello', (err, resubs) => { - assert.ok(!err, 'no error') - assert.deepEqual(resubs, [], 'no subscriptions') - - instance.subscriptionsByClient(client, (err, resubs) => { - assert.ifError(err) - assert.deepEqual(resubs, null, 'no subscriptions') - - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, 0, 'no subscriptions added') - assert.equal(clientsCount, 0, 'no clients added') - - instance.destroy() - }) - }) - }) - }) + await cleanSubscriptions(instance, client) + const resubs = await subscriptionsByTopic(instance, 'hello') + t.assert.deepEqual(resubs, [], 'no subscriptions') + const { resubs: resubs2 } = await subscriptionsByClient(instance, client) + t.assert.deepEqual(resubs2, null, 'no subscriptions') + const { subsCount, clientsCount } = await countOffline(instance) + t.assert.equal(subsCount, 0, 'no subscriptions added') + t.assert.equal(clientsCount, 0, 'no clients added') + await doCleanup(t, instance) }) - testInstance('same topic, different QoS', (t, instance) => { + test('same topic, different QoS', async (t) => { + t.plan(5) + const instance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -547,89 +695,69 @@ function abstractPersistence (opts) { nl: false }] - instance.addSubscriptions(client, subs, (err, reClient) => { - assert.equal(reClient, client, 'client must be the same') - assert.ifError(err, 'no error') - - instance.subscriptionsByClient(client, (err, subsForClient, client) => { - assert.ifError(err, 'no error') - assert.deepEqual(subsForClient, [{ - topic: 'hello', - qos: 1, - rh: 0, - rap: true, - nl: false - }]) - - instance.subscriptionsByTopic('hello', (err, subsForTopic) => { - assert.ifError(err, 'no error') - assert.deepEqual(subsForTopic, [{ - clientId: 'abcde', - topic: 'hello', - qos: 1, - rh: 0, - rap: true, - nl: false - }]) - - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, 1, 'one subscription added') - assert.equal(clientsCount, 1, 'one client added') - - instance.destroy() - }) - }) - }) - }) + const reClient = await addSubscriptions(instance, client, subs) + t.assert.equal(reClient, client, 'client must be the same') + const { resubs } = await subscriptionsByClient(instance, client) + t.assert.deepEqual(resubs, [{ + topic: 'hello', + qos: 1, + rh: 0, + rap: true, + nl: false + }]) + + const resubs2 = await subscriptionsByTopic(instance, 'hello') + t.assert.deepEqual(resubs2, [{ + clientId: 'abcde', + topic: 'hello', + qos: 1, + rh: 0, + rap: true, + nl: false + }]) + + const { subsCount, clientsCount } = await countOffline(instance) + t.assert.equal(subsCount, 1, 'one subscription added') + t.assert.equal(clientsCount, 1, 'one client added') + await doCleanup(t, instance) }) - testInstance('replace subscriptions', (t, instance) => { + test('replace subscriptions', async (t) => { + t.plan(25) + const instance = await persistence(t) const client = { id: 'abcde' } const topic = 'hello' const sub = { topic, rh: 0, rap: true, nl: false } const subByTopic = { clientId: client.id, topic, rh: 0, rap: true, nl: false } - function check (qos, cb) { + async function check (qos) { sub.qos = subByTopic.qos = qos - instance.addSubscriptions(client, [sub], (err, reClient) => { - assert.equal(reClient, client, 'client must be the same') - assert.ifError(err, 'no error') - instance.subscriptionsByClient(client, (err, subsForClient, client) => { - assert.ifError(err, 'no error') - assert.deepEqual(subsForClient, [sub]) - instance.subscriptionsByTopic(topic, (err, subsForTopic) => { - assert.ifError(err, 'no error') - assert.deepEqual(subsForTopic, qos === 0 ? [] : [subByTopic]) - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - if (qos === 0) { - assert.equal(subsCount, 0, 'no subscriptions added') - } else { - assert.equal(subsCount, 1, 'one subscription added') - } - assert.equal(clientsCount, 1, 'one client added') - cb() - }) - }) - }) - }) + const reClient = await addSubscriptions(instance, client, [sub]) + t.assert.equal(reClient, client, 'client must be the same') + const { resubs } = await subscriptionsByClient(instance, client) + t.assert.deepEqual(resubs, [sub]) + const subsForTopic = await subscriptionsByTopic(instance, topic) + t.assert.deepEqual(subsForTopic, qos === 0 ? [] : [subByTopic]) + const { subsCount, clientsCount } = await countOffline(instance) + if (qos === 0) { + t.assert.equal(subsCount, 0, 'no subscriptions added') + } else { + t.assert.equal(subsCount, 1, 'one subscription added') + } + t.assert.equal(clientsCount, 1, 'one client added') } - check(0, () => { - check(1, () => { - check(2, () => { - check(1, () => { - check(0, () => { - instance.destroy() - }) - }) - }) - }) - }) + await check(0) + await check(1) + await check(2) + await check(1) + await check(0) + await doCleanup(t, instance) }) - testInstance('replace subscriptions in same call', (t, instance) => { + test('replace subscriptions in same call', async (t) => { + t.plan(5) + const instance = await persistence(t) const client = { id: 'abcde' } const topic = 'hello' const subs = [ @@ -639,27 +767,21 @@ function abstractPersistence (opts) { { topic, qos: 1, rh: 0, rap: true, nl: false }, { topic, qos: 0, rh: 0, rap: true, nl: false } ] - instance.addSubscriptions(client, subs, (err, reClient) => { - assert.equal(reClient, client, 'client must be the same') - assert.ifError(err, 'no error') - instance.subscriptionsByClient(client, (err, subsForClient, client) => { - assert.ifError(err, 'no error') - assert.deepEqual(subsForClient, [{ topic, qos: 0, rh: 0, rap: true, nl: false }]) - instance.subscriptionsByTopic(topic, (err, subsForTopic) => { - assert.ifError(err, 'no error') - assert.deepEqual(subsForTopic, []) - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, 0, 'no subscriptions added') - assert.equal(clientsCount, 1, 'one client added') - instance.destroy() - }) - }) - }) - }) + const reClient = await addSubscriptions(instance, client, subs) + t.assert.equal(reClient, client, 'client must be the same') + const { resubs: subsForClient } = await subscriptionsByClient(instance, client) + t.assert.deepEqual(subsForClient, [{ topic, qos: 0, rh: 0, rap: true, nl: false }]) + const subsForTopic = await subscriptionsByTopic(instance, topic) + t.assert.deepEqual(subsForTopic, []) + const { subsCount, clientsCount } = await countOffline(instance) + t.assert.equal(subsCount, 0, 'no subscriptions added') + t.assert.equal(clientsCount, 1, 'one client added') + await doCleanup(t, instance) }) - testInstance('store and count subscriptions', (t, instance) => { + test('store and count subscriptions', async (t) => { + t.plan(11) + const instance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -672,61 +794,33 @@ function abstractPersistence (opts) { qos: 0 }] - instance.addSubscriptions(client, subs, (err, reClient) => { - assert.equal(reClient, client, 'client must be the same') - assert.ifError(err, 'no error') - - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, 2, 'two subscriptions added') - assert.equal(clientsCount, 1, 'one client added') - - instance.removeSubscriptions(client, ['hello'], (err, reClient) => { - assert.ifError(err, 'no error') - - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, 1, 'one subscription added') - assert.equal(clientsCount, 1, 'one client added') - - instance.removeSubscriptions(client, ['matteo'], (err, reClient) => { - assert.ifError(err, 'no error') - - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, 0, 'zero subscriptions added') - assert.equal(clientsCount, 1, 'one client added') - - instance.removeSubscriptions(client, ['noqos'], (err, reClient) => { - assert.ifError(err, 'no error') - - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, 0, 'zero subscriptions added') - assert.equal(clientsCount, 0, 'zero clients added') - - instance.removeSubscriptions(client, ['noqos'], (err, reClient) => { - assert.ifError(err, 'no error') - - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, 0, 'zero subscriptions added') - assert.equal(clientsCount, 0, 'zero clients added') - - instance.destroy() - }) - }) - }) - }) - }) - }) - }) - }) - }) - }) + const reclient = await addSubscriptions(instance, client, subs) + t.assert.equal(reclient, client, 'client must be the same') + const { subsCount, clientsCount } = await countOffline(instance) + t.assert.equal(subsCount, 2, 'two subscriptions added') + t.assert.equal(clientsCount, 1, 'one client added') + await removeSubscriptions(instance, client, ['hello']) + const { subsCount: subsCount2, clientsCount: clientsCount2 } = await countOffline(instance) + t.assert.equal(subsCount2, 1, 'one subscription added') + t.assert.equal(clientsCount2, 1, 'one client added') + await removeSubscriptions(instance, client, ['matteo']) + const { subsCount: subsCount3, clientsCount: clientsCount3 } = await countOffline(instance) + t.assert.equal(subsCount3, 0, 'zero subscriptions added') + t.assert.equal(clientsCount3, 1, 'one client added') + await removeSubscriptions(instance, client, ['noqos']) + const { subsCount: subsCount4, clientsCount: clientsCount4 } = await countOffline(instance) + t.assert.equal(subsCount4, 0, 'zero subscriptions added') + t.assert.equal(clientsCount4, 0, 'zero clients added') + await removeSubscriptions(instance, client, ['noqos']) + const { subsCount: subsCount5, clientsCount: clientsCount5 } = await countOffline(instance) + t.assert.equal(subsCount5, 0, 'zero subscriptions added') + t.assert.equal(clientsCount5, 0, 'zero clients added') + await doCleanup(t, instance) }) - testInstance('count subscriptions with two clients', (t, instance) => { + test('count subscriptions with two clients', async (t) => { + t.plan(26) + const instance = await persistence(t) const client1 = { id: 'abcde' } const client2 = { id: 'fghij' } const subs = [{ @@ -740,51 +834,32 @@ function abstractPersistence (opts) { qos: 0 }] - function remove (client, subs, expectedSubs, expectedClients, cb) { - instance.removeSubscriptions(client, subs, (err, reClient) => { - assert.ifError(err, 'no error') - assert.equal(reClient, client, 'client must be the same') - - instance.countOffline((err, subsCount, clientsCount) => { - assert.ifError(err, 'no error') - assert.equal(subsCount, expectedSubs, 'subscriptions added') - assert.equal(clientsCount, expectedClients, 'clients added') - - cb() - }) - }) - } - - instance.addSubscriptions(client1, subs, (err, reClient) => { - assert.equal(reClient, client1, 'client must be the same') - assert.ifError(err, 'no error') - - instance.addSubscriptions(client2, subs, (err, reClient) => { - assert.equal(reClient, client2, 'client must be the same') - assert.ifError(err, 'no error') - - remove(client1, ['foobar'], 4, 2, () => { - remove(client1, ['hello'], 3, 2, () => { - remove(client1, ['hello'], 3, 2, () => { - remove(client1, ['matteo'], 2, 2, () => { - remove(client1, ['noqos'], 2, 1, () => { - remove(client2, ['hello'], 1, 1, () => { - remove(client2, ['matteo'], 0, 1, () => { - remove(client2, ['noqos'], 0, 0, () => { - instance.destroy() - }) - }) - }) - }) - }) - }) - }) - }) - }) - }) + async function remove (client, subs, expectedSubs, expectedClients) { + const reClient = await removeSubscriptions(instance, client, subs) + t.assert.equal(reClient, client, 'client must be the same') + const { subsCount, clientsCount } = await countOffline(instance) + t.assert.equal(subsCount, expectedSubs, 'subscriptions added') + t.assert.equal(clientsCount, expectedClients, 'clients added') + } + + const reClient1 = await addSubscriptions(instance, client1, subs) + t.assert.equal(reClient1, client1, 'client must be the same') + const reClient2 = await addSubscriptions(instance, client2, subs) + t.assert.equal(reClient2, client2, 'client must be the same') + await remove(client1, ['foobar'], 4, 2) + await remove(client1, ['hello'], 3, 2) + await remove(client1, ['hello'], 3, 2) + await remove(client1, ['matteo'], 2, 2) + await remove(client1, ['noqos'], 2, 1) + await remove(client2, ['hello'], 1, 1) + await remove(client2, ['matteo'], 0, 1) + await remove(client2, ['noqos'], 0, 0) + await doCleanup(t, instance) }) - testInstance('add duplicate subs to persistence for qos > 0', (t, instance) => { + test('add duplicate subs to persistence for qos > 0', async (t) => { + t.plan(3) + const instance = await persistence(t) const client = { id: 'abcde' } const topic = 'hello' const subs = [{ @@ -795,24 +870,19 @@ function abstractPersistence (opts) { nl: false }] - instance.addSubscriptions(client, subs, (err, reClient) => { - assert.equal(reClient, client, 'client must be the same') - assert.ifError(err, 'no error') - - instance.addSubscriptions(client, subs, (err, resCLient) => { - assert.equal(resCLient, client, 'client must be the same') - assert.ifError(err, 'no error') - subs[0].clientId = client.id - instance.subscriptionsByTopic(topic, (err, subsForTopic) => { - assert.ifError(err, 'no error') - assert.deepEqual(subsForTopic, subs) - instance.destroy() - }) - }) - }) + const reClient = await addSubscriptions(instance, client, subs) + t.assert.equal(reClient, client, 'client must be the same') + const reClient2 = await addSubscriptions(instance, client, subs) + t.assert.equal(reClient2, client, 'client must be the same') + subs[0].clientId = client.id + const subsForTopic = await subscriptionsByTopic(instance, topic) + t.assert.deepEqual(subsForTopic, subs) + await doCleanup(t, instance) }) - testInstance('add duplicate subs to persistence for qos 0', (t, instance) => { + test('add duplicate subs to persistence for qos 0', async (t) => { + t.plan(3) + const instance = await persistence(t) const client = { id: 'abcde' } const topic = 'hello' const subs = [{ @@ -823,23 +893,18 @@ function abstractPersistence (opts) { nl: false }] - instance.addSubscriptions(client, subs, (err, reClient) => { - assert.equal(reClient, client, 'client must be the same') - assert.ifError(err, 'no error') - - instance.addSubscriptions(client, subs, (err, resCLient) => { - assert.equal(resCLient, client, 'client must be the same') - assert.ifError(err, 'no error') - instance.subscriptionsByClient(client, (err, subsForClient, client) => { - assert.ifError(err, 'no error') - assert.deepEqual(subsForClient, subs) - instance.destroy() - }) - }) - }) + const reClient = await addSubscriptions(instance, client, subs) + t.assert.equal(reClient, client, 'client must be the same') + const reClient2 = await addSubscriptions(instance, client, subs) + t.assert.equal(reClient2, client, 'client must be the same') + const { resubs: subsForClient } = await subscriptionsByClient(instance, client) + t.assert.deepEqual(subsForClient, subs) + await doCleanup(t, instance) }) - testInstance('get topic list after concurrent subscriptions of a client', (t, instance) => { + test('get topic list after concurrent subscriptions of a client', async (t) => { + t.plan(3) + const instance = await persistence(t) const client = { id: 'abcde' } const subs1 = [{ topic: 'hello1', @@ -857,28 +922,31 @@ function abstractPersistence (opts) { }] let calls = 2 - function done () { - if (!--calls) { - instance.subscriptionsByClient(client, (err, resubs) => { - assert.ok(!err, 'no error') - resubs.sort((a, b) => b.topic.localeCompare(b.topic, 'en')) - assert.deepEqual(resubs, [subs1[0], subs2[0]]) - instance.destroy() - }) + await new Promise((resolve, reject) => { + async function done () { + if (!--calls) { + const { resubs } = await subscriptionsByClient(instance, client) + resubs.sort((a, b) => a.topic.localeCompare(b.topic, 'en')) + t.assert.deepEqual(resubs, [subs1[0], subs2[0]]) + await doCleanup(t, instance) + resolve() + } } - } - instance.addSubscriptions(client, subs1, err => { - assert.ok(!err, 'no error for hello1') - done() - }) - instance.addSubscriptions(client, subs2, err => { - assert.ok(!err, 'no error for hello2') - done() + instance.addSubscriptions(client, subs1, err => { + t.assert.ok(!err, 'no error for hello1') + done() + }) + instance.addSubscriptions(client, subs2, err => { + t.assert.ok(!err, 'no error for hello2') + done() + }) }) }) - testInstance('add outgoing packet and stream it', (t, instance) => { + test('add outgoing packet and stream it', async (t) => { + t.plan(2) + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -910,19 +978,16 @@ function abstractPersistence (opts) { messageId: undefined } - instance.outgoingEnqueue(sub, packet, err => { - assert.ifError(err) - const stream = instance.outgoingStream(client) - - getArrayFromStream(stream).then(list => { - const packet = list[0] - testPacket(t, packet, expected) - instance.destroy() - }) - }) + await outgoingEnqueue(instance, sub, packet) + const stream = outgoingStream(instance, client) + const list = await getArrayFromStream(stream) + testPacket(t, list[0], expected) + await doCleanup(t, instance) }) - testInstance('add outgoing packet for multiple subs and stream to all', (t, instance) => { + test('add outgoing packet for multiple subs and stream to all', async (t) => { + t.plan(4) + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -963,24 +1028,20 @@ function abstractPersistence (opts) { messageId: undefined } - instance.outgoingEnqueueCombi(subs, packet, err => { - assert.ifError(err) - const stream = instance.outgoingStream(client) - getArrayFromStream(stream).then(list => { - const packet = list[0] - testPacket(t, packet, expected) - - const stream2 = instance.outgoingStream(client2) - getArrayFromStream(stream2).then(list2 => { - const packet = list2[0] - testPacket(t, packet, expected) - instance.destroy() - }) - }) - }) + await outgoingEnqueueCombi(instance, subs, packet) + const stream = outgoingStream(instance, client) + const list = await getArrayFromStream(stream) + testPacket(t, list[0], expected) + + const stream2 = outgoingStream(instance, client2) + const list2 = await getArrayFromStream(stream2) + testPacket(t, list2[0], expected) + await doCleanup(t, instance) }) - testInstance('add outgoing packet as a string and pump', (t, instance) => { + test('add outgoing packet as a string and pump', async (t) => { + t.plan(7) + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -1008,33 +1069,30 @@ function abstractPersistence (opts) { brokerCounter: 50 } const queue = [] - enqueueAndUpdate(t, instance, client, sub, packet1, 42, updated1 => { - enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => { - const stream = instance.outgoingStream(client) - - async function clearQueue (data) { - return new Promise((resolve, reject) => { - instance.outgoingUpdate(client, data, - (err, client, packet) => { - assert.ok(!err, 'no error') - queue.push(packet) - resolve() - }) - }) - } - streamForEach(stream, clearQueue).then(function done () { - assert.equal(queue.length, 2) - if (queue.length === 2) { - assert.deepEqual(deClassed(queue[0]), deClassed(updated1)) - assert.deepEqual(deClassed(queue[1]), deClassed(updated2)) - } - instance.destroy() - }) - }) - }) + + const updated1 = await enqueueAndUpdate(t, instance, client, sub, packet1, 42) + const updated2 = await enqueueAndUpdate(t, instance, client, sub, packet2, 43) + const stream = outgoingStream(instance, client) + + async function clearQueue (data) { + const { repacket } = await outgoingUpdate(instance, client, data) + t.diagnostic('packet received') + queue.push(repacket) + } + + const list = await getArrayFromStream(stream) + for (const data of list) { + await clearQueue(data) + } + t.assert.equal(queue.length, 2) + t.assert.deepEqual(deClassed(queue[0]), deClassed(updated1)) + t.assert.deepEqual(deClassed(queue[1]), deClassed(updated2)) + await doCleanup(t, instance) }) - testInstance('add outgoing packet as a string and stream', (t, instance) => { + test('add outgoing packet as a string and stream', async (t) => { + t.plan(2) + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -1066,19 +1124,16 @@ function abstractPersistence (opts) { messageId: undefined } - instance.outgoingEnqueueCombi([sub], packet, err => { - assert.ifError(err) - const stream = instance.outgoingStream(client) - - getArrayFromStream(stream).then(list => { - const packet = list[0] - testPacket(t, packet, expected) - instance.destroy() - }) - }) + await outgoingEnqueueCombi(instance, [sub], packet) + const stream = outgoingStream(instance, client) + const list = await getArrayFromStream(stream) + testPacket(t, list[0], expected) + await doCleanup(t, instance) }) - testInstance('add outgoing packet and stream it twice', (t, instance) => { + test('add outgoing packet and stream it twice', async (t) => { + t.plan(5) + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -1111,42 +1166,20 @@ function abstractPersistence (opts) { messageId: undefined } - instance.outgoingEnqueueCombi([sub], packet, err => { - assert.ifError(err) - const stream = instance.outgoingStream(client) - - getArrayFromStream(stream).then(list => { - const packet = list[0] - testPacket(t, packet, expected) - - const stream2 = instance.outgoingStream(client) - - getArrayFromStream(stream2).then(list2 => { - const packet = list2[0] - testPacket(t, packet, expected) - assert.notEqual(packet, expected, 'packet must be a different object') - instance.destroy() - }) - }) - }) + await outgoingEnqueueCombi(instance, [sub], packet) + const stream = outgoingStream(instance, client) + const list = await getArrayFromStream(stream) + testPacket(t, list[0], expected) + const stream2 = outgoingStream(instance, client) + const list2 = await getArrayFromStream(stream2) + testPacket(t, list2[0], expected) + t.assert.notEqual(packet, expected, 'packet must be a different object') + await doCleanup(t, instance) }) - function enqueueAndUpdate (t, instance, client, sub, packet, messageId, callback) { - instance.outgoingEnqueueCombi([sub], packet, err => { - assert.ifError(err) - const updated = new Packet(packet) - updated.messageId = messageId - - instance.outgoingUpdate(client, updated, (err, reclient, repacket) => { - assert.ifError(err) - assert.equal(reclient, client, 'client matches') - assert.equal(repacket, updated, 'packet matches') - callback(updated) - }) - }) - } - - testInstance('add outgoing packet and update messageId', (t, instance) => { + test('add outgoing packet and update messageId', async (t) => { + t.plan(5) + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1165,20 +1198,20 @@ function abstractPersistence (opts) { brokerCounter: 42 } - enqueueAndUpdate(t, instance, client, sub, packet, 42, updated => { - const stream = instance.outgoingStream(client) - delete updated.messageId - getArrayFromStream(stream).then(list => { - delete list[0].messageId - assert.notEqual(list[0], updated, 'must not be the same object') - assert.deepEqual(deClassed(list[0]), deClassed(updated), 'must return the packet') - assert.equal(list.length, 1, 'must return only one packet') - instance.destroy() - }) - }) + const updated = await enqueueAndUpdate(t, instance, client, sub, packet, 42) + updated.messageId = undefined + const stream = outgoingStream(instance, client) + const list = await getArrayFromStream(stream) + list[0].messageId = undefined + t.assert.notEqual(list[0], updated, 'must not be the same object') + t.assert.deepEqual(deClassed(list[0]), deClassed(updated), 'must return the packet') + t.assert.equal(list.length, 1, 'must return only one packet') + await doCleanup(t, instance) }) - testInstance('add 2 outgoing packet and clear messageId', (t, instance) => { + test('add 2 outgoing packet and clear messageId', async (t) => { + t.plan(10) + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1208,28 +1241,25 @@ function abstractPersistence (opts) { brokerCounter: 43 } - enqueueAndUpdate(t, instance, client, sub, packet1, 42, updated1 => { - enqueueAndUpdate(t, instance, client, sub, packet2, 43, updated2 => { - instance.outgoingClearMessageId(client, updated1, (err, packet) => { - assert.ifError(err) - assert.deepEqual(packet.messageId, 42, 'must have the same messageId') - assert.deepEqual(packet.payload.toString(), packet1.payload.toString(), 'must have original payload') - assert.deepEqual(packet.topic, packet1.topic, 'must have original topic') - const stream = instance.outgoingStream(client) - delete updated2.messageId - getArrayFromStream(stream).then(list => { - delete list[0].messageId - assert.notEqual(list[0], updated2, 'must not be the same object') - assert.deepEqual(deClassed(list[0]), deClassed(updated2), 'must return the packet') - assert.equal(list.length, 1, 'must return only one packet') - instance.destroy() - }) - }) - }) - }) + const updated1 = await enqueueAndUpdate(t, instance, client, sub, packet1, 42) + const updated2 = await enqueueAndUpdate(t, instance, client, sub, packet2, 43) + const pkt = await outgoingClearMessageId(instance, client, updated1) + t.assert.deepEqual(pkt.messageId, 42, 'must have the same messageId') + t.assert.deepEqual(pkt.payload.toString(), packet1.payload.toString(), 'must have original payload') + t.assert.deepEqual(pkt.topic, packet1.topic, 'must have original topic') + const stream = outgoingStream(instance, client) + updated2.messageId = undefined + const list = await getArrayFromStream(stream) + list[0].messageId = undefined + t.assert.notEqual(list[0], updated2, 'must not be the same object') + t.assert.deepEqual(deClassed(list[0]), deClassed(updated2), 'must return the packet') + t.assert.equal(list.length, 1, 'must return only one packet') + await doCleanup(t, instance) }) - testInstance('add many outgoing packets and clear messageIds', async (t, instance) => { + test('add many outgoing packets and clear messageIds', async (t) => { + // t.plan() is called below after we know the high watermark + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1246,64 +1276,44 @@ function abstractPersistence (opts) { retain: false } - function outStream (instance, client) { - return iterableStream(instance.outgoingStream(client)) - } - // we just need a stream to figure out the high watermark - const stream = outStream(instance, client) + const stream = (outgoingStream(instance, client)) const total = stream.readableHighWaterMark * 2 - - function submitMessage (id) { - return new Promise((resolve, reject) => { - const p = new Packet(packet, instance.broker) - p.messageId = id - instance.outgoingEnqueue(sub, p, (err) => { - if (err) { - return reject(err) - } - instance.outgoingUpdate(client, p, resolve) - }) - }) - } - - function clearMessage (p) { - return new Promise((resolve, reject) => { - instance.outgoingClearMessageId(client, p, (err, received) => { - assert.ifError(err) - assert.deepEqual(received, p, 'must return the packet') - resolve() - }) - }) - } + t.plan(total * 2) for (let i = 0; i < total; i++) { - await submitMessage(i) + const p = new Packet(packet, instance.broker) + p.messageId = i + await outgoingEnqueue(instance, sub, p) + await outgoingUpdate(instance, client, p) } let queued = 0 - for await (const p of outStream(instance, client)) { + for await (const p of (outgoingStream(instance, client))) { if (p) { queued++ } } - assert.equal(queued, total, `outgoing queue must hold ${total} items`) + t.assert.equal(queued, total, `outgoing queue must hold ${total} items`) - for await (const p of outStream(instance, client)) { - await clearMessage(p) + for await (const p of (outgoingStream(instance, client))) { + const received = await outgoingClearMessageId(instance, client, p) + t.assert.deepEqual(received, p, 'must return the packet') } let queued2 = 0 - for await (const p of outStream(instance, client)) { + for await (const p of (outgoingStream(instance, client))) { if (p) { queued2++ } } - assert.equal(queued2, 0, 'outgoing queue is empty') - instance.destroy() + t.assert.equal(queued2, 0, 'outgoing queue is empty') + await doCleanup(t, instance) }) - testInstance('update to publish w/ same messageId', (t, instance) => { + test('update to publish w/ same messageId', async (t) => { + t.plan(5) + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1335,26 +1345,23 @@ function abstractPersistence (opts) { messageId: 42 } - instance.outgoingEnqueue(sub, packet1, () => { - instance.outgoingEnqueue(sub, packet2, () => { - instance.outgoingUpdate(client, packet1, () => { - instance.outgoingUpdate(client, packet2, () => { - const stream = instance.outgoingStream(client) - getArrayFromStream(stream).then(list => { - assert.equal(list.length, 2, 'must have two items in queue') - assert.equal(list[0].brokerCounter, packet1.brokerCounter, 'brokerCounter must match') - assert.equal(list[0].messageId, packet1.messageId, 'messageId must match') - assert.equal(list[1].brokerCounter, packet2.brokerCounter, 'brokerCounter must match') - assert.equal(list[1].messageId, packet2.messageId, 'messageId must match') - instance.destroy() - }) - }) - }) - }) - }) + await outgoingEnqueue(instance, sub, packet1) + await outgoingEnqueue(instance, sub, packet2) + await outgoingUpdate(instance, client, packet1) + await outgoingUpdate(instance, client, packet2) + const stream = outgoingStream(instance, client) + const list = await getArrayFromStream(stream) + t.assert.equal(list.length, 2, 'must have two items in queue') + t.assert.equal(list[0].brokerCounter, packet1.brokerCounter, 'brokerCounter must match') + t.assert.equal(list[0].messageId, packet1.messageId, 'messageId must match') + t.assert.equal(list[1].brokerCounter, packet2.brokerCounter, 'brokerCounter must match') + t.assert.equal(list[1].messageId, packet2.messageId, 'messageId must match') + await doCleanup(t, instance) }) - testInstance('update to pubrel', (t, instance) => { + test('update to pubrel', async (t) => { + t.plan(3) + const instance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1373,36 +1380,28 @@ function abstractPersistence (opts) { brokerCounter: 42 } - instance.outgoingEnqueueCombi([sub], packet, err => { - assert.ifError(err) - const updated = new Packet(packet) - updated.messageId = 42 + await outgoingEnqueueCombi(instance, [sub], packet) + const updated = new Packet(packet) + updated.messageId = 42 + const { reclient, repacket } = await outgoingUpdate(instance, client, updated) + t.assert.equal(reclient, client, 'client matches') + t.assert.equal(repacket, updated, 'packet matches') - instance.outgoingUpdate(client, updated, (err, reclient, repacket) => { - assert.ifError(err) - assert.equal(reclient, client, 'client matches') - assert.equal(repacket, updated, 'packet matches') - - const pubrel = { - cmd: 'pubrel', - messageId: updated.messageId - } - - instance.outgoingUpdate(client, pubrel, err => { - assert.ifError(err) - - const stream = instance.outgoingStream(client) + const pubrel = { + cmd: 'pubrel', + messageId: updated.messageId + } - getArrayFromStream(stream).then(list => { - assert.deepEqual(list, [pubrel], 'must return the packet') - instance.destroy() - }) - }) - }) - }) + await outgoingUpdate(instance, client, pubrel) + const stream = outgoingStream(instance, client) + const list = await getArrayFromStream(stream) + t.assert.deepEqual(list, [pubrel], 'must return the packet') + await doCleanup(t, instance) }) - testInstance('add incoming packet, get it, and clear with messageId', (t, instance) => { + test('add incoming packet, get it, and clear with messageId', async (t) => { + t.plan(3) + const instance = await persistence(t) const client = { id: 'abcde' } @@ -1416,40 +1415,36 @@ function abstractPersistence (opts) { retain: false, messageId: 42 } - - instance.incomingStorePacket(client, packet, err => { - assert.ifError(err) - - instance.incomingGetPacket(client, { + await incomingStorePacket(instance, client, packet) + const retrieved = await incomingGetPacket(instance, client, { + messageId: packet.messageId + }) + // adjusting the objects so they match + delete retrieved.brokerCounter + delete retrieved.brokerId + delete packet.length + // strip the class identifier from the packet + const result = structuredClone(retrieved) + // Convert Uint8 to Buffer for comparison + result.payload = Buffer.from(result.payload) + t.assert.deepEqual(result, packet, 'retrieved packet must be deeply equal') + t.assert.notEqual(retrieved, packet, 'retrieved packet must not be the same object') + await incomingDelPacket(instance, client, retrieved) + + try { + await incomingGetPacket(instance, client, { messageId: packet.messageId - }, (err, retrieved) => { - assert.ifError(err) - - // adjusting the objects so they match - delete retrieved.brokerCounter - delete retrieved.brokerId - delete packet.length - // strip the class identifier from the packet - const result = structuredClone(retrieved) - // Convert Uint8 to Buffer for comparison - result.payload = Buffer.from(result.payload) - assert.deepEqual(result, packet, 'retrieved packet must be deeply equal') - assert.notEqual(retrieved, packet, 'retrieved packet must not be the same object') - - instance.incomingDelPacket(client, retrieved, err => { - assert.ifError(err) - instance.incomingGetPacket(client, { - messageId: packet.messageId - }, (err, retrieved) => { - assert.ok(err, 'must error') - instance.destroy() - }) - }) }) - }) + t.assert.ok(false, 'must error') + } catch (err) { + t.assert.ok(err, 'must error') + await doCleanup(t, instance) + } }) - testInstance('store, fetch and delete will message', (t, instance) => { + test('store, fetch and delete will message', async (t) => { + t.plan(7) + const instance = await persistence(t) const client = { id: '12345' } @@ -1460,30 +1455,24 @@ function abstractPersistence (opts) { retain: true } - instance.putWill(client, expected, (err, c) => { - assert.ifError(err, 'no error') - assert.equal(c, client, 'client matches') - instance.getWill(client, (err, packet, c) => { - assert.ifError(err, 'no error') - assert.deepEqual(packet, expected, 'will matches') - assert.equal(c, client, 'client matches') - client.brokerId = packet.brokerId - instance.delWill(client, (err, packet, c) => { - assert.ifError(err, 'no error') - assert.deepEqual(packet, expected, 'will matches') - assert.equal(c, client, 'client matches') - instance.getWill(client, (err, packet, c) => { - assert.ifError(err, 'no error') - assert.ok(!packet, 'no will after del') - assert.equal(c, client, 'client matches') - instance.destroy() - }) - }) - }) - }) + const c = await putWill(instance, client, expected) + t.assert.equal(c, client, 'client matches') + const { packet: p1, reClient: c1 } = await getWill(instance, client) + t.assert.deepEqual(p1, expected, 'will matches') + t.assert.equal(c1, client, 'client matches') + client.brokerId = p1.brokerId + const { packet: p2, reClient: c2 } = await delWill(instance, client) + t.assert.deepEqual(p2, expected, 'will matches') + t.assert.equal(c2, client, 'client matches') + const { packet: p3, reClient: c3 } = await getWill(instance, client) + t.assert.ok(!p3, 'no will after del') + t.assert.equal(c3, client, 'client matches') + await doCleanup(t, instance) }) - testInstance('stream all will messages', (t, instance) => { + test('stream all will messages', async (t) => { + t.plan(3) + const instance = await persistence(t) const client = { id: '12345', brokerId: instance.broker.id @@ -1495,27 +1484,28 @@ function abstractPersistence (opts) { retain: true } - instance.putWill(client, toWrite, (err, c) => { - assert.ifError(err, 'no error') - assert.equal(c, client, 'client matches') - streamForEach(instance.streamWill(), (chunk) => { - assert.deepEqual(chunk, { - clientId: client.id, - brokerId: instance.broker.id, - topic: 'hello/died', - payload: Buffer.from('muahahha'), - qos: 0, - retain: true - }, 'packet matches') - instance.delWill(client, (err, result, client) => { - assert.ifError(err, 'no error') - instance.destroy() - }) - }) - }) + const expected = { + clientId: client.id, + brokerId: instance.broker.id, + topic: 'hello/died', + payload: Buffer.from('muahahha'), + qos: 0, + retain: true + } + + const c = await putWill(instance, client, toWrite) + t.assert.equal(c, client, 'client matches') + const stream = iterableStream(instance.streamWill()) + const list = await getArrayFromStream(stream) + t.assert.equal(list.length, 1, 'must return only one packet') + t.assert.deepEqual(list[0], expected, 'packet matches') + await delWill(instance, client) + await doCleanup(t, instance) }) - testInstance('stream all will message for unknown brokers', (t, instance) => { + test('stream all will message for unknown brokers', async (t) => { + t.plan(4) + const instance = await persistence(t) const originalId = instance.broker.id const client = { id: '42', @@ -1537,35 +1527,32 @@ function abstractPersistence (opts) { qos: 0, retain: true } + const expected = { + clientId: client.id, + brokerId: originalId, + topic: 'hello/died42', + payload: Buffer.from('muahahha'), + qos: 0, + retain: true + } - instance.putWill(client, toWrite1, (err, c) => { - assert.ifError(err, 'no error') - assert.equal(c, client, 'client matches') - instance.broker.id = 'anotherBroker' - instance.putWill(anotherClient, toWrite2, (err, c) => { - assert.ifError(err, 'no error') - assert.equal(c, anotherClient, 'client matches') - streamForEach(instance.streamWill({ - anotherBroker: Date.now() - }), (chunk) => { - assert.deepEqual(chunk, { - clientId: client.id, - brokerId: originalId, - topic: 'hello/died42', - payload: Buffer.from('muahahha'), - qos: 0, - retain: true - }, 'packet matches') - instance.delWill(client, (err, result, client) => { - assert.ifError(err, 'no error') - instance.destroy() - }) - }) - }) - }) + const c = await putWill(instance, client, toWrite1) + t.assert.equal(c, client, 'client matches') + instance.broker.id = 'anotherBroker' + const c2 = await putWill(instance, anotherClient, toWrite2) + t.assert.equal(c2, anotherClient, 'client matches') + const stream = iterableStream(instance.streamWill({ + anotherBroker: Date.now() + })) + const list = await getArrayFromStream(stream) + t.assert.equal(list.length, 1, 'must return only one packet') + t.assert.deepEqual(list[0], expected, 'packet matches') + await delWill(instance, client) + await doCleanup(t, instance) }) - testInstance('delete wills from dead brokers', (t, instance) => { + test('delete wills from dead brokers', async (t) => { + const instance = await persistence(t) const client = { id: '42' } @@ -1577,27 +1564,22 @@ function abstractPersistence (opts) { retain: true } - instance.putWill(client, toWrite1, (err, c) => { - assert.ifError(err, 'no error') - assert.equal(c, client, 'client matches') - instance.broker.id = 'anotherBroker' - client.brokerId = instance.broker.id - instance.delWill(client, (err, result, client) => { - assert.ifError(err, 'no error') - instance.destroy() - }) - }) + const c = await putWill(instance, client, toWrite1) + t.assert.equal(c, client, 'client matches') + instance.broker.id = 'anotherBroker' + client.brokerId = instance.broker.id + await delWill(instance, client) + await doCleanup(t, instance) }) - testInstance('do not error if unkown messageId in outoingClearMessageId', (t, instance) => { + test('do not error if unkown messageId in outoingClearMessageId', async (t) => { + const instance = await persistence(t) const client = { id: 'abc-123' } - instance.outgoingClearMessageId(client, 42, err => { - assert.ifError(err) - instance.destroy() - }) + await outgoingClearMessageId(instance, client, 42) + await doCleanup(t, instance) }) } diff --git a/test.js b/test.js index b29cc03..106532d 100644 --- a/test.js +++ b/test.js @@ -1,8 +1,29 @@ -const test = require('node:test') -const memory = require('./') +const { test } = require('node:test') +const events = require('node:events') +const memory = require('./persistence') const abs = require('./abstract') abs({ test, persistence: memory }) + +// create a memory instance that includes an event emitter +// to test the on-ready functionality +function createAsyncMemory (opts) { + const mem = memory(opts) + mem.emitter = new events.EventEmitter() + mem.on = mem.emitter.on.bind(mem.emitter) + mem.off = mem.emitter.removeListener.bind(mem.emitter) + mem.emit = mem.emitter.emit.bind(mem.emitter) + mem.once = mem.emitter.once.bind(mem.emitter) + mem.removeAllListeners = mem.emitter.removeAllListeners.bind(mem.emitter) + // wait 100ms before emitting ready, to simulate setup activities + setTimeout(() => mem.emit('ready'), 100) + return mem +} + +abs({ + test, + persistence: createAsyncMemory +})