1
1
#! /usr/bin/env node
2
-
2
+ const { parseArgs } = require ( 'node:util' )
3
+ const { hrtime } = require ( 'node:process' )
3
4
const mqtt = require ( 'mqtt' )
4
- const convertHrtime = require ( 'convert-hrtime' )
5
- const mode = require ( 'compute-mode' )
6
- const client = mqtt . connect ( { port : 1883 , host : 'localhost' , clean : true , keepalive : 0 } )
5
+
7
6
const interval = 5000
8
7
9
- let sent = 0
10
- const latencies = [ ]
8
+ function processsLatencies ( latencies , counter ) {
9
+ let total = 0
10
+ let count = 0
11
+ let median
12
+ let perc95
13
+ let perc99
14
+ const posMedian = Math . floor ( counter / 2 )
15
+ const pos95 = Math . floor ( counter * 0.95 )
16
+ const pos99 = Math . floor ( counter * 0.99 )
17
+ // sort keys from smallest to largest
18
+ const keys = Object . keys ( latencies ) . sort ( ( a , b ) => a - b )
19
+ for ( const key of keys ) {
20
+ const value = latencies [ key ]
21
+ total += value * key
22
+ count += value
23
+
24
+ if ( count >= posMedian && median === undefined ) {
25
+ median = key
26
+ }
27
+ if ( count >= pos95 && perc95 === undefined ) {
28
+ perc95 = key
29
+ }
30
+ if ( count >= pos99 && perc99 === undefined ) {
31
+ perc99 = key
32
+ }
33
+ }
34
+ return {
35
+ buckets : keys . length ,
36
+ mean : Math . floor ( total / counter ) ,
37
+ minimum : keys [ 0 ] ,
38
+ maximum : keys . pop ( ) ,
39
+ median,
40
+ perc95,
41
+ perc99,
42
+ }
43
+ }
44
+
45
+ let counter = 0
46
+ let latencies = { }
47
+
48
+ const { values } = parseArgs ( {
49
+ options : {
50
+ qos : {
51
+ type : 'string' ,
52
+ default : '0' ,
53
+ choices : [ '0' , '1' ] ,
54
+ description : 'QoS level to use for publishing messages' ,
55
+ short : 'q'
56
+ } ,
57
+ help : {
58
+ type : 'boolean' ,
59
+ default : false ,
60
+ description : 'Show this help message' ,
61
+ short : 'h'
62
+ }
63
+ }
64
+ } )
65
+
66
+ if ( values . help ) {
67
+ console . log ( 'Usage: node pingpong.js [options]' )
68
+ console . log ( 'Options:' )
69
+ console . log ( ' -q, --qos <0|1> QoS level to use for publishing messages (default: 0)' )
70
+ console . log ( ' -h, --help Show this help message' )
71
+ process . exit ( 0 )
72
+ }
73
+
74
+ if ( ! process . send ) {
75
+ console . error ( `Starting pingpong with options: qos=${ values . qos } ` )
76
+ }
77
+ const client = mqtt . connect ( { port : 1883 , host : 'localhost' , clean : true , encoding : 'binary' , keepalive : 0 } )
78
+
79
+ const qosOpts = {
80
+ qos : parseInt ( values . qos , 10 ) ,
81
+ }
11
82
12
83
function count ( ) {
13
- console . log ( 'sent/s' , sent / interval * 1000 )
14
- sent = 0
84
+ // reset latencies while keeping the counts
85
+
86
+ const latencyResult = processsLatencies ( latencies , counter )
87
+ latencies = { }
88
+ if ( process . send ) {
89
+ process . send ( { type : 'latency' , data : latencyResult } )
90
+ } else {
91
+ console . log ( 'latencies' , latencyResult )
92
+ }
93
+ counter = 0
15
94
}
16
95
17
96
setInterval ( count , interval )
18
97
19
98
function publish ( ) {
20
- sent ++
21
- client . publish ( 'test' , JSON . stringify ( process . hrtime ( ) ) , { qos : 1 } )
99
+ counter ++
100
+ client . publish ( 'test' , process . hrtime . bigint ( ) . toString ( ) , qosOpts )
22
101
}
23
102
24
103
function subscribe ( ) {
25
- client . subscribe ( 'test' , { qos : 1 } , publish )
104
+ client . subscribe ( 'test' , qosOpts , publish )
26
105
}
27
106
28
107
client . on ( 'connect' , subscribe )
29
108
client . on ( 'message' , publish )
30
109
client . on ( 'message' , function ( topic , payload ) {
31
- const sentAt = JSON . parse ( payload )
32
- const diff = process . hrtime ( sentAt )
33
- latencies . push ( convertHrtime ( diff ) . ms )
110
+ const receivedAt = hrtime . bigint ( )
111
+ const sentAt = BigInt ( payload . toString ( ) )
112
+ const msDiff = Math . floor ( Number ( receivedAt - sentAt ) / 1e6 ) // Convert from nanoseconds to milliseconds
113
+ latencies [ msDiff ] = ( latencies [ msDiff ] || 0 ) + 1
34
114
} )
35
115
36
116
client . on ( 'offline' , function ( ) {
@@ -41,13 +121,3 @@ client.on('error', function () {
41
121
console . log ( 'reconnect!' )
42
122
client . stream . end ( )
43
123
} )
44
-
45
- process . on ( 'SIGINT' , function ( ) {
46
- const total = latencies . reduce ( function ( acc , num ) {
47
- return acc + num
48
- } )
49
- console . log ( 'total' , total )
50
- console . log ( 'average' , total / latencies . length )
51
- console . log ( 'mode' , mode ( latencies ) )
52
- process . exit ( 0 )
53
- } )
0 commit comments