Skip to content

Commit 297d227

Browse files
committed
applied copilot suggestions
1 parent c5dc4d1 commit 297d227

File tree

2 files changed

+14
-6
lines changed

2 files changed

+14
-6
lines changed

test/helper.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,18 @@ export async function withTimeout (promise, timeoutMs, timeoutResult) {
253253
}
254254

255255
/**
256-
* @param {Object} parser - The emitter object
257-
* @param {Readable} sourceStream - The source stream to read MQTT packets from
258-
* @param {Object} opts - low and high water mark
259-
* @returns {AsyncGenerator} An async generator that yields MQTT packets
256+
* Asynchronously yields MQTT packets parsed from a source stream, with backpressure management.
257+
*
258+
* Backpressure is controlled using a queue and the `highWaterMark`/`lowWaterMark` parameters:
259+
* - When the number of buffered packets in the queue reaches `highWaterMark`, the source stream is paused to prevent overload.
260+
* - When the queue size drops to or below `lowWaterMark`, the source stream is resumed to allow more data to flow.
261+
*
262+
* @param {Object} parser - The emitter object that parses MQTT packets and emits 'packet' events.
263+
* @param {Readable} sourceStream - The source stream to read MQTT packets from.
264+
* @param {Object} [opts] - Options for backpressure control.
265+
* @param {number} [opts.highWaterMark=2] - Maximum number of packets to buffer before pausing the source stream.
266+
* @param {number} [opts.lowWaterMark=0] - Minimum number of packets in the buffer to resume the source stream.
267+
* @returns {AsyncGenerator<Object|null, void, unknown>} An async generator that yields MQTT packets, or null when done.
260268
*/
261269
async function * packetGenerator (parser, sourceStream, opts = {
262270
highWaterMark: 2,

test/will.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ test('delivers a will', async (t) => {
5555

5656
const { packet } = await oneWillFromBroker(s.broker)
5757
t.assert.equal(packet.topic, opts.will.topic, 'topic matches')
58-
t.assert.deepEqual(structuredClone(packet).payload, opts.will.payload, 'payload matches')
58+
t.assert.deepEqual(packet.payload, opts.will.payload, 'payload matches')
5959
t.assert.equal(packet.qos, opts.will.qos, 'qos matches')
6060
t.assert.equal(packet.retain, opts.will.retain, 'retain matches')
6161
})
@@ -109,7 +109,7 @@ test('delivers old will in case of a crash', async (t) => {
109109
const { packet, received } = await oneWillFromBroker(broker)
110110
t.assert.ok(Date.now() - start >= 3 * interval, 'the will needs to be emitted after 3 heartbeats')
111111
t.assert.equal(packet.topic, will.topic, 'topic matches')
112-
t.assert.deepEqual(structuredClone(packet).payload, will.payload, 'payload matches')
112+
t.assert.deepEqual(packet.payload, will.payload, 'payload matches')
113113
t.assert.equal(packet.qos, will.qos, 'qos matches')
114114
t.assert.equal(packet.retain, will.retain, 'retain matches')
115115
t.assert.equal(authorized, true, 'authorization called')

0 commit comments

Comments
 (0)