-
Notifications
You must be signed in to change notification settings - Fork 12
Open
Description
Bug description
Hello, I am facing an issue where `fetchSockets()` does not return a response in cluster mode.
I have tried both the `ioredis` and the `redis` connectors, and I removed the `polling` transport
so that sticky sessions are not required. I have also tried both `redis-adapter`
and `redis-streams-adapter`, but the result is the same.
Steps to reproduce
I was able to reproduce it via a mocha test. Here is my configuration.
node v19.7.0
// package.json
"ioredis": "^5.3.2",
"socket.io": "^4.7.4",
"@socket.io/redis-streams-adapter": "^0.2.2",
socket.js
import { createAdapter } from '@socket.io/redis-streams-adapter'
import { getIoRedisClientForSocket } from 'redis.connection.js'
// Socket.IO server attached to the HTTP server. Only the WebSocket transport
// is enabled (no HTTP long-polling).
const io = new Server(httpServer, {
  transports: ['websocket']
})

// Install the adapter created from a dedicated ioredis client.
io.adapter(createAdapter(getIoRedisClientForSocket()))

io.on('connection', (socket) => {
  /**
   * 'join_a_room' event: counts the sockets currently in `params.roomName`
   * (via `fetchSockets()`), then adds this socket to the room.
   *
   * @param {{ roomName: string }} params - Payload sent by the client.
   * @param {Function} [callback] - Optional acknowledgement, invoked as
   *   `(true, { message, socketsInRoom })` on success or
   *   `(false, { message })` on failure.
   */
  socket.on('join_a_room', async (params, callback) => {
    // A client may emit without an acknowledgement; calling `undefined` inside
    // the catch block would turn into an unhandled rejection of this handler.
    const reply = typeof callback === 'function' ? callback : () => {}

    try {
      const roomName = params?.roomName
      if (typeof roomName !== 'string' || roomName === '') {
        // Reject early instead of querying / joining a room named `undefined`.
        throw new TypeError('roomName must be a non-empty string')
      }

      const sockets = await io.in(roomName).fetchSockets()
      const socketsInRoom = sockets.length
      console.log(':::::::::: SOCKETS IN ROOM ::::::: ', socketsInRoom)

      const isRoomEmpty = socketsInRoom === 0
      if (isRoomEmpty) {
        // Do some action here
      }

      // `join()` may return a promise depending on the adapter; awaiting it
      // ensures a failure is reported through the acknowledgement.
      await socket.join(roomName)
      reply(true, { message: 'Joined room', socketsInRoom })
    } catch (error) {
      console.log('>>> ERRR::: ', error)
      reply(false, { message: error.message })
    }
  })
})
redis.connection.js
import { Redis } from 'ioredis'
/**
 * Builds the ioredis client handed to the Socket.IO adapter.
 * Every call constructs a new `Redis` instance from `config.redis.url`.
 *
 * @returns {Redis} A newly constructed ioredis client.
 */
