Skip to content

fetchSockets not returning a response with redis-streams-adapter and ioredis #13

@nilkanth987

Description

@nilkanth987

Bug description

Hello, I am facing an issue where fetchSockets() does not return a response in cluster mode. I have tried both the ioredis and redis connectors, disabled the polling transport so that sticky sessions are not required, and tried both redis-adapter and redis-streams-adapter, but the result is the same.

Steps to reproduce

I was able to reproduce it via a mocha test. Here is my configuration.

node v19.7.0
// package.json
"ioredis": "^5.3.2",
"socket.io": "^4.7.4",
"@socket.io/redis-streams-adapter": "^0.2.2",

socket.js

import { createAdapter } from '@socket.io/redis-streams-adapter'
import { getIoRedisClientForSocket } from 'redis.connection.js'
// Reproduction server setup: a Socket.IO server limited to the WebSocket
// transport, using the adapter created by @socket.io/redis-streams-adapter
// on top of an ioredis client.
// NOTE(review): `Server` and `httpServer` are not defined in this snippet;
// presumably they come from the reporter's surrounding module.
const io = new Server(httpServer, {
      transports: ['websocket']
    })
    // Attach the adapter, handing it a newly constructed ioredis client.
    io.adapter(createAdapter(getIoRedisClientForSocket()))
    io.on('connection', (socket) => {
        // 'join_a_room' handler.
        //   params.roomName - name of the room to join.
        //   callback        - client acknowledgement, invoked as
        //                     callback(isSuccess, payload).
        socket.on('join_a_room', async (params, callback) => {
        try {
          // Count the sockets currently in the room BEFORE this socket joins it.
          // In the reported run, this is the call that rejects with
          // "timeout reached: missing 1 responses" for the second user.
          const socketsInRoom = (await io.in(params.roomName).fetchSockets())?.length
          console.log(':::::::::: SOCKETS IN ROOM ::::::: ', socketsInRoom)
          const isRoomEmpty = socketsInRoom === 0
          if (isRoomEmpty) {
            // Placeholder: work to perform when this socket is the first in the room.
          }
          socket.join(params.roomName)
          // Acknowledge success and report the pre-join socket count.
          callback(true, { message: 'Joined room', socketsInRoom })
        } catch (error) {
          // Any failure above (including a fetchSockets() rejection) is logged
          // and reported to the client as a failed acknowledgement.
          console.log('>>> ERRR::: ', error)
          callback(false, { message: error.message })
        }
      })
    })

redis.connection.js

import { Redis } from 'ioredis'
/**
 * Builds a new ioredis client from `config.redis.url`.
 * Every call constructs a separate `Redis` instance.
 *
 * NOTE(review): `config` is not imported in this snippet; presumably it is
 * provided elsewhere in the reporter's project — confirm.
 *
 * @returns {Redis} a newly constructed ioredis client.
 */
export const getIoRedisClientForSocket = () => {
  return new Redis(config.redis.url)
}

socket.test.js

// Mocha suite reproducing the issue with two server instances: two users
// connect (the second one explicitly to port 4002) and join the same room
// one after the other.
describe.only('Multiple instance testing', () => {
    before(() => {
       // The default server is listening on 4001; create a second server listening on 4002.
       // NOTE(review): `Socket` is a project class not shown here; presumably its
       // listen() performs the Socket.IO setup from socket.js — confirm.
      const httpServer = http.Server(app)
      new Socket().listen(httpServer)
      httpServer.listen(4002, () => { console.log('Test server started on 4002') })
    })
    it.only('should get proper response', async () => {
      // Create two users and obtain a login token for each.
      const userA = await createUser()
      const userB = await createUser({ phone: '9898989888', email: 'test2@test.com' })
      const userAToken = await getLoginToken(userA)
      const userBToken = await getLoginToken(userB)
      // userA connects without an explicit URL (presumably the default server);
      // userB connects to the second instance on port 4002.
      const userAsocket = await connectUserSocket(userAToken)
      const userBsocket = await connectUserSocket(userBToken, 'ws://127.0.0.1:4002')
      // Step 1: userA joins room 'test' and waits for the acknowledgement.
      await new Promise((resolve) => {
        const eventCallback = (isSuccess, data) => {
          console.log('>>>>> userA joined:: ', data)
          expect(isSuccess).to.equal(true)
          resolve()
        }
        userAsocket.emit('join_a_room', {
          roomName: 'test'
        }, eventCallback)
      })
      // Step 2: userB joins the same room only after userA's acknowledgement.
      // In the reported output this acknowledgement carries
      // 'timeout reached: missing 1 responses'.
      await new Promise((resolve) => {
        const eventCallback = (isSuccess, data) => {
          console.log('>>>>> userB joined:: ', data)
          expect(isSuccess).to.equal(true)
          resolve()
        }
        userBsocket.emit('join_a_room', {
          roomName: 'test'
        }, eventCallback)
      })
    })
})

