@@ -3,11 +3,11 @@ import { once } from 'node:events'
3
3
import { Duplex } from 'node:stream'
4
4
import {
5
5
brokerPublish ,
6
+ checkNoPacket ,
6
7
connect ,
7
8
createAndConnect ,
8
9
delay ,
9
10
nextPacket ,
10
- nextPacketWithTimeOut ,
11
11
setup ,
12
12
subscribe ,
13
13
subscribeMultiple ,
@@ -373,8 +373,7 @@ test('unsubscribe', async (t) => {
373
373
resolve ( )
374
374
} )
375
375
} )
376
- const noPacket = await nextPacketWithTimeOut ( s , 10 )
377
- t . assert . ok ( ! noPacket , 'expected no packet' )
376
+ await checkNoPacket ( t , s )
378
377
} )
379
378
380
379
test ( 'unsubscribe without subscribe' , async ( t ) => {
@@ -411,11 +410,6 @@ test('unsubscribe on disconnect for a clean=true client', async (t) => {
411
410
s . conn . destroy ( null )
412
411
t . assert . equal ( s . conn . destroyed , true , 'closed streams' )
413
412
414
- const noPacket = async ( ) => {
415
- const packet = await nextPacketWithTimeOut ( s , 10 )
416
- t . assert . ok ( ! packet , 'should not receive any more messages' )
417
- }
418
-
419
413
const emittedUnsubscribe = async ( ) => {
420
414
await once ( s . broker , 'unsubscribe' )
421
415
t . assert . ok ( true , 'should emit unsubscribe' )
@@ -430,7 +424,7 @@ test('unsubscribe on disconnect for a clean=true client', async (t) => {
430
424
t . assert . ok ( true , 'calls the callback' )
431
425
}
432
426
// run parallel
433
- await Promise . all ( [ noPacket ( ) , emittedUnsubscribe ( ) , publishPacket ( ) ] )
427
+ await Promise . all ( [ checkNoPacket ( t , s ) , emittedUnsubscribe ( ) , publishPacket ( ) ] )
434
428
} )
435
429
436
430
test ( 'unsubscribe on disconnect for a clean=false client' , async ( t ) => {
@@ -443,14 +437,10 @@ test('unsubscribe on disconnect for a clean=false client', async (t) => {
443
437
s . conn . destroy ( null , ( ) => {
444
438
t . assert . ok ( true , 'closed streams' )
445
439
} )
446
- const noPacket = async ( ) => {
447
- const packet = await nextPacketWithTimeOut ( s , 10 )
448
- t . assert . ok ( ! packet , 'should not receive any more messages' )
449
- }
450
440
451
- const emittedUnsubscribe = async ( ) => {
452
- await once ( s . broker , 'unsubscribe' )
453
- t . assert . ok ( true , 'should emit unsubscribe' )
441
+ const emittedNoUnsubscribe = async ( ) => {
442
+ const result = await withTimeout ( once ( s . broker , 'unsubscribe' ) , 10 , null )
443
+ t . assert . equal ( result , null , 'should not emit unsubscribe' )
454
444
}
455
445
const publishPacket = async ( ) => {
456
446
await brokerPublish ( s , {
@@ -461,7 +451,7 @@ test('unsubscribe on disconnect for a clean=false client', async (t) => {
461
451
t . assert . ok ( true , 'calls the callback' )
462
452
}
463
453
// run parallel
464
- await Promise . all ( [ noPacket ( ) , emittedUnsubscribe ( ) , publishPacket ( ) ] )
454
+ await Promise . all ( [ checkNoPacket ( t , s ) , emittedNoUnsubscribe ( ) , publishPacket ( ) ] )
465
455
} )
466
456
467
457
test ( 'disconnect' , async ( t ) => {
@@ -770,8 +760,7 @@ test('do not restore QoS 0 subscriptions when clean', async (t) => {
770
760
payload : 'world' ,
771
761
qos : 0
772
762
} )
773
- const packet = await nextPacketWithTimeOut ( subscriber2 , 10 )
774
- t . assert . ok ( ! packet , 'no packet received' )
763
+ await checkNoPacket ( t , subscriber2 )
775
764
} )
776
765
777
766
test ( 'double sub does not double deliver' , async ( t ) => {
@@ -799,8 +788,7 @@ test('double sub does not double deliver', async (t) => {
799
788
800
789
const packet = await nextPacket ( s )
801
790
t . assert . deepEqual ( structuredClone ( packet ) , expected , 'packet matches' )
802
- const noPacket = await nextPacketWithTimeOut ( s , 10 )
803
- t . assert . ok ( ! noPacket , 'no packet received' )
791
+ await checkNoPacket ( t , s )
804
792
} )
805
793
806
794
test ( 'overlapping sub does not double deliver' , async ( t ) => {
@@ -827,8 +815,7 @@ test('overlapping sub does not double deliver', async (t) => {
827
815
828
816
const packet = await nextPacket ( s )
829
817
t . assert . deepEqual ( structuredClone ( packet ) , expected , 'packet matches' )
830
- const noPacket = await nextPacketWithTimeOut ( s , 10 )
831
- t . assert . ok ( ! noPacket , 'no packet received' )
818
+ await checkNoPacket ( t , s )
832
819
} )
833
820
834
821
test ( 'clear drain' , async ( t ) => {
0 commit comments