Skip to content

Commit 3674538

Browse files
committed
qos1.js
1 parent 7a8804b commit 3674538

File tree

2 files changed

+733
-797
lines changed

2 files changed

+733
-797
lines changed

test/helperAsync.js

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import { Aedes } from '../aedes.js'
55

66
let clients = 0
77

8-
export function setup (broker) {
8+
export function setup(broker) {
99
const [client, server] = duplexPair()
1010
const inStream = new Transform(
1111
{
1212
objectMode: true,
13-
transform (chunk, enc, callback) {
13+
transform(chunk, enc, callback) {
1414
this.push(mqtt.generate(chunk))
1515
callback()
1616
}
@@ -47,7 +47,7 @@ export function setup (broker) {
4747
* @returns {Object} the connack packet
4848
*/
4949

50-
export async function connect (s, opts = {}) {
50+
export async function connect(s, opts = {}) {
5151
const connect = opts.connect || {}
5252
connect.cmd = 'connect'
5353
connect.protocolId = connect.protocolId || 'MQTT'
@@ -60,6 +60,7 @@ export async function connect (s, opts = {}) {
6060
const expectedReturnCode = opts.expectedReturnCode || 0
6161
const verifyIsConnack = opts.verifyIsConnack !== false
6262
const verifyReturnedOk = verifyIsConnack ? opts.verifyReturnedOk !== false : false
63+
const noWait = opts.noWait
6364

6465
if (opts.autoClientId) {
6566
connect.clientId = 'my-client-' + clients++
@@ -74,6 +75,9 @@ export async function connect (s, opts = {}) {
7475
}
7576

7677
s.inStream.write(connect)
78+
if (noWait) {
79+
return
80+
}
7781
const { value: connack } = await s.outStream.next()
7882
if (verifyIsConnack && connack?.cmd !== 'connack') {
7983
throw new Error('Expected connack')
@@ -89,21 +93,40 @@ export async function connect (s, opts = {}) {
8993
* @param {Object} t - Test assertion object
9094
* @returns {Object} Connection state
9195
*/
92-
export async function createAndConnect (t, opts = {}) {
96+
export async function createAndConnect(t, opts = {}) {
9397
const broker = await Aedes.createBroker(opts.broker)
9498
t.after(() => broker.close())
9599
const s = setup(broker)
96100
await connect(s, opts)
97101
return s
98102
}
99103

104+
/**
105+
* Creates a new MQTT connection and establishes it for publisher and subscriber
106+
* @param {Object} t - Test assertion object
107+
* @returns {Object} Connection state { broker, publisher, subscriber }
108+
*/
109+
export async function createPubSub(t, opts = {}) {
110+
const publisherOpts = opts.publisher || { clientId: 'publisher' }
111+
const subscriberOpts = opts.subscriber || { clientId: 'subscriber' }
112+
113+
const broker = await Aedes.createBroker()
114+
t.after(() => broker.close())
115+
116+
const publisher = setup(broker)
117+
const subscriber = setup(broker)
118+
await connect(publisher, { connect: publisherOpts })
119+
await connect(subscriber, { connect: subscriberOpts })
120+
return { broker, publisher, subscriber }
121+
}
122+
100123
/**
101124
* Sets up error handling for the broker connection
102125
* @param {Object} s - The connection state object
103126
* @param {Object} t - Test assertion object
104127
* @returns {Object} Connection state with error handling
105128
*/
106-
export function noError (s, t) {
129+
export function noError(s, t) {
107130
s.broker.on('clientError', (client, err) => {
108131
if (err) throw err
109132
t.assert.equal(err, undefined, 'must not error')
@@ -116,25 +139,54 @@ export function noError (s, t) {
116139
* @param {Object} packet - The packet to publish
117140
* @returns {Promise} - Promise that resolves when the packet is published
118141
*/
119-
export async function brokerPublish (s, packet) {
142+
export async function brokerPublish(s, packet) {
120143
return new Promise((resolve) => {
121144
s.broker.publish(packet, () => {
122145
setImmediate(resolve)
123146
})
124147
})
125148
}
126149

150+
/**
151+
* publish a packet to the broker
152+
* @param {Object} t - Test assertion object
153+
* @param {Object} s - The connection state object
154+
* @param {Object} packet - The packet to publish
155+
* @returns {Promise} - Promise that resolves when the packet is published
156+
*/
157+
export async function publish(t, s, packet) {
158+
s.inStream.write(packet)
159+
if (packet.qos === 1) {
160+
const { value: puback } = await s.outStream.next()
161+
t.assert.equal(puback.cmd, 'puback')
162+
return puback
163+
}
164+
if (packet.qos === 2) {
165+
const { value: pubrec } = await s.outStream.next()
166+
t.assert.equal(pubrec.cmd, 'pubrec')
167+
s.inStream.write({
168+
cmd: 'pubrel',
169+
messageId: pubrec.messageId
170+
})
171+
const { value: pubcomp } = await s.outStream.next()
172+
t.assert.equal(pubcomp.cmd, 'pubcomp')
173+
return pubcomp
174+
}
175+
return null
176+
}
127177
/**
128178
* Subscribes to a single MQTT topic
129179
* @param {Object} t - Test assertion object
130180
* @param {Object} subscriber - The subscriber client
131181
* @param {string} topic - Topic to subscribe to
132182
* @param {number} qos - Quality of Service level
183+
* @param {number} messageId - Message ID for the subscription
184+
* @returns {Object} The subscription packet
133185
*/
134-
export async function subscribe (t, subscriber, topic, qos) {
186+
export async function subscribe(t, subscriber, topic, qos, messageId = 24) {
135187
subscriber.inStream.write({
136188
cmd: 'subscribe',
137-
messageId: 24,
189+
messageId,
138190
subscriptions: [{
139191
topic,
140192
qos
@@ -144,7 +196,7 @@ export async function subscribe (t, subscriber, topic, qos) {
144196
const { value: packet } = await subscriber.outStream.next()
145197
t.assert.equal(packet.cmd, 'suback')
146198
t.assert.equal(packet.granted[0], qos)
147-
t.assert.equal(packet.messageId, 24)
199+
t.assert.equal(packet.messageId, messageId)
148200
return packet
149201
}
150202

@@ -155,7 +207,7 @@ export async function subscribe (t, subscriber, topic, qos) {
155207
* @param {Array<Object>} subs - Array of subscription objects with topic and qos
156208
* @param {Array<number>} expectedGranted - Expected QoS levels granted
157209
*/
158-
export async function subscribeMultiple (t, subscriber, subs, expectedGranted) {
210+
export async function subscribeMultiple(t, subscriber, subs, expectedGranted) {
159211
subscriber.inStream.write({
160212
cmd: 'subscribe',
161213
messageId: 24,
@@ -176,7 +228,7 @@ export async function subscribeMultiple (t, subscriber, subs, expectedGranted) {
176228
* @param {Error} err - Error to throw on timeout
177229
* @returns {Promise} Promise that rejects if timeout occurs
178230
*/
179-
export async function withTimeout (promise, timeoutMs, timeoutResult) {
231+
export async function withTimeout(promise, timeoutMs, timeoutResult) {
180232
const timeoutPromise = delay(timeoutMs, timeoutResult)
181233
return Promise.race([promise, timeoutPromise])
182234
}
@@ -187,7 +239,7 @@ export async function withTimeout (promise, timeoutMs, timeoutResult) {
187239
* @param {Object} opts - low and high water mark
188240
* @returns {AsyncGenerator} An async generator that yields MQTT packets
189241
*/
190-
async function * packetGenerator (parser, sourceStream, opts = {
242+
async function* packetGenerator(parser, sourceStream, opts = {
191243
highWaterMark: 2,
192244
lowWaterMark: 0
193245
}) {
@@ -264,7 +316,7 @@ async function * packetGenerator (parser, sourceStream, opts = {
264316
* @param {Object} s - The connection state object
265317
* @returns
266318
*/
267-
export async function nextPacket (s) {
319+
export async function nextPacket(s) {
268320
const { value: packet } = await s.outStream.next()
269321
return packet
270322
}
@@ -275,7 +327,7 @@ export async function nextPacket (s) {
275327
* @param {number} timeoutMs - Timeout in milliseconds
276328
* @returns
277329
*/
278-
export async function nextPacketWithTimeOut (s, timeoutMs) {
330+
export async function nextPacketWithTimeOut(s, timeoutMs) {
279331
return withTimeout(nextPacket(s), timeoutMs, null)
280332
}
281333

@@ -286,7 +338,7 @@ export async function nextPacketWithTimeOut (s, timeoutMs) {
286338
* @param {number} timeoutMs - Timeout in milliseconds
287339
* @returns
288340
*/
289-
export async function checkNoPacket (t, s, timeoutMs = 10) {
341+
export async function checkNoPacket(t, s, timeoutMs = 10) {
290342
const result = await nextPacketWithTimeOut(s, timeoutMs)
291343
t.assert.equal(result, null, 'no packet received')
292344
}
@@ -297,7 +349,7 @@ export async function checkNoPacket (t, s, timeoutMs = 10) {
297349
* @param {string} rawPacket - space separated string of hex values
298350
* @example rawWrite(s, "10 0C 00 04 4D 51 54 54 04 00 00 00 00 00")
299351
*/
300-
export function rawWrite (s, rawPacket) {
352+
export function rawWrite(s, rawPacket) {
301353
s.conn.write(Buffer.from(rawPacket.replace(/ /g, ''), 'hex'))
302354
}
303355
/**

0 commit comments

Comments
 (0)