OUTPUT

I have added logs in a few places.

>>>> Stream Adapter >>  {"type":1,"uid":"c42282ebea7991ca","nsp":"/"}
Test server started on 4002
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":1}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":1} 1716207983034-0
[0d79ac3ff828190f] new event of type 1 from c42282ebea7991ca
>>>> Stream Adapter >>  {"type":2,"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":2}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":2} 1716207983037-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":2}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":2} 1716207983037-0
[c42282ebea7991ca] new event of type 2 from 0d79ac3ff828190f
>>>> SOC : broadcast operator rooms Set(1) { '664b416ea5b08951db0a5c5d' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}} 1716207983152-0
[c42282ebea7991ca] new event of type 7 from 0d79ac3ff828190f
>>>> SOC onMessage ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>> SOC :: cluster adapter :: [c42282ebea7991ca] calling fetchSockets with opts {"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}}
>>>> Stream Adapter >>  {"type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}} 1716207983152-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}} 1716207983343-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}} 1716207983343-0
[0d79ac3ff828190f] new event of type 8 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received ::  6a404d7bfcfbcffa
>>>> SOC : broadcast operator rooms Set(1) { '664b416fa5b08951db0a5c5f' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}} 1716207983546-0
[0d79ac3ff828190f] new event of type 7 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>> SOC :: cluster adapter :: [0d79ac3ff828190f] calling fetchSockets with opts {"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}}
>>>> Stream Adapter >>  {"type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}} 1716207983546-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}} 1716207983748-0
[c42282ebea7991ca] new event of type 8 from 0d79ac3ff828190f
>>>> SOC onMessage ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received ::  49de3c88dbea8de6
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}} 1716207983748-0
[0d79ac3ff828190f] ignore message from self
>>>> SOC : broadcast operator rooms Set(1) { 'test' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}} 1716207983951-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}} 1716207983951-0
[c42282ebea7991ca] new event of type 7 from 0d79ac3ff828190f
>>>> SOC onMessage ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>> SOC :: cluster adapter :: [c42282ebea7991ca] calling fetchSockets with opts {"rooms":["test"],"except":[],"flags":{}}
>>>> Stream Adapter >>  {"type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}} 1716207984153-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}} 1716207984153-0
[0d79ac3ff828190f] new event of type 8 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received ::  de66c29c37ca54b5
:::::::::: SOCKETS IN ROOM :::::::  0
>>>>> userA joined::  { message: 'Joined room', socketsInRoom: 0 }
>>>> SOC : broadcast operator rooms Set(1) { 'test' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}} 1716207984356-0
[0d79ac3ff828190f] new event of type 7 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>> SOC :: cluster adapter :: [0d79ac3ff828190f] calling fetchSockets with opts {"rooms":["test"],"except":[],"flags":{}}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}} 1716207984356-0
[c42282ebea7991ca] ignore message from self
>>> ERRR:::  Error: timeout reached: missing 1 responses
    at Timeout._onTimeout (/Users/nilkanthparmar/Documents/Projects/Ellu/ellu_backend/node_modules/socket.io-adapter/dist/cluster-adapter.js:611:28)
    at listOnTimeout (node:internal/timers:568:17)
    at processTimers (node:internal/timers:511:7)
>>>> Stream Adapter >>  {"type":2,"uid":"977b7eaed0b6bef1","nsp":"/"}
>>>>> userB joined::  { message: 'timeout reached: missing 1 responses' }
      1) should get proper response
      1) should get proper response

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions