Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Add multiple conference nodes serve for one room feature #1020

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions source/agent/audio/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,13 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
if (message.purpose === 'conference' && controller) {
if ((message.type === 'node' && message.id === controller) ||
(message.type === 'worker' && controller.startsWith(message.id))) {
log.error('Conference controller (type:', message.type, 'id:', message.id, ') fault is detected, exit.');
that.deinit();
process.exit();
if (message.tasks && message.tasks.length > 0) {
controller = message.tasks[Math.floor(Math.random() * message.tasks.length)];
} else {
log.error('Conference controller (type:', message.type, 'id:', message.id, ') fault is detected, exit.');
that.deinit();
process.exit();
}
}
}
};
Expand Down
39 changes: 39 additions & 0 deletions source/agent/conference/accessController.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio
var that = {},
cluster_name = spec.clusterName,
self_rpc_id = spec.selfRpcId,
redisClient = spec.redisClient,
in_room = spec.inRoom;

/*
Expand All @@ -27,6 +28,34 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio
*/
var sessions = {};

var updateFromRedis = (channel, msg) => {
var name = channel.substring(in_room.length);
if (name === "sessions") {
if (msg.type === 'add') {
terminals[msg.id] = msg.data;
} else {
if (terminals[msg.id]) {
delete terminals[msg.id];
}
}
}
}

var loadFromRedis = (moduleName, id) => {
var sessionKey = in_room + "sessions";
redisClient.subscribeChannel(sessionKey);
return redisClient.getItems(sessionKey)
.then(function(streamList){
if (Object.keys(streamList).length > 0) {
sessions = streamList;
}
return Promise.resolve('ok');
}).catch(function(err) {
log.error('Load data from redis failed, reason:', err);
return Promise.reject(err);
});
}


//Should terminateSession always succeed?
const terminateSession = (sessionId) => {
Expand All @@ -38,12 +67,14 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio
return rpcReq.recycleWorkerNode(session.locality.agent, session.locality.node, {room: in_room, task: sessionId})
}).then(function() {
delete sessions[sessionId];
redisClient.updateToRedis("delete", "accesscontroller", "sessions", sessionId, sessions[sessionId]);
})
.catch(function(reason) {
log.debug('AccessNode not recycled', session.locality);
});
} else {
delete sessions[sessionId];
redisClient.updateToRedis("delete", "accesscontroller", "sessions", sessionId, sessions[sessionId]);
return Promise.resolve('ok');
}
};
Expand Down Expand Up @@ -249,9 +280,11 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio
return Promise.reject('Session has been aborted');
}
sessions[sessionId].state = 'connecting';
redisClient.updateToRedis("add", "accesscontroller", "sessions", sessionId, sessions[sessionId]);
return 'ok';
}, (e) => {
delete sessions[sessionId];
redisClient.updateToRedis("delete", "accesscontroller", "sessions", sessionId, sessions[sessionId]);
return Promise.reject(e.message ? e.message : e);
});
};
Expand Down Expand Up @@ -339,9 +372,11 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio
return Promise.reject('Session has been aborted');
}
sessions[sessionId].state = 'connecting';
redisClient.updateToRedis("add", "accesscontroller", "sessions", sessionId, sessions[sessionId]);
return 'ok';
}, (e) => {
delete sessions[sessionId];
redisClient.updateToRedis("delete", "accesscontroller", "sessions", sessionId, sessions[sessionId]);
return Promise.reject(e.message ? e.message : e);
});
};
Expand Down Expand Up @@ -375,6 +410,10 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio
}
};

loadFromRedis("all")
.then(function() {
log.info("Load items from redis");
});
return that;
};

Loading