From 0093f6409809ad22b52938ecde8587386f3acca8 Mon Sep 17 00:00:00 2001 From: Hans Klunder Date: Sun, 27 Apr 2025 15:39:13 +0200 Subject: [PATCH] feature: externalize promisified versions of the persistence interface for reuse --- abstract.js | 774 ++++++++++++++++--------------------------------- package.json | 1 + promisified.js | 299 +++++++++++++++++++ 3 files changed, 552 insertions(+), 522 deletions(-) create mode 100644 promisified.js diff --git a/abstract.js b/abstract.js index f94fce3..b4e6962 100644 --- a/abstract.js +++ b/abstract.js @@ -1,276 +1,32 @@ -const { Readable } = require('node:stream') const Packet = require('aedes-packet') - -// promisified versions of the instance methods -// to avoid deep callbacks while testing -function storeRetained (instance, packet) { - return new Promise((resolve, reject) => { - instance.storeRetained(packet, err => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) -} - -async function addSubscriptions (instance, client, subs) { - return new Promise((resolve, reject) => { - instance.addSubscriptions(client, subs, (err, reClient) => { - if (err) { - reject(err) - } else { - resolve(reClient) - } - }) - }) -} - -async function removeSubscriptions (instance, client, subs) { - return new Promise((resolve, reject) => { - instance.removeSubscriptions(client, subs, (err, reClient) => { - if (err) { - reject(err) - } else { - resolve(reClient) - } - }) - }) -} - -async function subscriptionsByClient (instance, client) { - return new Promise((resolve, reject) => { - instance.subscriptionsByClient(client, (err, resubs, reClient) => { - if (err) { - reject(err) - } else { - resolve({ resubs, reClient }) - } - }) - }) -} - -async function subscriptionsByTopic (instance, topic) { - return new Promise((resolve, reject) => { - instance.subscriptionsByTopic(topic, (err, resubs) => { - if (err) { - reject(err) - } else { - resolve(resubs) - } - }) - }) -} - -async function cleanSubscriptions (instance, client) { - return new Promise((resolve, reject) => { - instance.cleanSubscriptions(client, (err) => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) -} - -async function countOffline (instance) { - return new Promise((resolve, reject) => { - instance.countOffline((err, subsCount, clientsCount) => { - if (err) { - reject(err) - } else { - resolve({ subsCount, clientsCount }) - } - }) - }) -} - -async function outgoingEnqueue (instance, sub, packet) { - return new Promise((resolve, reject) => { - instance.outgoingEnqueue(sub, packet, err => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) -} - -async function outgoingEnqueueCombi (instance, subs, packet) { - return new Promise((resolve, reject) => { - instance.outgoingEnqueueCombi(subs, packet, err => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) -} - -async function outgoingClearMessageId (instance, client, packet) { - return new Promise((resolve, reject) => { - instance.outgoingClearMessageId(client, packet, (err, repacket) => { - if (err) { - reject(err) - } else { - resolve(repacket) - } - }) - }) -} - -async function outgoingUpdate (instance, client, packet) { - return new Promise((resolve, reject) => { - instance.outgoingUpdate(client, packet, (err, reclient, repacket) => { - if (err) { - reject(err) - } else { - resolve({ reclient, repacket }) - } - }) - }) -} - -async function incomingStorePacket (instance, client, packet) { - return new Promise((resolve, reject) => { - instance.incomingStorePacket(client, packet, err => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) -} -async function incomingGetPacket (instance, client, packet) { - return new Promise((resolve, reject) => { - instance.incomingGetPacket(client, packet, (err, retrieved) => { - if (err) { - reject(err) - } else { - resolve(retrieved) - } - }) - }) -} - -async function incomingDelPacket (instance, client, packet) { - return new Promise((resolve, reject) => { - instance.incomingDelPacket(client, packet, err => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) -} - -async function putWill (instance, client, packet) { - return new Promise((resolve, reject) => { - instance.putWill(client, packet, (err, reClient) => { - if (err) { - reject(err) - } else { - resolve(reClient) - } - }) - }) -} - -async function getWill (instance, client) { - return new Promise((resolve, reject) => { - instance.getWill(client, (err, packet, reClient) => { - if (err) { - reject(err) - } else { - resolve({ packet, reClient }) - } - }) - }) -} - -async function delWill (instance, client) { - return new Promise((resolve, reject) => { - instance.delWill(client, (err, packet, reClient) => { - if (err) { - reject(err) - } else { - resolve({ packet, reClient }) - } - }) - }) -} -// end of promisified versions of instance methods +const { PromisifiedPersistence, waitForEvent, getArrayFromStream } = require('./promisified.js') // helper functions -function waitForEvent (obj, resolveEvt) { - return new Promise((resolve, reject) => { - obj.once(resolveEvt, () => { - resolve() - }) - obj.once('error', reject) - }) -} -async function doCleanup (t, instance) { - const instanceDestroy = new Promise((resolve, reject) => { - instance.destroy((err) => { - if (err) { - reject(err) - return - } - resolve() - }) - }) - await instanceDestroy +async function doCleanup (t, prInstance) { + await prInstance.destroy() t.diagnostic('instance cleaned up') } -// legacy third party streams are typically not iterable -function iterableStream (stream) { - if (typeof stream[Symbol.asyncIterator] !== 'function') { - return new Readable({ objectMode: true }).wrap(stream) - } - return stream -} -// end of legacy third party streams support - -function outgoingStream (instance, client) { - return iterableStream(instance.outgoingStream(client)) -} - -async function getArrayFromStream (stream) { - const list = [] - for await (const item of iterableStream(stream)) { - list.push(item) - } - return list -} - -async function storeRetainedPacket (instance, opts = {}) { +async function storeRetainedPacket (prInstance, opts = {}) { const packet = { cmd: 'publish', - id: instance.broker.id, + id: prInstance.broker.id, topic: opts.topic || 'hello/world', payload: opts.payload || Buffer.from('muahah'), qos: 0, retain: true } - await storeRetained(instance, packet) + await prInstance.storeRetained(packet) return packet } -async function enqueueAndUpdate (t, instance, client, sub, packet, messageId) { - await outgoingEnqueueCombi(instance, [sub], packet) +async function enqueueAndUpdate (t, prInstance, client, sub, packet, messageId) { + await prInstance.outgoingEnqueueCombi([sub], packet) const updated = new Packet(packet) updated.messageId = messageId - const { reclient, repacket } = await outgoingUpdate(instance, client, updated) + const { reclient, repacket } = await prInstance.outgoingUpdate(client, updated) t.assert.equal(reclient, client, 'client matches') t.assert.equal(repacket, updated, 'packet matches') return repacket @@ -279,7 +35,6 @@ async function enqueueAndUpdate (t, instance, client, sub, packet, messageId) { function testPacket (t, packet, expected) { if (packet.messageId === null) packet.messageId = undefined t.assert.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue') - // deepLooseEqual? t.assert.deepEqual(structuredClone(packet), expected, 'must return the packet') } @@ -310,7 +65,7 @@ function abstractPersistence (opts) { const instance = await _persistence() if (instance) { - // instance.broker must be set first because setting it triggers + // prInstance.broker must be set first because setting it triggers // the call of instance._setup if aedes-cached-persistence is being used // instance._setup then fires the 'ready'event instance.broker = broker @@ -321,25 +76,26 @@ function abstractPersistence (opts) { await waitForEvent(instance, 'ready') } t.diagnostic('instance created') - return instance + const prInstance = new PromisifiedPersistence(instance) + return prInstance } throw new Error('no instance') } async function matchRetainedWithPattern (t, pattern) { - const instance = await persistence(t) - const packet = await storeRetainedPacket(instance) + const prInstance = await persistence(t) + const packet = await storeRetainedPacket(prInstance) let stream if (Array.isArray(pattern)) { - stream = instance.createRetainedStreamCombi(pattern) + stream = prInstance.createRetainedStreamCombi(pattern) } else { - stream = instance.createRetainedStream(pattern) + stream = prInstance.createRetainedStream(pattern) } t.diagnostic('created stream') const list = await getArrayFromStream(stream) t.assert.deepEqual(list, [packet], 'must return the packet') t.diagnostic('stream was ok') - await doCleanup(t, instance) + await doCleanup(t, prInstance) } // testing starts here @@ -370,7 +126,7 @@ function abstractPersistence (opts) { test('store multiple retained messages in order', async (t) => { t.plan(1000) - const instance = await persistence(t) + const prInstance = await persistence(t) const totalMessages = 1000 const retained = { @@ -382,45 +138,45 @@ function abstractPersistence (opts) { } for (let i = 0; i < totalMessages; i++) { - const packet = new Packet(retained, instance.broker) - await storeRetainedPacket(instance, packet) + const packet = new Packet(retained, prInstance.broker) + await storeRetainedPacket(prInstance, packet) t.assert.equal(packet.brokerCounter, i + 1, 'packet stored in order') } - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('remove retained message', async (t) => { t.plan(1) - const instance = await persistence(t) - await storeRetainedPacket(instance, {}) - await storeRetainedPacket(instance, { + const prInstance = await persistence(t) + await storeRetainedPacket(prInstance, {}) + await storeRetainedPacket(prInstance, { payload: Buffer.alloc(0) }) - const stream = instance.createRetainedStream('#') + const stream = prInstance.createRetainedStream('#') const list = await getArrayFromStream(stream) t.assert.deepEqual(list, [], 'must return an empty list') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('storing twice a retained message should keep only the last', async (t) => { t.plan(1) - const instance = await persistence(t) - await storeRetainedPacket(instance, {}) - const packet = await storeRetainedPacket(instance, { + const prInstance = await persistence(t) + await storeRetainedPacket(prInstance, {}) + const packet = await storeRetainedPacket(prInstance, { payload: Buffer.from('ahah') }) - const stream = instance.createRetainedStream('#') + const stream = prInstance.createRetainedStream('#') const list = await getArrayFromStream(stream) t.assert.deepEqual(list, [packet], 'must return the last packet') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('Create a new packet while storing a retained message', async (t) => { t.plan(1) - const instance = await persistence(t) + const prInstance = await persistence(t) const packet = { cmd: 'publish', - id: instance.broker.id, + id: prInstance.broker.id, topic: opts.topic || 'hello/world', payload: opts.payload || Buffer.from('muahah'), qos: 0, @@ -428,18 +184,18 @@ function abstractPersistence (opts) { } const newPacket = Object.assign({}, packet) - await storeRetained(instance, packet) + await prInstance.storeRetained(packet) // packet reference change to check if a new packet is stored always packet.retain = false - const stream = instance.createRetainedStream('#') + const stream = prInstance.createRetainedStream('#') const list = await getArrayFromStream(stream) t.assert.deepEqual(list, [newPacket], 'must return the last packet') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('store and look up subscriptions by client', async (t) => { t.plan(3) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -461,17 +217,17 @@ function abstractPersistence (opts) { nl: false }] - const reClient = await addSubscriptions(instance, client, subs) + const reClient = await prInstance.addSubscriptions(client, subs) t.assert.equal(reClient, client, 'client must be the same') - const { resubs, reClient: reClient2 } = await subscriptionsByClient(instance, client) + const { resubs, reClient: reClient2 } = await prInstance.subscriptionsByClient(client) t.assert.equal(reClient2, client, 'client must be the same') t.assert.deepEqual(resubs, subs) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('remove subscriptions by client', async (t) => { t.plan(4) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -487,11 +243,11 @@ function abstractPersistence (opts) { nl: false }] - const reclient1 = await addSubscriptions(instance, client, subs) + const reclient1 = await prInstance.addSubscriptions(client, subs) t.assert.equal(reclient1, client, 'client must be the same') - const reClient2 = await removeSubscriptions(instance, client, ['hello']) + const reClient2 = await prInstance.removeSubscriptions(client, ['hello']) t.assert.equal(reClient2, client, 'client must be the same') - const { resubs, reClient } = await subscriptionsByClient(instance, client) + const { resubs, reClient } = await prInstance.subscriptionsByClient(client) t.assert.equal(reClient, client, 'client must be the same') t.assert.deepEqual(resubs, [{ topic: 'matteo', @@ -500,12 +256,12 @@ function abstractPersistence (opts) { rap: true, nl: false }]) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('store and look up subscriptions by topic', async (t) => { t.plan(2) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -527,9 +283,9 @@ function abstractPersistence (opts) { nl: false }] - const reclient = await addSubscriptions(instance, client, subs) + const reclient = await prInstance.addSubscriptions(client, subs) t.assert.equal(reclient, client, 'client must be the same') - const resubs = await subscriptionsByTopic(instance, 'hello') + const resubs = await prInstance.subscriptionsByTopic('hello') t.assert.deepEqual(resubs, [{ clientId: client.id, topic: 'hello/#', @@ -545,12 +301,12 @@ function abstractPersistence (opts) { rap: true, nl: false }]) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('get client list after subscriptions', async (t) => { t.plan(1) - const instance = await persistence(t) + const prInstance = await persistence(t) const client1 = { id: 'abcde' } const client2 = { id: 'efghi' } const subs = [{ @@ -558,52 +314,52 @@ function abstractPersistence (opts) { qos: 1 }] - await addSubscriptions(instance, client1, subs) - await addSubscriptions(instance, client2, subs) - const stream = instance.getClientList(subs[0].topic) + await prInstance.addSubscriptions(client1, subs) + await prInstance.addSubscriptions(client2, subs) + const stream = prInstance.getClientList(subs[0].topic) const list = await getArrayFromStream(stream) t.assert.deepEqual(list, [client1.id, client2.id]) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('get client list after an unsubscribe', async (t) => { t.plan(1) - const instance = await persistence(t) + const prInstance = await persistence(t) const client1 = { id: 'abcde' } const client2 = { id: 'efghi' } const subs = [{ topic: 'helloagain', qos: 1 }] - await addSubscriptions(instance, client1, subs) - await addSubscriptions(instance, client2, subs) - await removeSubscriptions(instance, client2, [subs[0].topic]) - const stream = instance.getClientList(subs[0].topic) + await prInstance.addSubscriptions(client1, subs) + await prInstance.addSubscriptions(client2, subs) + await prInstance.removeSubscriptions(client2, [subs[0].topic]) + const stream = prInstance.getClientList(subs[0].topic) const list = await getArrayFromStream(stream) t.assert.deepEqual(list, [client1.id]) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('get subscriptions list after an unsubscribe', async (t) => { t.plan(1) - const instance = await persistence(t) + const prInstance = await persistence(t) const client1 = { id: 'abcde' } const client2 = { id: 'efghi' } const subs = [{ topic: 'helloagain', qos: 1 }] - await addSubscriptions(instance, client1, subs) - await addSubscriptions(instance, client2, subs) - await removeSubscriptions(instance, client2, [subs[0].topic]) - const clients = await subscriptionsByTopic(instance, subs[0].topic) + await prInstance.addSubscriptions(client1, subs) + await prInstance.addSubscriptions(client2, subs) + await prInstance.removeSubscriptions(client2, [subs[0].topic]) + const clients = await prInstance.subscriptionsByTopic(subs[0].topic) t.assert.deepEqual(clients[0].clientId, client1.id) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('QoS 0 subscriptions, restored but not matched', async (t) => { t.plan(2) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -625,10 +381,10 @@ function abstractPersistence (opts) { nl: false }] - await addSubscriptions(instance, client, subs) - const { resubs } = await subscriptionsByClient(instance, client) + await prInstance.addSubscriptions(client, subs) + const { resubs } = await prInstance.subscriptionsByClient(client) t.assert.deepEqual(resubs, subs) - const resubs2 = await subscriptionsByTopic(instance, 'hello') + const resubs2 = await prInstance.subscriptionsByTopic('hello') t.assert.deepEqual(resubs2, [{ clientId: client.id, topic: 'hello/#', @@ -637,12 +393,12 @@ function abstractPersistence (opts) { rap: true, nl: false }]) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('clean subscriptions', async (t) => { t.plan(4) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -652,37 +408,37 @@ function abstractPersistence (opts) { qos: 1 }] - await addSubscriptions(instance, client, subs) - await cleanSubscriptions(instance, client) - const resubs = await subscriptionsByTopic(instance, 'hello') + await prInstance.addSubscriptions(client, subs) + await prInstance.cleanSubscriptions(client) + const resubs = await prInstance.subscriptionsByTopic('hello') t.assert.deepEqual(resubs, [], 'no subscriptions') - const { resubs: resubs2 } = await subscriptionsByClient(instance, client) + const { resubs: resubs2 } = await prInstance.subscriptionsByClient(client) t.assert.deepEqual(resubs2, null, 'no subscriptions') - const { subsCount, clientsCount } = await countOffline(instance) + const { subsCount, clientsCount } = await prInstance.countOffline() t.assert.equal(subsCount, 0, 'no subscriptions added') t.assert.equal(clientsCount, 0, 'no clients added') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('clean subscriptions with no active subscriptions', async (t) => { t.plan(4) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } - await cleanSubscriptions(instance, client) - const resubs = await subscriptionsByTopic(instance, 'hello') + await prInstance.cleanSubscriptions(client) + const resubs = await prInstance.subscriptionsByTopic('hello') t.assert.deepEqual(resubs, [], 'no subscriptions') - const { resubs: resubs2 } = await subscriptionsByClient(instance, client) + const { resubs: resubs2 } = await prInstance.subscriptionsByClient(client) t.assert.deepEqual(resubs2, null, 'no subscriptions') - const { subsCount, clientsCount } = await countOffline(instance) + const { subsCount, clientsCount } = await prInstance.countOffline() t.assert.equal(subsCount, 0, 'no subscriptions added') t.assert.equal(clientsCount, 0, 'no clients added') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('same topic, different QoS', async (t) => { t.plan(5) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -698,9 +454,9 @@ function abstractPersistence (opts) { nl: false }] - const reClient = await addSubscriptions(instance, client, subs) + const reClient = await prInstance.addSubscriptions(client, subs) t.assert.equal(reClient, client, 'client must be the same') - const { resubs } = await subscriptionsByClient(instance, client) + const { resubs } = await prInstance.subscriptionsByClient(client) t.assert.deepEqual(resubs, [{ topic: 'hello', qos: 1, @@ -709,7 +465,7 @@ function abstractPersistence (opts) { nl: false }]) - const resubs2 = await subscriptionsByTopic(instance, 'hello') + const resubs2 = await prInstance.subscriptionsByTopic('hello') t.assert.deepEqual(resubs2, [{ clientId: 'abcde', topic: 'hello', @@ -719,15 +475,15 @@ function abstractPersistence (opts) { nl: false }]) - const { subsCount, clientsCount } = await countOffline(instance) + const { subsCount, clientsCount } = await prInstance.countOffline() t.assert.equal(subsCount, 1, 'one subscription added') t.assert.equal(clientsCount, 1, 'one client added') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('replace subscriptions', async (t) => { t.plan(25) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const topic = 'hello' const sub = { topic, rh: 0, rap: true, nl: false } @@ -735,13 +491,13 @@ function abstractPersistence (opts) { async function check (qos) { sub.qos = subByTopic.qos = qos - const reClient = await addSubscriptions(instance, client, [sub]) + const reClient = await prInstance.addSubscriptions(client, [sub]) t.assert.equal(reClient, client, 'client must be the same') - const { resubs } = await subscriptionsByClient(instance, client) + const { resubs } = await prInstance.subscriptionsByClient(client) t.assert.deepEqual(resubs, [sub]) - const subsForTopic = await subscriptionsByTopic(instance, topic) + const subsForTopic = await prInstance.subscriptionsByTopic(topic) t.assert.deepEqual(subsForTopic, qos === 0 ? [] : [subByTopic]) - const { subsCount, clientsCount } = await countOffline(instance) + const { subsCount, clientsCount } = await prInstance.countOffline() if (qos === 0) { t.assert.equal(subsCount, 0, 'no subscriptions added') } else { @@ -755,12 +511,12 @@ function abstractPersistence (opts) { await check(2) await check(1) await check(0) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('replace subscriptions in same call', async (t) => { t.plan(5) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const topic = 'hello' const subs = [ @@ -770,21 +526,21 @@ function abstractPersistence (opts) { { topic, qos: 1, rh: 0, rap: true, nl: false }, { topic, qos: 0, rh: 0, rap: true, nl: false } ] - const reClient = await addSubscriptions(instance, client, subs) + const reClient = await prInstance.addSubscriptions(client, subs) t.assert.equal(reClient, client, 'client must be the same') - const { resubs: subsForClient } = await subscriptionsByClient(instance, client) + const { resubs: subsForClient } = await prInstance.subscriptionsByClient(client) t.assert.deepEqual(subsForClient, [{ topic, qos: 0, rh: 0, rap: true, nl: false }]) - const subsForTopic = await subscriptionsByTopic(instance, topic) + const subsForTopic = await prInstance.subscriptionsByTopic(topic) t.assert.deepEqual(subsForTopic, []) - const { subsCount, clientsCount } = await countOffline(instance) + const { subsCount, clientsCount } = await prInstance.countOffline() t.assert.equal(subsCount, 0, 'no subscriptions added') t.assert.equal(clientsCount, 1, 'one client added') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('store and count subscriptions', async (t) => { t.plan(11) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const subs = [{ topic: 'hello', @@ -797,33 +553,33 @@ function abstractPersistence (opts) { qos: 0 }] - const reclient = await addSubscriptions(instance, client, subs) + const reclient = await prInstance.addSubscriptions(client, subs) t.assert.equal(reclient, client, 'client must be the same') - const { subsCount, clientsCount } = await countOffline(instance) + const { subsCount, clientsCount } = await prInstance.countOffline() t.assert.equal(subsCount, 2, 'two subscriptions added') t.assert.equal(clientsCount, 1, 'one client added') - await removeSubscriptions(instance, client, ['hello']) - const { subsCount: subsCount2, clientsCount: clientsCount2 } = await countOffline(instance) + await prInstance.removeSubscriptions(client, ['hello']) + const { subsCount: subsCount2, clientsCount: clientsCount2 } = await prInstance.countOffline() t.assert.equal(subsCount2, 1, 'one subscription added') t.assert.equal(clientsCount2, 1, 'one client added') - await removeSubscriptions(instance, client, ['matteo']) - const { subsCount: subsCount3, clientsCount: clientsCount3 } = await countOffline(instance) + await prInstance.removeSubscriptions(client, ['matteo']) + const { subsCount: subsCount3, clientsCount: clientsCount3 } = await prInstance.countOffline() t.assert.equal(subsCount3, 0, 'zero subscriptions added') t.assert.equal(clientsCount3, 1, 'one client added') - await removeSubscriptions(instance, client, ['noqos']) - const { subsCount: subsCount4, clientsCount: clientsCount4 } = await countOffline(instance) + await prInstance.removeSubscriptions(client, ['noqos']) + const { subsCount: subsCount4, clientsCount: clientsCount4 } = await prInstance.countOffline() t.assert.equal(subsCount4, 0, 'zero subscriptions added') t.assert.equal(clientsCount4, 0, 'zero clients added') - await removeSubscriptions(instance, client, ['noqos']) - const { subsCount: subsCount5, clientsCount: clientsCount5 } = await countOffline(instance) + await prInstance.removeSubscriptions(client, ['noqos']) + const { subsCount: subsCount5, clientsCount: clientsCount5 } = await prInstance.countOffline() t.assert.equal(subsCount5, 0, 'zero subscriptions added') t.assert.equal(clientsCount5, 0, 'zero clients added') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('count subscriptions with two clients', async (t) => { t.plan(26) - const instance = await persistence(t) + const prInstance = await persistence(t) const client1 = { id: 'abcde' } const client2 = { id: 'fghij' } const subs = [{ @@ -838,16 +594,16 @@ function abstractPersistence (opts) { }] async function remove (client, subs, expectedSubs, expectedClients) { - const reClient = await removeSubscriptions(instance, client, subs) + const reClient = await prInstance.removeSubscriptions(client, subs) t.assert.equal(reClient, client, 'client must be the same') - const { subsCount, clientsCount } = await countOffline(instance) + const { subsCount, clientsCount } = await prInstance.countOffline() t.assert.equal(subsCount, expectedSubs, 'subscriptions added') t.assert.equal(clientsCount, expectedClients, 'clients added') } - const reClient1 = await addSubscriptions(instance, client1, subs) + const reClient1 = await prInstance.addSubscriptions(client1, subs) t.assert.equal(reClient1, client1, 'client must be the same') - const reClient2 = await addSubscriptions(instance, client2, subs) + const reClient2 = await prInstance.addSubscriptions(client2, subs) t.assert.equal(reClient2, client2, 'client must be the same') await remove(client1, ['foobar'], 4, 2) await remove(client1, ['hello'], 3, 2) @@ -857,12 +613,12 @@ function abstractPersistence (opts) { await remove(client2, ['hello'], 1, 1) await remove(client2, ['matteo'], 0, 1) await remove(client2, ['noqos'], 0, 0) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add duplicate subs to persistence for qos > 0', async (t) => { t.plan(3) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const topic = 'hello' const subs = [{ @@ -873,19 +629,19 @@ function abstractPersistence (opts) { nl: false }] - const reClient = await addSubscriptions(instance, client, subs) + const reClient = await prInstance.addSubscriptions(client, subs) t.assert.equal(reClient, client, 'client must be the same') - const reClient2 = await addSubscriptions(instance, client, subs) + const reClient2 = await prInstance.addSubscriptions(client, subs) t.assert.equal(reClient2, client, 'client must be the same') subs[0].clientId = client.id - const subsForTopic = await subscriptionsByTopic(instance, topic) + const subsForTopic = await prInstance.subscriptionsByTopic(topic) t.assert.deepEqual(subsForTopic, subs) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add duplicate subs to persistence for qos 0', async (t) => { t.plan(3) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } const topic = 'hello' const subs = [{ @@ -896,18 +652,18 @@ function abstractPersistence (opts) { nl: false }] - const reClient = await addSubscriptions(instance, client, subs) + const reClient = await prInstance.addSubscriptions(client, subs) t.assert.equal(reClient, client, 'client must be the same') - const reClient2 = await addSubscriptions(instance, client, subs) + const reClient2 = await prInstance.addSubscriptions(client, subs) t.assert.equal(reClient2, client, 'client must be the same') - const { resubs: subsForClient } = await subscriptionsByClient(instance, client) + const { resubs: subsForClient } = await prInstance.subscriptionsByClient(client) t.assert.deepEqual(subsForClient, subs) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('get topic list after concurrent subscriptions of a client', async (t) => { - t.plan(3) - const instance = await persistence(t) + t.plan(1) + const prInstance = await persistence(t) const client = { id: 'abcde' } const subs1 = [{ topic: 'hello1', @@ -923,33 +679,17 @@ function abstractPersistence (opts) { rap: true, nl: false }] - let calls = 2 - - await new Promise((resolve, reject) => { - async function done () { - if (!--calls) { - const { resubs } = await subscriptionsByClient(instance, client) - resubs.sort((a, b) => a.topic.localeCompare(b.topic, 'en')) - t.assert.deepEqual(resubs, [subs1[0], subs2[0]]) - await doCleanup(t, instance) - resolve() - } - } - - instance.addSubscriptions(client, subs1, err => { - t.assert.ok(!err, 'no error for hello1') - done() - }) - instance.addSubscriptions(client, subs2, err => { - t.assert.ok(!err, 'no error for hello2') - done() - }) - }) + await prInstance.addSubscriptions(client, subs1) + await prInstance.addSubscriptions(client, subs2) + const { resubs } = await prInstance.subscriptionsByClient(client) + resubs.sort((a, b) => a.topic.localeCompare(b.topic, 'en')) + t.assert.deepEqual(resubs, [subs1[0], subs2[0]]) + await doCleanup(t, prInstance) }) test('add outgoing packet and stream it', async (t) => { t.plan(2) - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -966,7 +706,7 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42 } const expected = { @@ -976,21 +716,21 @@ function abstractPersistence (opts) { qos: 1, retain: false, dup: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42, messageId: undefined } - await outgoingEnqueue(instance, sub, packet) - const stream = outgoingStream(instance, client) + await prInstance.outgoingEnqueue(sub, packet) + const stream = prInstance.outgoingStream(client) const list = await getArrayFromStream(stream) testPacket(t, list[0], expected) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add outgoing packet for multiple subs and stream to all', async (t) => { t.plan(4) - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -1016,7 +756,7 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42 } const expected = { @@ -1026,25 +766,25 @@ function abstractPersistence (opts) { qos: 1, retain: false, dup: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42, messageId: undefined } - await outgoingEnqueueCombi(instance, subs, packet) - const stream = outgoingStream(instance, client) + await prInstance.outgoingEnqueueCombi(subs, packet) + const stream = prInstance.outgoingStream(client) const list = await getArrayFromStream(stream) testPacket(t, list[0], expected) - const stream2 = outgoingStream(instance, client2) + const stream2 = prInstance.outgoingStream(client2) const list2 = await getArrayFromStream(stream2) testPacket(t, list2[0], expected) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add outgoing packet as a string and pump', async (t) => { t.plan(7) - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -1059,7 +799,7 @@ function abstractPersistence (opts) { payload: Buffer.from('world'), qos: 1, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 10 } const packet2 = { @@ -1068,17 +808,17 @@ function abstractPersistence (opts) { payload: Buffer.from('matteo'), qos: 1, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 50 } const queue = [] - const updated1 = await enqueueAndUpdate(t, instance, client, sub, packet1, 42) - const updated2 = await enqueueAndUpdate(t, instance, client, sub, packet2, 43) - const stream = outgoingStream(instance, client) + const updated1 = await enqueueAndUpdate(t, prInstance, client, sub, packet1, 42) + const updated2 = await enqueueAndUpdate(t, prInstance, client, sub, packet2, 43) + const stream = prInstance.outgoingStream(client) async function clearQueue (data) { - const { repacket } = await outgoingUpdate(instance, client, data) + const { repacket } = await prInstance.outgoingUpdate(client, data) t.diagnostic('packet received') queue.push(repacket) } @@ -1090,12 +830,12 @@ function abstractPersistence (opts) { t.assert.equal(queue.length, 2) t.assert.deepEqual(deClassed(queue[0]), deClassed(updated1)) t.assert.deepEqual(deClassed(queue[1]), deClassed(updated2)) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add outgoing packet as a string and stream', async (t) => { t.plan(2) - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -1112,7 +852,7 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42 } const expected = { @@ -1122,21 +862,21 @@ function abstractPersistence (opts) { qos: 1, retain: false, dup: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42, messageId: undefined } - await outgoingEnqueueCombi(instance, [sub], packet) - const stream = outgoingStream(instance, client) + await prInstance.outgoingEnqueueCombi([sub], packet) + const stream = prInstance.outgoingStream(client) const list = await getArrayFromStream(stream) testPacket(t, list[0], expected) - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add outgoing packet and stream it twice', async (t) => { t.plan(5) - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', @@ -1153,7 +893,7 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42, messageId: 4242 } @@ -1164,25 +904,25 @@ function abstractPersistence (opts) { qos: 1, retain: false, dup: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42, messageId: undefined } - await outgoingEnqueueCombi(instance, [sub], packet) - const stream = outgoingStream(instance, client) + await prInstance.outgoingEnqueueCombi([sub], packet) + const stream = prInstance.outgoingStream(client) const list = await getArrayFromStream(stream) testPacket(t, list[0], expected) - const stream2 = outgoingStream(instance, client) + const stream2 = prInstance.outgoingStream(client) const list2 = await getArrayFromStream(stream2) testPacket(t, list2[0], expected) t.assert.notEqual(packet, expected, 'packet must be a different object') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add outgoing packet and update messageId', async (t) => { t.plan(5) - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1197,24 +937,24 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42 } - const updated = await enqueueAndUpdate(t, instance, client, sub, packet, 42) + const updated = await enqueueAndUpdate(t, prInstance, client, sub, packet, 42) updated.messageId = undefined - const stream = outgoingStream(instance, client) + const stream = prInstance.outgoingStream(client) const list = await getArrayFromStream(stream) list[0].messageId = undefined t.assert.notEqual(list[0], updated, 'must not be the same object') t.assert.deepEqual(deClassed(list[0]), deClassed(updated), 'must return the packet') t.assert.equal(list.length, 1, 'must return only one packet') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add 2 outgoing packet and clear messageId', async (t) => { t.plan(10) - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1229,7 +969,7 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42 } const packet2 = { @@ -1240,29 +980,29 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 43 } - const updated1 = await enqueueAndUpdate(t, instance, client, sub, packet1, 42) - const updated2 = await enqueueAndUpdate(t, instance, client, sub, packet2, 43) - const pkt = await outgoingClearMessageId(instance, client, updated1) + const updated1 = await enqueueAndUpdate(t, prInstance, client, sub, packet1, 42) + const updated2 = await enqueueAndUpdate(t, prInstance, client, sub, packet2, 43) + const pkt = await prInstance.outgoingClearMessageId(client, updated1) t.assert.deepEqual(pkt.messageId, 42, 'must have the same messageId') t.assert.deepEqual(pkt.payload.toString(), packet1.payload.toString(), 'must have original payload') t.assert.deepEqual(pkt.topic, packet1.topic, 'must have original topic') - const stream = outgoingStream(instance, client) + const stream = prInstance.outgoingStream(client) updated2.messageId = undefined const list = await getArrayFromStream(stream) list[0].messageId = undefined t.assert.notEqual(list[0], updated2, 'must not be the same object') t.assert.deepEqual(deClassed(list[0]), deClassed(updated2), 'must return the packet') t.assert.equal(list.length, 1, 'must return only one packet') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add many outgoing packets and clear messageIds', async (t) => { // t.plan() is called below after we know the high watermark - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1280,43 +1020,33 @@ function abstractPersistence (opts) { } // we just need a stream to figure out the high watermark - const stream = (outgoingStream(instance, client)) + const stream = prInstance.outgoingStream(client) const total = stream.readableHighWaterMark * 2 t.plan(total * 2) for (let i = 0; i < total; i++) { - const p = new Packet(packet, instance.broker) + const p = new Packet(packet, prInstance.broker) p.messageId = i - await outgoingEnqueue(instance, sub, p) - await outgoingUpdate(instance, client, p) + await prInstance.outgoingEnqueue(sub, p) + await prInstance.outgoingUpdate(client, p) } - let queued = 0 - for await (const p of (outgoingStream(instance, client))) { - if (p) { - queued++ - } - } - t.assert.equal(queued, total, `outgoing queue must hold ${total} items`) + const queued = await getArrayFromStream(prInstance.outgoingStream(client)) + t.assert.equal(queued.length, total, `outgoing queue must hold ${total} items`) - for await (const p of (outgoingStream(instance, client))) { - const received = await outgoingClearMessageId(instance, client, p) + for await (const p of (prInstance.outgoingStream(client))) { + const received = await prInstance.outgoingClearMessageId(client, p) t.assert.deepEqual(received, p, 'must return the packet') } - let queued2 = 0 - for await (const p of (outgoingStream(instance, client))) { - if (p) { - queued2++ - } - } - t.assert.equal(queued2, 0, 'outgoing queue is empty') - await doCleanup(t, instance) + const queued2 = await getArrayFromStream(prInstance.outgoingStream(client)) + t.assert.equal(queued2.length, 0, 'outgoing queue is empty') + await doCleanup(t, prInstance) }) test('update to publish w/ same messageId', async (t) => { t.plan(5) - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1331,7 +1061,7 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42, messageId: 42 } @@ -1343,28 +1073,28 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 50, messageId: 42 } - await outgoingEnqueue(instance, sub, packet1) - await outgoingEnqueue(instance, sub, packet2) - await outgoingUpdate(instance, client, packet1) - await outgoingUpdate(instance, client, packet2) - const stream = outgoingStream(instance, client) + await prInstance.outgoingEnqueue(sub, packet1) + await prInstance.outgoingEnqueue(sub, packet2) + await prInstance.outgoingUpdate(client, packet1) + await prInstance.outgoingUpdate(client, packet2) + const stream = prInstance.outgoingStream(client) const list = await getArrayFromStream(stream) t.assert.equal(list.length, 2, 'must have two items in queue') t.assert.equal(list[0].brokerCounter, packet1.brokerCounter, 'brokerCounter must match') t.assert.equal(list[0].messageId, packet1.messageId, 'messageId must match') t.assert.equal(list[1].brokerCounter, packet2.brokerCounter, 'brokerCounter must match') t.assert.equal(list[1].messageId, packet2.messageId, 'messageId must match') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('update to pubrel', async (t) => { t.plan(3) - const instance = await persistence(t) + const prInstance = await persistence(t) const sub = { clientId: 'abcde', topic: 'hello', qos: 1 } @@ -1379,14 +1109,14 @@ function abstractPersistence (opts) { dup: false, length: 14, retain: false, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, brokerCounter: 42 } - await outgoingEnqueueCombi(instance, [sub], packet) + await prInstance.outgoingEnqueueCombi([sub], packet) const updated = new Packet(packet) updated.messageId = 42 - const { reclient, repacket } = await outgoingUpdate(instance, client, updated) + const { reclient, repacket } = await prInstance.outgoingUpdate(client, updated) t.assert.equal(reclient, client, 'client matches') t.assert.equal(repacket, updated, 'packet matches') @@ -1395,16 +1125,16 @@ function abstractPersistence (opts) { messageId: updated.messageId } - await outgoingUpdate(instance, client, pubrel) - const stream = outgoingStream(instance, client) + await prInstance.outgoingUpdate(client, pubrel) + const stream = prInstance.outgoingStream(client) const list = await getArrayFromStream(stream) t.assert.deepEqual(list, [pubrel], 'must return the packet') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('add incoming packet, get it, and clear with messageId', async (t) => { t.plan(3) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abcde' } @@ -1418,8 +1148,8 @@ function abstractPersistence (opts) { retain: false, messageId: 42 } - await incomingStorePacket(instance, client, packet) - const retrieved = await incomingGetPacket(instance, client, { + await prInstance.incomingStorePacket(client, packet) + const retrieved = await prInstance.incomingGetPacket(client, { messageId: packet.messageId }) // adjusting the objects so they match @@ -1432,22 +1162,22 @@ function abstractPersistence (opts) { result.payload = Buffer.from(result.payload) t.assert.deepEqual(result, packet, 'retrieved packet must be deeply equal') t.assert.notEqual(retrieved, packet, 'retrieved packet must not be the same object') - await incomingDelPacket(instance, client, retrieved) + await prInstance.incomingDelPacket(client, retrieved) try { - await incomingGetPacket(instance, client, { + await prInstance.incomingGetPacket(client, { messageId: packet.messageId }) t.assert.ok(false, 'must error') } catch (err) { t.assert.ok(err, 'must error') - await doCleanup(t, instance) + await doCleanup(t, prInstance) } }) test('store, fetch and delete will message', async (t) => { t.plan(7) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: '12345' } @@ -1458,27 +1188,27 @@ function abstractPersistence (opts) { retain: true } - const c = await putWill(instance, client, expected) + const c = await prInstance.putWill(client, expected) t.assert.equal(c, client, 'client matches') - const { packet: p1, reClient: c1 } = await getWill(instance, client) + const { packet: p1, reClient: c1 } = await prInstance.getWill(client) t.assert.deepEqual(p1, expected, 'will matches') t.assert.equal(c1, client, 'client matches') client.brokerId = p1.brokerId - const { packet: p2, reClient: c2 } = await delWill(instance, client) + const { packet: p2, reClient: c2 } = await prInstance.delWill(client) t.assert.deepEqual(p2, expected, 'will matches') t.assert.equal(c2, client, 'client matches') - const { packet: p3, reClient: c3 } = await getWill(instance, client) + const { packet: p3, reClient: c3 } = await prInstance.getWill(client) t.assert.ok(!p3, 'no will after del') t.assert.equal(c3, client, 'client matches') - await doCleanup(t, instance) + await doCleanup(t, prInstance) }) test('stream all will messages', async (t) => { t.plan(3) - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: '12345', - brokerId: instance.broker.id + brokerId: prInstance.broker.id } const toWrite = { topic: 'hello/died', @@ -1489,34 +1219,34 @@ function abstractPersistence (opts) { const expected = { clientId: client.id, - brokerId: instance.broker.id, + brokerId: prInstance.broker.id, topic: 'hello/died', payload: Buffer.from('muahahha'), qos: 0, retain: true } - const c = await putWill(instance, client, toWrite) + const c = await prInstance.putWill(client, toWrite) t.assert.equal(c, client, 'client matches') - const stream = iterableStream(instance.streamWill()) + const stream = prInstance.streamWill() const list = await getArrayFromStream(stream) t.assert.equal(list.length, 1, 'must return only one packet') t.assert.deepEqual(list[0], expected, 'packet matches') - await delWill(instance, client) - await doCleanup(t, instance) + await prInstance.delWill(client) + await doCleanup(t, prInstance) }) test('stream all will message for unknown brokers', async (t) => { t.plan(4) - const instance = await persistence(t) - const originalId = instance.broker.id + const prInstance = await persistence(t) + const originalId = prInstance.broker.id const client = { id: '42', - brokerId: instance.broker.id + brokerId: prInstance.broker.id } const anotherClient = { id: '24', - brokerId: instance.broker.id + brokerId: prInstance.broker.id } const toWrite1 = { topic: 'hello/died42', @@ -1539,23 +1269,23 @@ function abstractPersistence (opts) { retain: true } - const c = await putWill(instance, client, toWrite1) + const c = await prInstance.putWill(client, toWrite1) t.assert.equal(c, client, 'client matches') - instance.broker.id = 'anotherBroker' - const c2 = await putWill(instance, anotherClient, toWrite2) + prInstance.broker.id = 'anotherBroker' + const c2 = await prInstance.putWill(anotherClient, toWrite2) t.assert.equal(c2, anotherClient, 'client matches') - const stream = iterableStream(instance.streamWill({ + const stream = prInstance.streamWill({ anotherBroker: Date.now() - })) + }) const list = await getArrayFromStream(stream) t.assert.equal(list.length, 1, 'must return only one packet') t.assert.deepEqual(list[0], expected, 'packet matches') - await delWill(instance, client) - await doCleanup(t, instance) + await prInstance.delWill(client) + await doCleanup(t, prInstance) }) test('delete wills from dead brokers', async (t) => { - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: '42' } @@ -1567,22 +1297,22 @@ function abstractPersistence (opts) { retain: true } - const c = await putWill(instance, client, toWrite1) + const c = await prInstance.putWill(client, toWrite1) t.assert.equal(c, client, 'client matches') - instance.broker.id = 'anotherBroker' - client.brokerId = instance.broker.id - await delWill(instance, client) - await doCleanup(t, instance) + prInstance.broker.id = 'anotherBroker' + client.brokerId = prInstance.broker.id + await prInstance.delWill(client) + await doCleanup(t, prInstance) }) test('do not error if unkown messageId in outoingClearMessageId', async (t) => { - const instance = await persistence(t) + const prInstance = await persistence(t) const client = { id: 'abc-123' } - await outgoingClearMessageId(instance, client, 42) - await doCleanup(t, instance) + await prInstance.outgoingClearMessageId(client, 42) + await doCleanup(t, prInstance) }) } diff --git a/package.json b/package.json index f50af11..ed91bd7 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ "test:typescript": "tsd", "test": "npm run lint && npm run unit && tsd", "coverage": "c8 --reporter=lcov node --test test.js", + "coverage:report": "c8 report", "test:ci": "npm run lint && npm run coverage && npm run test:typescript", "license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause'", "release": "read -p 'GITHUB_TOKEN: ' GITHUB_TOKEN && export GITHUB_TOKEN=$GITHUB_TOKEN && release-it --disable-metrics" diff --git a/promisified.js b/promisified.js new file mode 100644 index 0000000..98d08b1 --- /dev/null +++ b/promisified.js @@ -0,0 +1,299 @@ +// promisified versions of the persistence interface +// to avoid deep callbacks while testing + +class PromisifiedPersistence { + constructor (instance) { + this.instance = instance + } + + get broker () { + return this.instance.broker + } + + /* c8 ignore next 3 */ + set broker (newValue) { + this.instance.broker = newValue + } + + storeRetained (packet) { + return new Promise((resolve, reject) => { + this.instance.storeRetained(packet, err => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + } + + createRetainedStreamCombi (patterns) { + return this.instance.createRetainedStreamCombi(patterns) + } + + createRetainedStream (pattern) { + return this.instance.createRetainedStream(pattern) + } + + async addSubscriptions (client, subs) { + return new Promise((resolve, reject) => { + this.instance.addSubscriptions(client, subs, (err, reClient) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve(reClient) + } + }) + }) + } + + async removeSubscriptions (client, subs) { + return new Promise((resolve, reject) => { + this.instance.removeSubscriptions(client, subs, (err, reClient) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve(reClient) + } + }) + }) + } + + async subscriptionsByClient (client) { + return new Promise((resolve, reject) => { + this.instance.subscriptionsByClient(client, (err, resubs, reClient) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve({ resubs, reClient }) + } + }) + }) + } + + async subscriptionsByTopic (topic) { + return new Promise((resolve, reject) => { + this.instance.subscriptionsByTopic(topic, (err, resubs) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve(resubs) + } + }) + }) + } + + async cleanSubscriptions (client) { + return new Promise((resolve, reject) => { + this.instance.cleanSubscriptions(client, (err) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + } + + async countOffline () { + return new Promise((resolve, reject) => { + this.instance.countOffline((err, subsCount, clientsCount) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve({ subsCount, clientsCount }) + } + }) + }) + } + + async outgoingEnqueue (sub, packet) { + return new Promise((resolve, reject) => { + this.instance.outgoingEnqueue(sub, packet, err => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + } + + async outgoingEnqueueCombi (subs, packet) { + return new Promise((resolve, reject) => { + this.instance.outgoingEnqueueCombi(subs, packet, err => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + } + + async outgoingClearMessageId (client, packet) { + return new Promise((resolve, reject) => { + this.instance.outgoingClearMessageId(client, packet, (err, repacket) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve(repacket) + } + }) + }) + } + + async outgoingUpdate (client, packet) { + return new Promise((resolve, reject) => { + this.instance.outgoingUpdate(client, packet, (err, reclient, repacket) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve({ reclient, repacket }) + } + }) + }) + } + + outgoingStream (client) { + return this.instance.outgoingStream(client) + } + + async incomingStorePacket (client, packet) { + return new Promise((resolve, reject) => { + this.instance.incomingStorePacket(client, packet, err => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + } + + async incomingGetPacket (client, packet) { + return new Promise((resolve, reject) => { + this.instance.incomingGetPacket(client, packet, (err, retrieved) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve(retrieved) + } + }) + }) + } + + async incomingDelPacket (client, packet) { + return new Promise((resolve, reject) => { + this.instance.incomingDelPacket(client, packet, err => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + } + + async putWill (client, packet) { + return new Promise((resolve, reject) => { + this.instance.putWill(client, packet, (err, reClient) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve(reClient) + } + }) + }) + } + + async getWill (client) { + return new Promise((resolve, reject) => { + this.instance.getWill(client, (err, packet, reClient) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve({ packet, reClient }) + } + }) + }) + } + + async delWill (client) { + return new Promise((resolve, reject) => { + this.instance.delWill(client, (err, packet, reClient) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve({ packet, reClient }) + } + }) + }) + } + + streamWill (brokers) { + return this.instance.streamWill(brokers) + } + + getClientList (topic) { + return this.instance.getClientList(topic) + } + + async destroy () { + return new Promise((resolve, reject) => { + this.instance.destroy((err) => { + /* c8 ignore next 2 */ + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + } +} +// end of promisified versions ofthis.instance methods + +// helper functions +function waitForEvent (obj, resolveEvt) { + return new Promise((resolve, reject) => { + obj.once(resolveEvt, () => { + resolve() + }) + obj.once('error', reject) + }) +} + +// stream.toArray() sometimes returns undefined or [undefined] instead of [] +async function getArrayFromStream (stream) { + const list = [] + for await (const item of stream) { + if (item !== undefined && item !== null) { + list.push(item) + } + } + return list +} + +module.exports = { + PromisifiedPersistence, + waitForEvent, + getArrayFromStream +}