Skip to content

Commit 286ef86

Browse files
committed
close_socket_by_other_party.js
1 parent fb85652 commit 286ef86

File tree

2 files changed

+147
-110
lines changed

2 files changed

+147
-110
lines changed

test/close_socket_by_other_party.js

Lines changed: 143 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -1,93 +1,133 @@
1-
import { test } from 'tap'
2-
import EventEmitter from 'node:events'
3-
import { setup, connect, subscribe } from './helper.js'
1+
import { test } from 'node:test'
2+
import EventEmitter, { once } from 'node:events'
3+
import { createServer } from 'node:net'
4+
import {
5+
connect,
6+
createAndConnect,
7+
delay,
8+
setup,
9+
subscribe,
10+
withTimeout
11+
} from './helperAsync.js'
412
import { Aedes } from '../aedes.js'
513
import mqtt from 'mqtt'
6-
import { createServer } from 'node:net'
714

8-
test('aedes is closed before client authenticate returns', function (t) {
15+
test('aedes is closed before client authenticate returns', async (t) => {
916
t.plan(1)
1017

1118
const evt = new EventEmitter()
12-
Aedes.createBroker({
19+
const broker = await Aedes.createBroker({
1320
authenticate: (client, username, password, done) => {
1421
evt.emit('AuthenticateBegin', client)
15-
setTimeout(function () {
22+
setTimeout(() => {
1623
done(null, true)
17-
}, 2000)
24+
}, 20)
1825
}
19-
}).then((broker) => {
20-
broker.on('client', function (client) {
21-
t.fail('should no client registration')
22-
})
23-
broker.on('connackSent', function () {
24-
t.fail('should no connack be sent')
25-
})
26-
broker.on('clientError', function (client, err) {
27-
t.error(err)
28-
})
26+
})
27+
broker.on('client', () => {
28+
t.assert.fail('should no client registration')
29+
})
30+
broker.on('connackSent', () => {
31+
t.assert.fail('should no connack be sent')
32+
})
33+
broker.on('clientError', () => {
34+
t.assert.fail('should not error')
35+
})
2936

30-
connect(setup(broker))
37+
const s = setup(broker)
3138

32-
evt.on('AuthenticateBegin', function (client) {
33-
t.equal(broker.connectedClients, 0)
34-
broker.close()
35-
})
36-
})
39+
const waitForAuthEvent = async () => {
40+
await once(evt, 'AuthenticateBegin')
41+
t.assert.equal(broker.connectedClients, 0)
42+
broker.close()
43+
}
44+
45+
// run parallel
46+
await Promise.all([
47+
waitForAuthEvent(),
48+
withTimeout(connect(s), 0, 'connect timed out'), // connect will never finish
49+
])
3750
})
3851

39-
test('client is closed before authenticate returns', function (t) {
52+
test('client is closed before authenticate returns', async (t) => {
4053
t.plan(1)
4154

4255
const evt = new EventEmitter()
43-
Aedes.createBroker({
44-
authenticate: async (client, username, password, done) => {
56+
const broker = await Aedes.createBroker({
57+
authenticate: (client, username, password, done) => {
4558
evt.emit('AuthenticateBegin', client)
46-
setTimeout(function () {
59+
setTimeout(() => {
4760
done(null, true)
48-
}, 2000)
61+
}, 20)
4962
}
50-
}).then((broker) => {
51-
t.teardown(broker.close.bind(broker))
52-
53-
broker.on('client', function (client) {
54-
t.fail('should no client registration')
55-
})
56-
broker.on('connackSent', function () {
57-
t.fail('should no connack be sent')
58-
})
59-
broker.on('clientError', function (client, err) {
60-
t.error(err)
61-
})
63+
})
6264

63-
connect(setup(broker))
65+
t.after(() => broker.close())
6466

65-
evt.on('AuthenticateBegin', function (client) {
66-
t.equal(broker.connectedClients, 0)
67-
client.close()
68-
})
67+
broker.on('client', () => {
68+
t.assert.fail('should no client registration')
69+
})
70+
broker.on('connackSent', () => {
71+
t.assert.fail('should no connack be sent')
6972
})
73+
broker.on('clientError', () => {
74+
t.assert.fail('should not error')
75+
})
76+
77+
const s = setup(broker)
78+
79+
const waitForAuthEvent = async () => {
80+
const [client] = await once(evt, 'AuthenticateBegin')
81+
t.assert.equal(broker.connectedClients, 0)
82+
client.close()
83+
}
84+
85+
// run parallel
86+
await Promise.all([
87+
waitForAuthEvent(),
88+
// connect will never finish, but the client close will produce a null in the generator
89+
withTimeout(connect(s, { verifyIsConnack: false }), 10, 'connect timed out'),
90+
])
7091
})
7192

72-
test('client is closed before authorizePublish returns', function (t) {
73-
t.plan(3)
93+
test('client is closed before authorizePublish returns', async (t) => {
94+
t.plan(4)
7495

7596
const evt = new EventEmitter()
76-
Aedes.createBroker({
97+
const broker = await Aedes.createBroker({
7798
authorizePublish: (client, packet, done) => {
7899
evt.emit('AuthorizePublishBegin', client)
79100
// simulate latency writing to persistent store.
80-
setTimeout(function () {
101+
setTimeout(() => {
81102
done()
82103
evt.emit('AuthorizePublishEnd', client)
83-
}, 2000)
104+
}, 50)
84105
}
85-
}).then((broker) => {
86-
broker.on('clientError', function (client, err) {
87-
t.equal(err.message, 'connection closed')
88-
})
89-
90-
const s = connect(setup(broker))
106+
})
107+
t.after(() => broker.close())
108+
109+
const s = setup(broker)
110+
await connect(s)
111+
112+
const connectionClosed = async () => {
113+
const [client, err] = await once(broker, 'clientError')
114+
t.assert.ok(client, 'client exists')
115+
t.assert.equal(err.message, 'connection closed', 'connection is closed')
116+
}
117+
118+
const publishBegin = async () => {
119+
const [client] = await once(evt, 'AuthorizePublishBegin')
120+
t.assert.equal(broker.connectedClients, 1, '1 client connected')
121+
await delay(0) // give the eventloop some time
122+
client.close()
123+
}
124+
125+
const publishEnd = async () => {
126+
await once(evt, 'AuthorizePublishEnd')
127+
t.assert.equal(broker.connectedClients, 0, 'no client connected')
128+
}
129+
130+
const publish = () => {
91131
s.inStream.write({
92132
cmd: 'publish',
93133
topic: 'hello',
@@ -96,79 +136,74 @@ test('client is closed before authorizePublish returns', function (t) {
96136
messageId: 10,
97137
retain: false
98138
})
99-
100-
evt.on('AuthorizePublishBegin', function (client) {
101-
t.equal(broker.connectedClients, 1)
102-
client.close()
103-
})
104-
evt.on('AuthorizePublishEnd', function (client) {
105-
t.equal(broker.connectedClients, 0)
106-
broker.close()
107-
})
108-
})
139+
}
140+
// run parallel
141+
await Promise.all([
142+
connectionClosed(),
143+
publishBegin(),
144+
publishEnd(),
145+
publish()
146+
])
109147
})
110148

111-
test('close client when its socket is closed', function (t) {
149+
test('close client when its socket is closed', async (t) => {
112150
t.plan(4)
113151

114-
Aedes.createBroker().then((broker) => {
115-
t.teardown(broker.close.bind(broker))
116-
117-
const subscriber = connect(setup(broker))
118-
119-
subscribe(t, subscriber, 'hello', 1, function () {
120-
subscriber.inStream.end()
121-
subscriber.conn.on('close', function () {
122-
t.equal(broker.connectedClients, 0, 'no connected client')
123-
})
124-
})
125-
})
152+
const s = await createAndConnect(t)
153+
await subscribe(t, s, 'hello', 1)
154+
s.inStream.end()
155+
await once(s.client.conn, 'close')
156+
await delay(10)
157+
t.assert.equal(s.broker.connectedClients, 0, 'no connected client')
126158
})
127159

128-
test('multiple clients subscribe same topic, and all clients still receive message except the closed one', function (t) {
160+
test('multiple clients subscribe same topic, and all clients still receive message except the closed one', async (t) => {
129161
t.plan(5)
130-
Aedes.createBroker().then((broker) => {
131-
let client2
132162

133-
t.teardown(() => {
134-
client2.end()
135-
broker.close()
136-
server.close()
137-
})
163+
const broker = await Aedes.createBroker()
164+
let client2
138165

139-
const server = createServer(broker.handle)
140-
const port = 1883
141-
server.listen(port)
142-
broker.on('clientError', function (client, err) {
143-
t.error(err)
144-
})
166+
t.after(() => {
167+
client2.end()
168+
broker.close()
169+
server.close()
170+
})
145171

146-
const _sameTopic = 'hello'
172+
const server = createServer(broker.handle)
173+
const port = 1883
174+
server.listen(port)
175+
broker.on('clientError', () => {
176+
t.assert.fail('should not get clientError event')
177+
})
147178

148-
// client 1
149-
const client1 = mqtt.connect('mqtt://localhost', { clientId: 'client1', resubscribe: false, reconnectPeriod: -1 })
150-
client1.on('message', () => {
151-
t.fail('client1 receives message')
152-
})
179+
const sameTopic = 'hello'
180+
181+
// client 1
182+
const client1 = mqtt.connect('mqtt://localhost', { clientId: 'client1', resubscribe: false, reconnectPeriod: -1 })
183+
client1.on('message', () => {
184+
t.assert.fail('client1 receives message')
185+
})
153186

154-
client1.subscribe(_sameTopic, { qos: 0, retain: false }, () => {
155-
t.pass('client1 sub callback')
187+
await new Promise(resolve => {
188+
client1.subscribe(sameTopic, { qos: 0, retain: false }, () => {
189+
t.assert.ok(true, 'client1 sub callback')
156190
// stimulate closed socket by users
157191
client1.stream.destroy()
158192

159193
// client 2
160194
client2 = mqtt.connect('mqtt://localhost', { clientId: 'client2', resubscribe: false })
161195
client2.on('message', () => {
162-
t.pass('client2 receives message')
163-
t.equal(broker.connectedClients, 1)
196+
t.assert.ok(true, 'client2 receives message')
197+
t.assert.equal(broker.connectedClients, 1)
198+
resolve()
164199
})
165-
client2.subscribe(_sameTopic, { qos: 0, retain: false }, () => {
166-
t.pass('client2 sub callback')
200+
client2.subscribe(sameTopic, { qos: 0, retain: false }, () => {
201+
t.assert.ok(true, 'client2 sub callback')
167202

168203
// pubClient
169204
const pubClient = mqtt.connect('mqtt://localhost', { clientId: 'pubClient' })
170-
pubClient.publish(_sameTopic, 'world', { qos: 0, retain: false }, () => {
171-
t.pass('pubClient publish event')
205+
pubClient.publish(sameTopic, 'world', { qos: 0, retain: false }, () => {
206+
t.assert.ok(true, 'pubClient publish event')
172207
pubClient.end()
173208
})
174209
})

test/helperAsync.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ export async function connect (s, opts = {}) {
5858
connect.password = connect.password || 'my pass'
5959
connect.keepalive = connect.keepalive || 0
6060
const expectedReturnCode = opts.expectedReturnCode || 0
61-
const verifyReturnedOk = opts.verifyReturnedOk !== false
61+
const verifyIsConnack = opts.verifyIsConnack !== false
62+
const verifyReturnedOk = verifyIsConnack ? opts.verifyReturnedOk !== false : false
63+
6264
if (opts.autoClientId) {
6365
connect.clientId = 'my-client-' + clients++
6466
}
@@ -73,7 +75,7 @@ export async function connect (s, opts = {}) {
7375

7476
s.inStream.write(connect)
7577
const { value: connack } = await s.outStream.next()
76-
if (connack.cmd !== 'connack') {
78+
if (verifyIsConnack && connack?.cmd !== 'connack') {
7779
throw new Error('Expected connack')
7880
}
7981
if (verifyReturnedOk && (connack.returnCode !== expectedReturnCode)) {

0 commit comments

Comments
 (0)