export const getIoRedisClientForSocket = () => new Redis(config.redis.url)
socket.test.js
// Reproduction: two server instances (4001 and 4002), one client connected to
// each, both joining the same room through the 'join_a_room' event.
describe.only('Multiple instance testing', () => {
  let httpServer

  before(async () => {
    // Default server is listening on 4001. Create a second server on 4002.
    httpServer = http.createServer(app)
    new Socket().listen(httpServer)

    // Wait until the port is actually bound so the test cannot race the
    // listener, and surface bind errors (e.g. EADDRINUSE) as a hook failure.
    await new Promise((resolve, reject) => {
      httpServer.once('error', reject)
      httpServer.listen(4002, () => {
        httpServer.off('error', reject)
        console.log('Test server started on 4002')
        resolve()
      })
    })
  })

  after(() => {
    // Stop accepting new connections on the extra test server.
    if (httpServer?.listening) {
      httpServer.close()
    }
  })

  /**
   * Emits 'join_a_room' and settles once the server acknowledges.
   * Rejects when the acknowledgement reports a failure, so the test fails
   * with the assertion error instead of hanging until the mocha timeout.
   *
   * @param {object} socket - Connected client socket.
   * @param {string} roomName - Room to join.
   * @param {string} label - Name used in the log line.
   * @returns {Promise<object>} The acknowledgement payload.
   */
  const joinRoom = (socket, roomName, label) =>
    new Promise((resolve, reject) => {
      socket.emit('join_a_room', { roomName }, (isSuccess, data) => {
        console.log(`>>>>> ${label} joined:: `, data)
        try {
          expect(isSuccess).to.equal(true)
          resolve(data)
        } catch (error) {
          reject(error)
        }
      })
    })

  it.only('should get proper response', async () => {
    const userA = await createUser()
    const userB = await createUser({ phone: '9898989888', email: 'test2@test.com' })
    const userAToken = await getLoginToken(userA)
    const userBToken = await getLoginToken(userB)

    // userA connects to the default instance, userB to the one on port 4002.
    const userAsocket = await connectUserSocket(userAToken)
    const userBsocket = await connectUserSocket(userBToken, 'ws://127.0.0.1:4002')

    // Join sequentially: userB's join forces a cross-instance fetchSockets()
    // for a room that already has a member on the other instance.
    await joinRoom(userAsocket, 'test', 'userA')
    await joinRoom(userBsocket, 'test', 'userB')
  })
})
OUTPUT
I have added extra log statements in a few places.
>>>> Stream Adapter >> {"type":1,"uid":"c42282ebea7991ca","nsp":"/"}
Test server started on 4002
>>>>> Stream Adapter :: OnRawMessage {"uid":"c42282ebea7991ca","nsp":"/","type":1}
>>>>>> SOC Cluster adapter message :: {"uid":"c42282ebea7991ca","nsp":"/","type":1} 1716207983034-0
[0d79ac3ff828190f] new event of type 1 from c42282ebea7991ca
>>>> Stream Adapter >> {"type":2,"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage {"uid":"0d79ac3ff828190f","nsp":"/","type":2}
>>>>>> SOC Cluster adapter message :: {"uid":"0d79ac3ff828190f","nsp":"/","type":2} 1716207983037-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage {"uid":"0d79ac3ff828190f","nsp":"/","type":2}
>>>>>> SOC Cluster adapter message :: {"uid":"0d79ac3ff828190f","nsp":"/","type":2} 1716207983037-0
[c42282ebea7991ca] new event of type 2 from 0d79ac3ff828190f
>>>> SOC : broadcast operator rooms Set(1) { '664b416ea5b08951db0a5c5d' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >> {"type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>>>> SOC Cluster adapter message :: {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}} 1716207983152-0
[c42282ebea7991ca] new event of type 7 from 0d79ac3ff828190f
>>>> SOC onMessage :: {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>> SOC :: cluster adapter :: [c42282ebea7991ca] calling fetchSockets with opts {"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}}
>>>> Stream Adapter >> {"type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>>>> SOC Cluster adapter message :: {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}} 1716207983152-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>>> SOC Cluster adapter message :: {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}} 1716207983343-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>>> SOC Cluster adapter message :: {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}} 1716207983343-0
[0d79ac3ff828190f] new event of type 8 from c42282ebea7991ca
>>>> SOC onMessage :: {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received :: 6a404d7bfcfbcffa
>>>> SOC : broadcast operator rooms Set(1) { '664b416fa5b08951db0a5c5f' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >> {"type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>>>> SOC Cluster adapter message :: {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}} 1716207983546-0
[0d79ac3ff828190f] new event of type 7 from c42282ebea7991ca
>>>> SOC onMessage :: {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>> SOC :: cluster adapter :: [0d79ac3ff828190f] calling fetchSockets with opts {"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}}
>>>> Stream Adapter >> {"type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>>>> SOC Cluster adapter message :: {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}} 1716207983546-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>>> SOC Cluster adapter message :: {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}} 1716207983748-0
[c42282ebea7991ca] new event of type 8 from 0d79ac3ff828190f
>>>> SOC onMessage :: {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received :: 49de3c88dbea8de6
>>>>> Stream Adapter :: OnRawMessage {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>>> SOC Cluster adapter message :: {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}} 1716207983748-0
[0d79ac3ff828190f] ignore message from self
>>>> SOC : broadcast operator rooms Set(1) { 'test' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >> {"type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>>>> SOC Cluster adapter message :: {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}} 1716207983951-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>>>> SOC Cluster adapter message :: {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}} 1716207983951-0
[c42282ebea7991ca] new event of type 7 from 0d79ac3ff828190f
>>>> SOC onMessage :: {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>> SOC :: cluster adapter :: [c42282ebea7991ca] calling fetchSockets with opts {"rooms":["test"],"except":[],"flags":{}}
>>>> Stream Adapter >> {"type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>>> SOC Cluster adapter message :: {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}} 1716207984153-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>>> SOC Cluster adapter message :: {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}} 1716207984153-0
[0d79ac3ff828190f] new event of type 8 from c42282ebea7991ca
>>>> SOC onMessage :: {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received :: de66c29c37ca54b5
:::::::::: SOCKETS IN ROOM ::::::: 0
>>>>> userA joined:: { message: 'Joined room', socketsInRoom: 0 }
>>>> SOC : broadcast operator rooms Set(1) { 'test' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >> {"type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>>>> SOC Cluster adapter message :: {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}} 1716207984356-0
[0d79ac3ff828190f] new event of type 7 from c42282ebea7991ca
>>>> SOC onMessage :: {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>> SOC :: cluster adapter :: [0d79ac3ff828190f] calling fetchSockets with opts {"rooms":["test"],"except":[],"flags":{}}
>>>>> Stream Adapter :: OnRawMessage {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>>>> SOC Cluster adapter message :: {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}} 1716207984356-0
[c42282ebea7991ca] ignore message from self
>>> ERRR::: Error: timeout reached: missing 1 responses
at Timeout._onTimeout (/Users/nilkanthparmar/Documents/Projects/Ellu/ellu_backend/node_modules/socket.io-adapter/dist/cluster-adapter.js:611:28)
at listOnTimeout (node:internal/timers:568:17)
at processTimers (node:internal/timers:511:7)
>>>> Stream Adapter >> {"type":2,"uid":"977b7eaed0b6bef1","nsp":"/"}
>>>>> userB joined:: { message: 'timeout reached: missing 1 responses' }
1) should get proper response
1) should get proper response
Metadata
Metadata
Assignees
Labels
No labels