diff --git a/source/agent/audio/index.js b/source/agent/audio/index.js index a37b441fc..cd3ba8497 100644 --- a/source/agent/audio/index.js +++ b/source/agent/audio/index.js @@ -348,9 +348,13 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) { if (message.purpose === 'conference' && controller) { if ((message.type === 'node' && message.id === controller) || (message.type === 'worker' && controller.startsWith(message.id))) { - log.error('Conference controller (type:', message.type, 'id:', message.id, ') fault is detected, exit.'); - that.deinit(); - process.exit(); + if (message.tasks && message.tasks.length > 0) { + controller = message.tasks[Math.floor(Math.random() * message.tasks.length)]; + } else { + log.error('Conference controller (type:', message.type, 'id:', message.id, ') fault is detected, exit.'); + that.deinit(); + process.exit(); + } } } }; diff --git a/source/agent/conference/accessController.js b/source/agent/conference/accessController.js index ca21dbb81..26032e247 100644 --- a/source/agent/conference/accessController.js +++ b/source/agent/conference/accessController.js @@ -12,6 +12,7 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio var that = {}, cluster_name = spec.clusterName, self_rpc_id = spec.selfRpcId, + redisClient = spec.redisClient, in_room = spec.inRoom; /* @@ -27,6 +28,34 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio */ var sessions = {}; + var updateFromRedis = (channel, msg) => { + var name = channel.substring(in_room.length); + if (name === "sessions") { + if (msg.type === 'add') { + terminals[msg.id] = msg.data; + } else { + if (terminals[msg.id]) { + delete terminals[msg.id]; + } + } + } + } + + var loadFromRedis = (moduleName, id) => { + var sessionKey = in_room + "sessions"; + redisClient.subscribeChannel(sessionKey); + return redisClient.getItems(sessionKey) + .then(function(streamList){ + if (Object.keys(streamList).length > 0) { + sessions = streamList; + } + return Promise.resolve('ok'); + }).catch(function(err) { + log.error('Load data from redis failed, reason:', err); + return Promise.reject(err); + }); + } + //Should terminateSession always succeed? const terminateSession = (sessionId) => { @@ -38,12 +67,14 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio return rpcReq.recycleWorkerNode(session.locality.agent, session.locality.node, {room: in_room, task: sessionId}) }).then(function() { delete sessions[sessionId]; + redisClient.updateToRedis("delete", "accesscontroller", "sessions", sessionId, sessions[sessionId]); }) .catch(function(reason) { log.debug('AccessNode not recycled', session.locality); }); } else { delete sessions[sessionId]; + redisClient.updateToRedis("delete", "accesscontroller", "sessions", sessionId, sessions[sessionId]); return Promise.resolve('ok'); } }; @@ -249,9 +280,11 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio return Promise.reject('Session has been aborted'); } sessions[sessionId].state = 'connecting'; + redisClient.updateToRedis("add", "accesscontroller", "sessions", sessionId, sessions[sessionId]); return 'ok'; }, (e) => { delete sessions[sessionId]; + redisClient.updateToRedis("delete", "accesscontroller", "sessions", sessionId, sessions[sessionId]); return Promise.reject(e.message ? e.message : e); }); }; @@ -339,9 +372,11 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio return Promise.reject('Session has been aborted'); } sessions[sessionId].state = 'connecting'; + redisClient.updateToRedis("add", "accesscontroller", "sessions", sessionId, sessions[sessionId]); return 'ok'; }, (e) => { delete sessions[sessionId]; + redisClient.updateToRedis("delete", "accesscontroller", "sessions", sessionId, sessions[sessionId]); return Promise.reject(e.message ? e.message : e); }); }; @@ -375,6 +410,10 @@ module.exports.create = function(spec, rpcReq, on_session_established, on_sessio } }; + loadFromRedis("all") + .then(function() { + log.info("Load items from redis"); + }); return that; }; diff --git a/source/agent/conference/conference.js b/source/agent/conference/conference.js index d1a2fa916..5aabd9cb3 100644 --- a/source/agent/conference/conference.js +++ b/source/agent/conference/conference.js @@ -11,6 +11,7 @@ var AccessController = require('./accessController'); var RoomController = require('./roomController'); var dataAccess = require('./data_access'); var Participant = require('./participant'); +var Redis = require('./redisUtil'); const { QuicController } = require('./quicController'); // Logger @@ -99,6 +100,7 @@ var Conference = function (rpcClient, selfRpcId) { var that = {}, is_initializing = false, room_id, + redis, roomController, accessController; @@ -237,6 +239,198 @@ var Conference = function (rpcClient, selfRpcId) { var rpcChannel = require('./rpcChannel')(rpcClient), rpcReq = require('./rpcRequest')(rpcChannel); + var updateFromRedis = (channel, msg) => { + var name = channel.substring(room_id.length); + switch (name) { + case 'participants': + if (participants[msg.id]) { + if (msg.type === "add") { + participants[msg.id].updateSpec(msg.data); + } else { + delete participants[msg.id]; + } + } else { + if (msg.type === "add") { + participants[msg.id] = Participant(msg.data, rpcReq); + } + } + break; + case 'streams': + if (streams[msg.id]) { + if (msg.type === "add") { + if (msg.type === "forward") { + streams[msg.id].update(msg.data); + } + } else { + delete streams[msg.id]; + } + } else { + if (msg.type === "add") { + var streamInfo; + if (msg.data.type === "mixed") { + streamInfo = new MixedStream(msg.id, msg.data.info.label); + } else { + if (msg.data.info.type === "selecting") { + streamInfo = new SelectedStream(msg.id); + } else { + streamInfo = new ForwardStream(msg.id, msg.data.media, msg.data.data, msg.data.info, msg.data.locality); + } + } + streams[msg.id] = streamInfo; + } + } + break; + case 'subscriptions': + if (subscriptions[msg.id]) { + if (msg.type === "add") { + subscriptions[msg.id].updateSubscription(msg.data); + } else { + delete subscriptions[msg.id]; + } + } else { + if (msg.type === "add") { + subscriptions[msg.id] = new Subscription(msg.data.id, msg.data.media, msg.data.data, msg.data.locality, msg.data.info); + } + } + break; + case 'trackowners': + if (msg.type === 'add') { + trackOwners[msg.id] = msg.data; + } else { + if (trackOwners[msg.id]) { + delete trackOwners[msg.id]; + } + } + break; + default: + log.info('Invalid conference update items'); + } + + } + + var onSubMessage = (channel, message) => { + var msg = JSON.parse(message); + if (msg.self === selfRpcId) { + log.debug("Get own message and ignore:", message); + + } else { + if (msg.type === "leave") { + + } else { + switch (msg.module) { + case 'conference': + updateFromRedis(channel, msg); + break; + case 'roomcontroller': + if (roomController) { + roomController.updateFromRedis(channel, msg); + } + break; + case 'accesscontroller': + if (accessController) { + accessController.updateFromRedis(channel, msg); + } + break; + case 'rtccontroller': + if (rtcController) { + rtcController.updateFromRedis(channel, msg); + } + break; + case 'quiccontroller': + if (quicController) { + quicController.updateFromRedis(channel, msg); + } + break; + default: + log.info('Invalid conference module'); + } + } + } + + } + + var loadFromRedis = (room, moduleName, id) => { + if (moduleName === "all") { + var participantKey = room + "participants"; + redis.subscribeChannel(participantKey); + redis.getItems(participantKey) + .then(function(streamList){ + if (Object.keys(streamList).length > 0) { + for (let item in streamList) { + if (!Object.keys(participants).includes(item)) { + var participantInfo = Participant(streamList[item], rpcReq); + participants[item] = participantInfo; + } + } + } + return Promise.resolve('ok'); + }) + .then(function() { + var streamsKey = room + "streams"; + redis.subscribeChannel(streamsKey); + return redis.getItems(streamsKey) + .then(function(streamList){ + if (Object.keys(streamList).length > 0) { + for (let item in streamList) { + if (!Object.keys(streams).includes(item)) { + var streamInfo; + if (streamList[item].type === "mixed") { + streamInfo = new MixedStream(item, streamList[item].view); + } else { + if (streamList[item].info.type === "selecting") { + streamInfo = new SelectedStream(item); + } else { + streamInfo = new ForwardStream(item, streamList[item].media, streamList[item].data, streamList[item].info, streamList[item].locality); + } + } + streams[item] = streamInfo; + } + } + } + return Promise.resolve('ok'); + }); + }) + .then(function() { + var subscriptionKey = room + "subscriptions"; + redis.subscribeChannel(subscriptionKey); + return redis.getItems(subscriptionKey) + .then(function(streamList){ + if (Object.keys(streamList).length > 0) { + for (let item in streamList) { + if (!Object.keys(subscriptions).includes(item)) { + var subscriptionInfo = new Subscription(streamList[item].id, streamList[item].media, streamList[item].data, streamList[item].locality, streamList[item].info); + subscriptions[item] = subscriptionInfo; + } + } + } + return Promise.resolve('ok'); + }); + }) + .then(function() { + var trackOwnersKey = room + "trackowners"; + redis.subscribeChannel(trackOwnersKey); + return redis.getItems(trackOwnersKey) + .then(function(streamList){ + if (Object.keys(streamList).length > 0) { + trackOwners = streamList + } + return Promise.resolve('ok');; + }); + }).catch(function(err) { + log.error('Load data from redis failed, reason:', err); + return Promise.reject(err); + }); + } else if (id !== undefined) { + var key = room + moduleName; + return redis.getItem(key, id); + } else { + var key = room + moduleName; + return redis.getItems(key); + } + + } + + var onSessionEstablished = (participantId, sessionId, direction, sessionInfo) => { log.debug('onSessionEstablished, participantId:', participantId, 'sessionId:', sessionId, 'direction:', direction, 'sessionInfo:', JSON.stringify(sessionInfo)); if (direction === 'in') { @@ -315,6 +509,8 @@ var Conference = function (rpcClient, selfRpcId) { if (origin === undefined) { origin = {isp:"isp", region:"region"}; } + + var storeConfig = false; if (is_initializing) { return new Promise(function(resolve, reject) { var interval = setInterval(function() { @@ -336,13 +532,42 @@ var Conference = function (rpcClient, selfRpcId) { } } else { is_initializing = true; - return dataAccess.room.config(roomId) + room_id = roomId; + redis = Redis.create(true, roomId, selfRpcId,onSubMessage); + var cluster_name = global.config.cluster.name || 'owt-cluster'; + return new Promise(function(resolve, reject) { + rpcClient.remoteCall(cluster_name, 'getScheduledWorkers', ['conference', roomId], {callback: (agentList) => { + log.info("Get agent list:", agentList, " self:", selfRpcId); + if (agentList.length === 0 || (agentList.length === 1 && selfRpcId.startsWith(agentList[0]))) { + log.info("Clean room data for room:", roomId, " agentList:", agentList); + redis.cleanRoomdata(roomId); + } + resolve('ok'); + }}); + }) + .then(function() { + return loadFromRedis(roomId, "config", roomId); + }) .then(function(config) { - //log.debug('initializing room:', roomId, 'got config:', JSON.stringify(config)); + if (config === null) { + storeConfig = true; + return dataAccess.room.config(roomId) + } else { + return Promise.resolve(config); + } + }) + .then(function(config) { + log.info('initializing room:', roomId, 'got config:', JSON.stringify(config)); room_config = config; + if (storeConfig) { + redis.updateToRedis("add", "conference", "config", roomId, room_config); + } + room_config.internalConnProtocol = global.config.internal.protocol; StreamConfigure(room_config); - + return loadFromRedis(roomId, "all"); + }) + .then(function() { return new Promise(function(resolve, reject) { RoomController.create( { @@ -352,47 +577,60 @@ var Conference = function (rpcClient, selfRpcId) { room: roomId, config: room_config, origin: origin, - selfRpcId: selfRpcId + selfRpcId: selfRpcId, + redisClient: redis }, function onOk(rmController) { log.debug('room controller init ok'); - roomController = rmController; - room_id = roomId; + roomController = rmController; is_initializing = false; roomController.getMixedStreams().forEach(({streamId, view}) => { - const mixedStreamInfo = new MixedStream(streamId, view); - streams[streamId] = mixedStreamInfo; - streams[streamId].info.origin = origin; - log.debug('Mixed stream info:', mixedStreamInfo); - room_config.notifying.streamChange && - sendMsg('room', 'all', 'stream', - {id: streamId, status: 'add', data: mixedStreamInfo.toPortalFormat()}); + if(!streams[streamId]) { + const mixedStreamInfo = new MixedStream(streamId, view); + streams[streamId] = mixedStreamInfo; + streams[streamId].info.origin = origin; + log.debug('Mixed stream info:', mixedStreamInfo); + var streamInfo = streams[streamId].getStreamInfo(); + streamInfo.view = view; + redis.updateToRedis("add", "conference", "streams", streamId, streamInfo); + room_config.notifying.streamChange && + sendMsg('room', 'all', 'stream', + {id: streamId, status: 'add', data: mixedStreamInfo.toPortalFormat()}); + } }); roomController.getActiveAudioStreams().forEach((streamId) => { - const selectedStreamInfo = new SelectedStream(streamId); - streams[streamId] = selectedStreamInfo; - streams[streamId].info.origin = origin; - log.debug('Selected stream info:', selectedStreamInfo); - room_config.notifying.streamChange && - sendMsg('room', 'all', 'stream', - {id: streamId, status: 'add', data: selectedStreamInfo.toPortalFormat()}); + if(!streams[streamId]) { + const selectedStreamInfo = new SelectedStream(streamId); + streams[streamId] = selectedStreamInfo; + streams[streamId].info.origin = origin; + log.debug('Selected stream info:', selectedStreamInfo); + var streamInfo = streams[streamId].getStreamInfo(); + streamInfo.view = view; + redis.updateToRedis("add", "conference", "streams", streamId, streamInfo); + room_config.notifying.streamChange && + sendMsg('room', 'all', 'stream', + {id: streamId, status: 'add', data: selectedStreamInfo.toPortalFormat()}); + } }); - participants['admin'] = Participant({ - id: 'admin', - user: 'admin', - role: 'admin', - portal: undefined, - origin: origin, - permission: { - subscribe: {audio: true, video: true}, - publish: {audio: true, video: true} - } - }, rpcReq); - - rtcController = new RtcController(room_id, rpcReq, selfRpcId, global.config.cluster.name || 'owt-cluster'); + if(!participants['admin']) { + participants['admin'] = Participant({ + id: 'admin', + user: 'admin', + role: 'admin', + portal: undefined, + origin: origin, + permission: { + subscribe: {audio: true, video: true}, + publish: {audio: true, video: true} + } + }, rpcReq); + } + + rtcController = new RtcController(room_id, rpcReq, selfRpcId, global.config.cluster.name || 'owt-cluster', redis); + rtcController.loadData(); // Events rtcController.on('transport-established', transportId => { const transport = rtcController.getTransport(transportId); @@ -427,7 +665,7 @@ var Conference = function (rpcClient, selfRpcId) { onSessionAborted(data.owner, sessionId, data.direction, data.reason); }); - quicController = new QuicController(room_id, rpcReq, selfRpcId, global.config.clusterName || 'owt-cluster'); + quicController = new QuicController(room_id, rpcReq, selfRpcId, global.config.clusterName || 'owt-cluster', redis); quicController.on('session-established', (sessionInfo) => { const sessionId = sessionInfo.id; const media = { tracks: sessionInfo.tracks }; @@ -449,6 +687,7 @@ var Conference = function (rpcClient, selfRpcId) { accessController = AccessController.create({clusterName: global.config.cluster.name || 'owt-cluster', selfRpcId: selfRpcId, inRoom: room_id, + redisClient: redis, mediaIn: room_config.mediaIn, mediaOut: room_config.mediaOut}, rpcReq, @@ -457,6 +696,7 @@ var Conference = function (rpcClient, selfRpcId) { onLocalSessionSignaling, rtcController, quicController); + redis.saveList(roomId,selfRpcId); resolve('ok'); }, function onError(reason) { @@ -464,6 +704,7 @@ var Conference = function (rpcClient, selfRpcId) { is_initializing = false; reject('roomController init failed. reason: ' + reason); }); + }); }).catch(function(err) { log.error('Init room failed, reason:', err); @@ -476,6 +717,14 @@ var Conference = function (rpcClient, selfRpcId) { } }; + var cleanRedis = function() { + var items = ["streams", "rtcoperations", "rtctracks", "trackowners", "rtctransports", "rtctracks", "subscriptions", "mixviews", "terminals", "config", "roomstreams"]; + for (var i in items) { + var key = room_id + items[i]; + redis.delByKey(key); + } + } + var destroyRoom = function() { const doClean = () => { accessController && accessController.destroy(); @@ -483,8 +732,17 @@ var Conference = function (rpcClient, selfRpcId) { roomController && roomController.destroy(); roomController = undefined; subscriptions = {}; +/* + for (var stream_id in streams) { + redis.updateToRedis("delete", "conference", "streams", stream_id); + } +*/ + cleanRedis(); streams = {}; participants = {}; + //redis.updateToRedis("delete", "conference", "config", room_id); + + redis.deleteList(room_id, selfRpcId); selfCleanTimer && clearTimeout(selfCleanTimer); selfCleanTimer = null; room_id = undefined; @@ -546,14 +804,18 @@ var Conference = function (rpcClient, selfRpcId) { }; const addParticipant = function(participantInfo, permission) { - participants[participantInfo.id] = Participant({ - id: participantInfo.id, - user: participantInfo.user, - role: participantInfo.role, - portal: participantInfo.portal, - origin: participantInfo.origin, - permission: permission - }, rpcReq); + var participant = { + id: participantInfo.id, + user: participantInfo.user, + role: participantInfo.role, + portal: participantInfo.portal, + origin: participantInfo.origin, + permission: permission + }; + + participants[participantInfo.id] = Participant(participant, rpcReq); + + redis.updateToRedis("add", "conference", "participants", participantInfo.id, participant); room_config.notifying.participantActivities && sendMsg(participantInfo.id, 'others', 'participant', {action: 'join', data: {id: participantInfo.id, user: participantInfo.user, role: participantInfo.role}}); return Promise.resolve('ok'); }; @@ -575,6 +837,8 @@ var Conference = function (rpcClient, selfRpcId) { var participant = participants[participantId]; var left_user = participant.getInfo(); delete participants[participantId]; + + redis.updateToRedis("delete", "conference", "participants", participantId); room_config.notifying.participantActivities && sendMsg('room', 'all', 'participant', {action: 'leave', data: left_user.id}); } }; @@ -607,12 +871,13 @@ var Conference = function (rpcClient, selfRpcId) { return Promise.reject('Stream already exists'); } - streams[id] = { + var streamInfo = { id: id, type: 'forward', info: info, isInConnecting: true - }; + } + streams[id] = streamInfo; return Promise.resolve('ok'); }; @@ -652,8 +917,12 @@ var Conference = function (rpcClient, selfRpcId) { return Promise.all(pubs).then(() => { if (participants[info.owner]) { streams[id] = fwdStream; + var streamInfo = streams[id].getStreamInfo(); + redis.updateToRedis("add", "conference", "streams", id, streamInfo); + pubArgs.forEach(pubArg => { trackOwners[pubArg.id] = id; + redis.updateToRedis("add", "conference", "trackowners", pubArg.id, id); if (room_config.selectActiveAudio) { if (pubArg.media.audio) { roomController.selectAudio(pubArg.id, () => { @@ -687,6 +956,8 @@ var Conference = function (rpcClient, selfRpcId) { function() { if (participants[info.owner]) { streams[id] = fwdStream; + var streamInfo = streams[id].getStreamInfo(); + redis.updateToRedis("add", "conference", "streams", id, streamInfo); if (!isReadded) { setTimeout(() => { room_config.notifying.streamChange && @@ -706,6 +977,8 @@ var Conference = function (rpcClient, selfRpcId) { const updateStreamInfo = (streamId, info) => { if (!streams[streamId].isInConnecting && streams[streamId].update(info)) { + var streamInfo = streams[streamId].getStreamInfo(); + redis.updateToRedis("add", "conference", "streams", streamId, streamInfo); room_config.notifying.streamChange && sendMsg('room', 'all', 'stream', { id: streamId, @@ -734,6 +1007,8 @@ var Conference = function (rpcClient, selfRpcId) { }); } delete streams[streamId]; + redis.updateToRedis("delete", "conference", "streams", streamId); + setTimeout(() => { room_config.notifying.streamChange && sendMsg('room', 'all', 'stream', {id: streamId, status: 'remove'}); }, 10); @@ -755,6 +1030,7 @@ var Conference = function (rpcClient, selfRpcId) { info: info, isInConnecting: true }; + redis.updateToRedis("add", "conference", "subscriptions", id, subscriptions[id]); return Promise.resolve('ok'); }; @@ -802,6 +1078,8 @@ var Conference = function (rpcClient, selfRpcId) { return Promise.all(subs).then(() => { if (participants[info.owner]) { subscriptions[id] = subscription; + var subscriptionInfo = subscriptions[id].getSubscriptionInfo(); + redis.updateToRedis("add", "conference", "subscriptions", id, subscriptionInfo); return Promise.resolve('ok'); } else { subArgs.forEach(subArg => { @@ -823,6 +1101,7 @@ var Conference = function (rpcClient, selfRpcId) { }); } delete subscriptions[subscriptionId]; + redis.updateToRedis("delete", "conference", "subscriptions", subscriptionId); } resolve('ok'); }); @@ -833,6 +1112,12 @@ var Conference = function (rpcClient, selfRpcId) { if (streams[streamId].info.type === 'sip' || streams[streamId].info.type === 'analytics') { return removeStream(streamId); } else { + for (var key in trackOwners) { + if (trackOwners[key] === streamId) { + delete trackOwners[key]; + redis.updateToRedis("delete", "conference", "trackowners", key); + } + } return accessController.terminate(streamId, 'in', 'Participant terminate'); } } else { @@ -1489,6 +1774,8 @@ var Conference = function (rpcClient, selfRpcId) { return Promise.all(mixOps).then(() => { if (streams[streamId].info.inViews.indexOf(toView) === -1) { streams[streamId].info.inViews.push(toView); + var streamInfo = streams[streamId].getStreamInfo(); + redis.updateToRedis("add", "conference", "streams", streamId, streamInfo); } return Promise.resolve('ok'); }).catch(reason => { @@ -1514,6 +1801,8 @@ var Conference = function (rpcClient, selfRpcId) { })); return Promise.all(unmixOps).then(() => { streams[streamId].info.inViews.splice(streams[streamId].info.inViews.indexOf(fromView), 1); + var streamInfo = streams[streamId].getStreamInfo(); + redis.updateToRedis("add", "conference", "streams", streamId, streamInfo); return Promise.resolve('ok'); }).catch(reason => { log.info('roomController.unmix failed, reason:', reason); @@ -1545,6 +1834,8 @@ var Conference = function (rpcClient, selfRpcId) { } }); var updateFields = (track === 'av') ? ['audio.status', 'video.status'] : [track + '.status']; + var streamInfo = streams[streamId].getStreamInfo(); + redis.updateToRedis("add", "conference", "streams", streamId, streamInfo); room_config.notifying.streamChange && updateFields.forEach((fieldData) => { sendMsg('room', 'all', 'stream', {status: 'update', id: streamId, data: {field: fieldData, value: status}}); }); @@ -1610,6 +1901,8 @@ var Conference = function (rpcClient, selfRpcId) { roomController.setLayout(streams[streamId].info.label, layout, function(updated) { if (streams[streamId]) { streams[streamId].info.layout = convertLayout(updated); + var streamInfo = streams[streamId].getStreamInfo(); + redis.updateToRedis("add", "conference", "streams", streamId, streamInfo); resolve('ok'); } else { reject('stream early terminated'); @@ -1780,6 +2073,9 @@ var Conference = function (rpcClient, selfRpcId) { t.status = status; } }); + + var subscriptionfo = subscriptions[subscriptionId].getSubscriptionInfo(); + redis.updateToRedis("add", "conference", "subscriptions", subscriptionId, subscriptionfo); return 'ok'; }); }; @@ -1881,6 +2177,9 @@ var Conference = function (rpcClient, selfRpcId) { if (streams[streamId]) { layout = convertLayout(layout); streams[streamId].info.layout = layout; + var streamInfo = streams[streamId].getStreamInfo(); + streamInfo.view = view; + redis.updateToRedis("add", "conference", "streams", streamId, streamInfo) room_config.notifying.streamChange && sendMsg('room', 'all', 'stream', {status: 'update', id: streamId, data: {field: 'video.layout', value: layout}}); callback('callback', 'ok'); } else { @@ -1916,6 +2215,9 @@ var Conference = function (rpcClient, selfRpcId) { if (streams[mixedId] instanceof MixedStream) { if (streams[mixedId].info.activeInput !== input) { streams[mixedId].info.activeInput = input; + var streamInfo = streams[mixedId].getStreamInfo(); + streamInfo.view = view; + redis.updateToRedis("add", "conference", "streams", mixedId, streamInfo); room_config.notifying.streamChange && sendMsg('room', 'all', 'stream', {id: mixedId, status: 'update', data: {field: 'activeInput', value: input}}); @@ -1930,6 +2232,9 @@ var Conference = function (rpcClient, selfRpcId) { if (streams[activeAudioId].info.activeInput !== input) { streams[activeAudioId].info.activeInput = input; streams[activeAudioId].info.activeOwner = target.owner; + var streamInfo = streams[activeAudioId].getStreamInfo(); + streamInfo.view = view; + redis.updateToRedis("add", "conference", "streams", activeAudioId, streamInfo); room_config.notifying.streamChange && sendMsg('room', 'all', 'stream', {id: activeAudioId, status: 'update', data: {field: 'activeInput', value: input}}); @@ -1978,6 +2283,8 @@ var Conference = function (rpcClient, selfRpcId) { } }) ).then(() => { + var participantInfo = participants[participantId].getPartcipant(); + redis.updateToRedis("add", "conference", "participants", participantId, participantInfo); callback('callback', participants[participantId].getDetail()); }, (err) => { callback('callback', 'error', err.message ? err.message : err); @@ -2721,6 +3028,10 @@ var Conference = function (rpcClient, selfRpcId) { callback('callback', 'Success'); }; + that.close = function() { + + }; + //This interface is for fault tolerance. that.onFaultDetected = function (message) { if (message.purpose === 'portal' || message.purpose === 'sip') { @@ -2736,6 +3047,11 @@ var Conference = function (rpcClient, selfRpcId) { } else if (message.purpose === 'audio' || message.purpose === 'video') { roomController && roomController.onFaultDetected(message.purpose, message.type, message.id); + } else if (message.purpose === 'conference') { + //Reload redis data since one conference node or agent crashes + if (message.tasks && message.tasks.includes(room_id)) { + log.info("conference node serve for the same room exit:", message); + } } }; diff --git a/source/agent/conference/dist.json b/source/agent/conference/dist.json index 0dc5d53e0..442cd6cb9 100644 --- a/source/agent/conference/dist.json +++ b/source/agent/conference/dist.json @@ -18,6 +18,7 @@ "quicController.js", "stream.js", "subscription.js", + "redisUtil.js", "../index.js", "../networkHelper.js", "../../common/formatUtil.js", diff --git a/source/agent/conference/log4js_configuration.json b/source/agent/conference/log4js_configuration.json index 3a9b63f19..bb53a5f77 100644 --- a/source/agent/conference/log4js_configuration.json +++ b/source/agent/conference/log4js_configuration.json @@ -24,6 +24,7 @@ "Subscription": "INFO", "WorkingAgent": "INFO", "Connections": "INFO", + "Redis": "INFO", "InternalConnectionFactory": "INFO" } } diff --git a/source/agent/conference/package.json b/source/agent/conference/package.json index a959bc25d..3691e7861 100644 --- a/source/agent/conference/package.json +++ b/source/agent/conference/package.json @@ -8,6 +8,7 @@ "toml": "*", "mongoose": "^5.9.6", "uuid": "^8.0.0", + "redis": "3.0.2", "fraction.js": "^4.0.12" }, "devDependencies": { diff --git a/source/agent/conference/participant.js b/source/agent/conference/participant.js index 88a6795ba..fc2c77a8c 100644 --- a/source/agent/conference/participant.js +++ b/source/agent/conference/participant.js @@ -13,6 +13,27 @@ var Participant = function(spec, rpcReq) { origin = spec.origin, permission = spec.permission; + that.updateSpec = (spec) => { + id = spec.id; + role = spec.role; + user = spec.user; + portal = spec.portal; + origin = spec.origin; + permission = spec.permission; + + } + + that.getPartcipant = () => { + var participant = { + id: id, + role: role, + user: user, + portal: portal, + origin: origin, + permission: permission + } + } + that.update = (op, path, value) => { switch (path) { case '/permission/subscribe': diff --git a/source/agent/conference/quicController.js b/source/agent/conference/quicController.js index ae66ccfc5..2300c1eb4 100644 --- a/source/agent/conference/quicController.js +++ b/source/agent/conference/quicController.js @@ -18,11 +18,12 @@ const PENDING = 'pending'; class Transport { - constructor(id, owner, origin) { + constructor(id, owner, origin, redis) { this.id = id; this.owner = owner; this.origin = origin; this.locality = null; + this.redis = redis; this.state = INITIALIZING; } @@ -30,6 +31,18 @@ class Transport { this.locality = locality; this.state = PENDING; } + + getInfo() { + var info = { + id: this.id, + owner: this.owner, + origin: this.origin, + locality: this.locality, + state: this.state + } + + return info; + } } class Operation { @@ -44,6 +57,20 @@ class Operation { this.data = data; this.promise = Promise.resolve(); } + + getInfo() { + var info = { + id: this.id, + transport: this.transport, + transportId: this.transportId, + direction: this.direction, + tracks: this.tracks, + legacy: this.legacy, + attributes: this.attributes + } + + return info; + } } /* Events @@ -61,19 +88,49 @@ class QuicController extends EventEmitter { * onTransportSignaling, mediaOnOff, sendMsg * } */ - constructor(roomId, rpcReq, roomRpcId, clusterRpcId) { + constructor(roomId, rpcReq, roomRpcId, clusterRpcId, redis) { log.debug(`constructor ${roomId}, ${roomRpcId}, ${clusterRpcId}`); super(); this.roomId = roomId; this.roomRpcId = roomRpcId; this.rpcReq = rpcReq; this.clusterRpcId = clusterRpcId; + this.redis = redis; // Map {transportId => Transport} this.transports = new Map(); // Map {operationId => Operation} this.operations = new Map(); } + updateFromRedis(channel, msg) { + var name = channel.substring(room_id.length); + switch (name) { + case 'quictransports': + if (msg.type === 'add') { + const op = new Transport(msg.data.id, msg.data.owner, msg.data.origin); + op.setup(msg.data.locality); + this.transports.set(msg.data.id, op); + } else { + if (this.transports.has(msg.id)) { + this.transports.delete(msg.id); + } + } + break + case 'quicoperations': + if (msg.type === 'add') { + const op = new Operation(msg.data.sessionId, msg.data.transport, msg.data.direction, msg.data.tracks, msg.data.legacy, msg.data.attributes); + this.operations.set(msg.data.sessionId, op); + } else { + if (this.operations.has(msg.id)) { + this.operations.delete(msg.id); + } + } + break; + default: + log.info('Invalid roomControl update items'); + } + } + // Return Transport getTransport(transportId) { return this.transports.get(transportId); @@ -84,21 +141,6 @@ class QuicController extends EventEmitter { return this.operations.get(operationId); } - onSessionProgress(sessionId, status) - { - if (!status.data) { - log.error('onSessionProgress is called by QUIC connections.'); - return; - } - if (status.type === 'ready') { - if (!this.operations.get(sessionId)) { - log.error('Invalid session ID.'); - return; - } - this.emit('session-established', this.operations.get(sessionId)); - } - } - onSessionProgress(sessionId, status) { if (!status.data) { @@ -116,7 +158,9 @@ class QuicController extends EventEmitter { _createTransportIfNeeded(ownerId, sessionId, origin, tId) { if (!this.transports.has(tId)) { - this.transports.set(tId, new Transport(tId, ownerId, origin)); + var transport = new Transport(tId, ownerId, origin); + this.transports.set(tId, transport); + this.redis.updateToRedis("add", "quiccontroller", "quictransports", tId, transport.getInfo()); const taskConfig = {room: this.roomId, task: sessionId}; log.debug(`getWorkerNode ${this.clusterRpcId}, ${taskConfig}, ${origin}`); return this.rpcReq.getWorkerNode(this.clusterRpcId, 'quic', taskConfig, origin) @@ -159,6 +203,7 @@ class QuicController extends EventEmitter { } const op = new Operation(sessionId, transport, direction, tracks, data); this.operations.set(sessionId, op); + this.redis.updateToRedis("add", "quiccontroller", "quicoperations", sessionId, op.getInfo()); // Save promise for this operation const options = {transport:{id:transportId, type:'quic'}, tracks, controller: this.roomRpcId, data}; op.promise = this.rpcReq.initiate(locality.node, sessionId, 'quic', direction, options); @@ -189,6 +234,7 @@ class QuicController extends EventEmitter { const abortData = { direction: operation.direction, owner, reason }; this.emit('session-aborted', sessionId, abortData); this.operations.delete(sessionId); + this.redis.updateToRedis("delete", "quiccontroller", "quicoperations", sessionId); let emptyTransport = true; for (const [id, op] of this.operations) { @@ -206,6 +252,7 @@ class QuicController extends EventEmitter { }) .then(() => { this.transports.delete(transport.id); + this.redis.updateToRedis("delete", "quiccontroller", "quictransports", transport.id); }); } } diff --git a/source/agent/conference/redisUtil.js b/source/agent/conference/redisUtil.js new file mode 100644 index 000000000..13feb3624 --- /dev/null +++ b/source/agent/conference/redisUtil.js @@ -0,0 +1,283 @@ +// Copyright (C) <2019> Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +'use strict'; + +const redis = require('redis') +const bluebird = require('bluebird') +const log = require('./logger').logger.getLogger('Redis'); + +// Using promises +bluebird.promisifyAll(redis) + +module.exports.create = function (messageChannel, roomId, selfRpcId, onSubMessage) { + var that = {}, + room_id, selfRpcId, callback, redisClient, pubClient, subClient; + + var storeConnectState = false; + var pubConnectState = false; + + var retry_strategy = function(options) { + if (options.error && options.error.code === "ECONNREFUSED") { + // End reconnecting on a specific error and flush all commands with + // a individual error + log.error('The server refused the connection'); + return 1000; + } + if (options.total_retry_time > 1000 * 60 * 60) { + // End reconnecting after a specific timeout and flush all commands + // with a individual error + log.error('Retry time exhausted'); + return 1000; + } + if (options.attempt > 10000) { + // End reconnecting with built in error + return 1000; + } + // reconnect after + return Math.min(options.attempt * 1000, 3000); + }; + + if (messageChannel) { + room_id = roomId; + selfRpcId = selfRpcId; + callback = onSubMessage; + redisClient = redis.createClient({retry_strategy}), + pubClient = redis.createClient({retry_strategy}), + subClient = redis.createClient({retry_strategy}); + } else { + redisClient = redis.createClient({retry_strategy}); + } + + var addItem = function (key, id, data) { + if (!storeConnectState) { + log.error('not add item to redis for redis disconnected'); + return undefined; + } else { + return redisClient.hsetAsync(key, id, JSON.stringify(data)) + .then( + () => log.debug("Item:", key, " id:", id, " added"), + err => log.error("addItem with error:", err) + ) + } + }; + + var delItem = function (key, id) { + if (!storeConnectState) { + log.error('not delete item in redis for redis disconnected'); + return undefined; + } else { + return redisClient.hdelAsync(key, id) + .then( + res => log.debug("Item:", key, " id:", id, " removed with:", res), + err => log.error("delItem with error:", err) + ) + } + }; + + that.initRedisClient = function (messageChannel, roomId, selfRpcId, onSubMessage) { + if (messageChannel) { + room_id = roomId; + selfRpcId = selfRpcId; + callback = onSubMessage; + redisClient = redis.createClient(), + pubClient = redis.createClient(), + subClient = redis.createClient(); + } else { + redisClient = redis.createClient(); + } + } + + that.delByKey = function (key) { + log.info("delete key from redis:", key); + if (!storeConnectState) { + log.error('not delete key from redis for redis disconnected'); + return undefined; + } else { + return redisClient.delAsync(key) + .then( + res => log.info("Item:", key, " removed with:", res), + err => log.error("delItem with error:", err) + ); + } + }; + + that.getItem = function (key, id) { + return new Promise((resolve, reject) => { + if (!storeConnectState) { + resolve(null); + } else { + redisClient.hgetAsync(key, id) + .then(user => { + resolve(JSON.parse(user)); + }, error => { + reject(error); + }); + } + }) + }; + + that.getItems = function (key) { + return new Promise((resolve, reject) => { + if (!storeConnectState) { + reject('getItems failed for redis disconnected'); + } else { + redisClient.hgetallAsync(key) + .then(users => { + const userList = {} + for (let user in users) { + var item = JSON.parse(users[user]); + userList[user] = item; + } + resolve(userList); + }, error => { + reject(error); + }); + } + }); + }; + + that.subscribeChannel = function (channel) { + subClient.subscribe(channel, function(e) { + log.debug("Subscribed channel:", channel, " with:", e); + }); + } + + var publishMsg = function (channel, type, module, id, data) { + var message = { + type: type, + module: module, + id: id, + self: selfRpcId, + data: data + }; + pubConnectState && pubClient.publish(channel, JSON.stringify(message), function(e) { + log.debug("Published message:", message, " to channel:", channel, " with e:", e); + }); + + } + + that.updateToRedis = function (type, module, name, id, data) { + log.debug("update to redis type:", type, " module:", module, " name:", name, " id:", id, " data", data); + var key = room_id + name; + if (type === 'add') { + addItem(key, id, data); + publishMsg(key, type, module, id, data); + } else { + delItem(key, id); + publishMsg(key, type, module, id); + } + } + + that.saveList = function (key, value) { + return new Promise((resolve, reject) => { + if (!storeConnectState) { + reject('saveList failed for redis disconnected'); + } else { + redisClient.lpush(key, value, function(err, data) { + if (err) { + log.error("lpush key:", key, "with value:", data, " error", err); + } + resolve(data); + }); + } + }); + } + + that.getList = (key) => { + return new Promise((resolve, reject) => { + if (!storeConnectState) { + log.error("getList failed for redis disconnected"); + resolve([]); + } else { + redisClient.lrange(key, 0, -1, function(err, data) { + if (err) { + log.error("lpush key:", key, "with value:", data, " error", err); + } + resolve(data); + }); + } + }); + } + + that.deleteList = (key, value) => { + return new Promise((resolve, reject) => { + if (!storeConnectState) { + reject('deleteList failed for redis disconnected'); + } else { + redisClient.lrem(key, 0, value, function(err, data) { + if (err) { + log.error("lrem key:", key, "with value:", data, " error", err); + } + resolve(data); + }); + } + }); + } + + that.cleanRoomdata = (roomid) => { + var key = roomid + "*"; + if (!storeConnectState) { + log.error('cleanRoomdata failed for redis disconnected'); + } else { + redisClient.keys(key, function(err, rows) { + log.info("rows are:", rows); + if (rows.length > 0) { + redisClient.del(rows); + } + }); + } + } + + if (subClient) { + subClient.on("message", function (channel, message) { + callback(channel, message); + }); + + subClient.on("error", function (err) { + log.error("redis sub client connection error:", err); + }); + } + + if (pubClient) { + pubClient.on("ready", function () { + pubConnectState = true; + log.info("redis pub client connection ready"); + }); + + pubClient.on("end", function (channel, message) { + pubConnectState = false; + log.error("redis pub client connection to server ended"); + }); + + pubClient.on("error", function (err) { + log.error("redis pub client connection error:", err); + }); + } + + redisClient.on("connect", function () { + log.info("redis client connected"); + }); + + redisClient.on("ready", function () { + storeConnectState = true; + log.info("redis client connection ready"); + }); + + redisClient.on("reconnecting", function () { + log.info("Try to reconnect to redis server"); + }); + + redisClient.on("error", function (err) { + log.error("redis client connection error:", err); + }); + + redisClient.on("end", function (channel, message) { + storeConnectState = false; + log.error("redis client connection to server ended"); + }); + + return that; +}; + diff --git a/source/agent/conference/roomController.js b/source/agent/conference/roomController.js index 052a6143a..5cc041c93 100644 --- a/source/agent/conference/roomController.js +++ b/source/agent/conference/roomController.js @@ -44,6 +44,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { room_id = spec.room, origin = spec.origin, selfRpcId = spec.selfRpcId, + redisClient = spec.redisClient, enable_audio_transcoding = config.transcoding && !!config.transcoding.audio, enable_video_transcoding = config.transcoding && !!config.transcoding.video, internal_conn_protocol = config.internalConnProtocol; @@ -123,6 +124,52 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { var mediaPreference = getMediaPreference(); + var loadFromRedis = function(moduleName, id) { + if (moduleName === "all") { + var terminalKey = room_id + "terminals"; + redisClient.subscribeChannel(terminalKey); + return redisClient.getItems(terminalKey) + .then(function(streamList){ + if (Object.keys(streamList).length > 0) { + terminals = streamList; + } + return Promise.resolve('ok'); + }) + .then(function() { + var roomstreamsKey = room_id + "roomstreams"; + redisClient.subscribeChannel(roomstreamsKey); + return redisClient.getItems(roomstreamsKey) + .then(function(streamList){ + if (Object.keys(streamList).length > 0) { + streams = streamList; + } + return Promise.resolve('ok'); + }); + }) + .then(function() { + var mixviewsKey = room_id + "mixviews"; + redisClient.subscribeChannel(mixviewsKey); + return redisClient.getItems(mixviewsKey) + .then(function(streamList){ + if (Object.keys(streamList).length > 0) { + mix_views = streamList; + } + return Promise.resolve('ok'); + }); + }).catch(function(err) { + log.error('Load data from redis failed, reason:', err); + return Promise.resolve(err); + }); + + } else if (id !== undefined) { + var key = room_id + moduleName; + return redisClient.getItem(key, id); + } else { + var key = room_id + moduleName; + return redisClient.getItems(key); + } + }; + // Length 20 number ID generator var randomId = function() { var length = 20; @@ -246,6 +293,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { mixer: video_mixer, supported_formats: supportedVideo.codecs }; + redisClient.updateToRedis("add", "roomcontroller", "mixviews", view, mix_views[view]); // Enable AV coordination if specified enableAVCoordination(view); @@ -261,6 +309,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { mix_views[view].video = { mixer: null, supported_formats: { encode: [], decode: [] } }; onInitOk(); } + redisClient.updateToRedis("add", "roomcontroller", "mixviews", view, mix_views[view]); }, function onAudioFail(reason) { onInitError(reason); @@ -285,6 +334,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { function onTerminalReady() { log.debug('new terminal ok. terminal_id', selectorId); terminals[selectorId].published = activeAudio.streams; + redisClient.updateToRedis("add", "roomcontroller", "terminals", selectorId, terminals[selectorId]); initMediaProcessor(selectorId, ['selecting', selectConfig, room_id, selfRpcId, 'placehodler']) .then(function(initMediaResult) { @@ -299,6 +349,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { video: undefined, spread: [] }; + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", streamId, streams[streamId]); }); onOk(initMediaResult); }).catch(function(reason) { @@ -316,55 +367,61 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { var initialize = function (on_ok, on_error) { log.debug('initialize room', room_id); + return loadFromRedis("all") + .then(() => { + // Mix stream ID is room ID followed by view index + if (config.views.length > 0) { + // Mutiple views configuration + if (!Object.keys(mix_views).length > 0) { + var viewProcessed = []; + var errorReason; + + config.views.forEach(function(viewSettings) { + var viewLabel = viewSettings.label; + // Initialize mixer engine for each view + mix_views[viewLabel] = {}; + // Save view init promises + viewProcessed.push(new Promise(function(resolve, reject) { + initView(viewLabel, viewSettings, + function onOk() { + log.debug('init ok for view:', viewLabel); + resolve(viewLabel); + }, + function onError(reason) { + log.error('init fail. view:', viewLabel, 'reason:', reason); + errorReason = reason; + delete mix_views[viewLabel]; + redisClient.updateToRedis("delete", "roomcontroller", "mixviews", viewLabel); + resolve(null); + }); + })); + }); - // Mix stream ID is room ID followed by view index - if (config.views.length > 0) { - // Mutiple views configuration - var viewProcessed = []; - var errorReason; - - config.views.forEach(function(viewSettings) { - var viewLabel = viewSettings.label; - // Initialize mixer engine for each view - mix_views[viewLabel] = {}; - - // Save view init promises - viewProcessed.push(new Promise(function(resolve, reject) { - initView(viewLabel, viewSettings, - function onOk() { - log.debug('init ok for view:', viewLabel); - resolve(viewLabel); - }, - function onError(reason) { - log.error('init fail. view:', viewLabel, 'reason:', reason); - errorReason = reason; - delete mix_views[viewLabel]; - resolve(null); - }); - })); - }); - - Promise.all(viewProcessed).then(function(results) { - // Result for callback - var viewCount = results.filter(function(re) { return re !== null; }).length; - if (viewCount < results.length) { - log.debug("Views incomplete initialization", viewCount); - on_error(errorReason); - } else if (config.selectActiveAudio) { - initSelector(function onOk() { - on_ok(that); - }, on_error); + Promise.all(viewProcessed).then(function(results) { + // Result for callback + var viewCount = results.filter(function(re) { return re !== null; }).length; + if (viewCount < results.length) { + log.debug("Views incomplete initialization", viewCount); + on_error(errorReason); + } else if (config.selectActiveAudio) { + initSelector(function onOk() { + on_ok(that); + }, on_error); + } else { + on_ok(that); + } + }).catch(function(reason) { + log.error("Error initialize views:", reason); + on_error(reason); + }); } else { on_ok(that); } - }).catch(function(reason) { - log.error("Error initialize views:", reason); - on_error(reason); - }); - } else { - log.debug('Room disable mixing init ok'); - on_ok(that); - } + } else { + log.debug('Room disable mixing init ok'); + on_ok(that); + } + }); }; var deinitialize = function () { @@ -411,13 +468,17 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { return nodeLocality .then(function(locality) { - terminals[terminal_id] = { + var terminalInfo = { owner: owner, origin: origin, type: terminal_type, locality: locality, published: [], - subscribed: {}}; + subscribed: {} + }; + + terminals[terminal_id] = terminalInfo; + redisClient.updateToRedis("add", "roomcontroller", "terminals", terminal_id, terminalInfo); on_ok(); }, function(err) { on_error(err.message? err.message : err); @@ -443,6 +504,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { }); } delete terminals[terminal_id]; + redisClient.updateToRedis("delete", "roomcontroller", "terminals", terminal_id); } }; @@ -497,6 +559,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { } } streams[stream_id].spread.push({target: target_node, status: 'connecting', waiting: []}); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); var on_spread_failed = function(reason, cancel_sub, cancel_pub, cancel_out, cancel_in) { log.error('spreadStream failed, stream_id:', stream_id, 'reason:', reason); @@ -506,6 +569,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { e.onError(reason); }); streams[stream_id].spread.splice(i, 1); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); } if (cancel_sub) { makeRPC(rpcClient, original_node, 'unsubscribe', [spread_id]); @@ -614,6 +678,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { }); streams[stream_id].spread[i].waiting = []; }); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); on_ok(); return Promise.resolve('ok'); } else { @@ -639,6 +704,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { var i = streams[stream_id].spread.findIndex((s) => {return s.target === target_node;}); if (i !== -1) { streams[stream_id].spread.splice(i, 1); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); } makeRPC( @@ -685,7 +751,11 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { spreadStream(stream_id, target_node, 'amixer', function() { if (terminals[audio_mixer] && streams[stream_id]) { terminals[audio_mixer].subscribed[spread_id] = {audio: stream_id}; - (streams[stream_id].audio.subscribers.indexOf(audio_mixer) < 0) && streams[stream_id].audio.subscribers.push(audio_mixer); + if (streams[stream_id].audio.subscribers.indexOf(audio_mixer) < 0) { + streams[stream_id].audio.subscribers.push(audio_mixer); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + } + redisClient.updateToRedis("add", "roomcontroller", "terminals", audio_mixer, terminals[audio_mixer]); on_ok(); if (streams[stream_id].audio.status === 'inactive') { makeRPC( @@ -713,8 +783,10 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { spread_id = stream_id + '@' + target_node, i = streams[stream_id].audio.subscribers.indexOf(audio_mixer); delete terminals[audio_mixer].subscribed[spread_id]; + redisClient.updateToRedis("add", "roomcontroller", "terminals", audio_mixer, terminals[audio_mixer]); if (i > -1) { streams[stream_id].audio.subscribers.splice(i, 1); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); shrinkStream(stream_id, target_node); } } @@ -729,7 +801,12 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { spreadStream(stream_id, target_node, 'vmixer', function() { if (terminals[video_mixer] && streams[stream_id]) { terminals[video_mixer].subscribed[spread_id] = {video: stream_id}; - (streams[stream_id].video.subscribers.indexOf(video_mixer) < 0) && streams[stream_id].video.subscribers.push(video_mixer); + redisClient.updateToRedis("add", "roomcontroller", "terminals", video_mixer, terminals[video_mixer]); + if (streams[stream_id].video.subscribers.indexOf(video_mixer) < 0) { + streams[stream_id].video.subscribers.push(video_mixer); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + } + on_ok(); if (streams[stream_id].video.status === 'inactive') { makeRPC( @@ -757,8 +834,10 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { spread_id = stream_id + '@' + target_node, i = streams[stream_id].video.subscribers.indexOf(video_mixer); delete terminals[video_mixer].subscribed[spread_id]; + redisClient.updateToRedis("add", "roomcontroller", "terminals", video_mixer, terminals[video_mixer]); if (i > -1) { streams[stream_id].video.subscribers.splice(i, 1); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); shrinkStream(stream_id, target_node); } } @@ -818,6 +897,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { video: undefined, spread: []}; terminals[audio_mixer].published.push(stream_id); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", audio_mixer, terminals[audio_mixer]); } on_ok(stream_id); } else { @@ -855,6 +936,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { subscribers: []}, spread: []}; terminals[video_mixer].published.push(stream.id); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream.id, streams[stream.id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", video_mixer, terminals[video_mixer]); } on_ok(stream.id); } else { @@ -924,6 +1007,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { spread: [] }; terminals[axcoder].published.push(stream_id); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", axcoder, terminals[axcoder]); } on_ok(stream_id); } else { @@ -986,6 +1071,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { if (terminals[axcoder]) { terminals[axcoder].subscribed[spread_id] = {audio: stream_id}; streams[stream_id].audio.subscribers.push(axcoder); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", axcoder, terminals[axcoder]); on_ok(axcoder); } else { shrinkStream(stream_id, target_node); @@ -1032,6 +1119,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { spread: [] }; terminals[vxcoder].published.push(stream.id); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream.id, streams[stream.id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", vxcoder, terminals[vxcoder]); } on_ok(stream.id); } else { @@ -1100,6 +1189,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { if (terminals[vxcoder]) { terminals[vxcoder].subscribed[spread_id] = {video: stream_id}; streams[stream_id].video.subscribers.push(vxcoder); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", vxcoder, terminals[vxcoder]); on_ok(vxcoder); } else { shrinkStream(stream_id, target_node); @@ -1129,9 +1220,13 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { 'degenerate', [stream_id]); delete streams[stream_id]; + redisClient.updateToRedis("delete", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); - var i = terminals[owner].published.indexOf(stream_id); - i > -1 && terminals[owner].published.splice(i ,1); + var i = terminals[owner].published.indexOf(stream_id); + if (i > -1) { + terminals[owner].published.splice(i ,1); + redisClient.updateToRedis("add", "roomcontroller", "terminals", owner, terminals[owner]); + } if (terminals[owner].published.length === 0 && (terminals[owner].type === 'axcoder' || terminals[owner].type === 'vxcoder')) { for (var subscription_id in terminals[owner].subscribed) { @@ -1336,9 +1431,11 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { } removeSubscriptions(stream_id); terminals[terminal_id] && terminals[terminal_id].published.splice(i, 1); + redisClient.updateToRedis("add", "roomcontroller", "terminals", terminal_id, terminals[terminal_id]); } stream.close && stream.close(); delete streams[stream_id]; + redisClient.updateToRedis("delete", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); } else { log.info('try to unpublish an unexisting stream:', stream_id); } @@ -1379,6 +1476,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { } delete terminals[subscriber].subscribed[subscription_id]; + redisClient.updateToRedis("add", "roomcontroller", "terminals", subscriber, terminals[subscriber]); } else { log.info('try to unsubscribe to an unexisting terminal:', subscriber); } @@ -1429,6 +1527,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { }); streams[stream_id] && (streams[stream_id].video.subscribers = []); } + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); } }; @@ -1445,6 +1544,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { terminals[old_st.owner].subscribers = {}; streams[streamId].spread = []; streams[streamId].video.subscribers = []; + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", streamId, streams[streamId]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", old_st.owner, terminals[old_st.owner]); return Promise.all(old_st.spread.map(function(target_node) { return new Promise(function (res, rej) { @@ -1479,6 +1580,9 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { }); } } + + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", streamId, streams[streamId]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", t_id, terminals[t_id]); } }); }) @@ -1492,6 +1596,42 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { }); }; + that.updateFromRedis = function(channel, msg) { + var name = channel.substring(room_id.length); + switch (name) { + case 'terminals': + if (msg.type === 'add') { + terminals[msg.id] = msg.data; + } else { + if (terminals[msg.id]) { + delete terminals[msg.id]; + } + } + break; + case 'roomstreams': + if (msg.type === 'add') { + streams[msg.id] = msg.data; + } else { + if (streams[msg.id]) { + delete streams[msg.id]; + } + } + break; + case 'mixviews': + if (msg.type === 'add') { + mix_views[msg.id] = msg.data; + } else { + if (mix_views[msg.id]) { + delete mix_views[msg.id]; + } + } + break; + default: + log.info('Invalid roomControl update items'); + } + + }; + that.publish = function (participantId, streamId, accessNode, streamInfo, streamType, on_ok, on_error) { log.debug('publish, participantId: ', participantId, 'streamId:', streamId, 'accessNode:', accessNode.node, 'streamInfo:', JSON.stringify(streamInfo), ' origin is:', origin); if (streams[streamId] === undefined) { @@ -1511,6 +1651,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { spread: [] }; terminals[terminal_id].published.push(streamId); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", streamId, streams[streamId]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", terminal_id, terminals[terminal_id]); on_ok(); }, function (error_reason) { on_error(error_reason); @@ -1626,8 +1768,10 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { streams[streamId][kind].subscriber = streams[streamId][kind].subscribers || []; streams[streamId][kind].subscribers.push(terminal_id); terminals[terminal_id].subscribed[subscriptionId][kind] = streamId; + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", streamId, streams[streamId]); } } + redisClient.updateToRedis("add", "roomcontroller", "terminals", terminal_id, terminals[terminal_id]); on_ok('ok'); //FIXME: It is better to notify subscription connection to request key-frame. @@ -1799,6 +1943,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { if ((config.views.length > 0) && (status === 'active' || status === 'inactive')) { if ((track === 'video' || track === 'av') && streams[stream_id] && streams[stream_id].video) { streams[stream_id].video.status = status; + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + for (var view in mix_views) { var video_mixer = mix_views[view].video.mixer; if (video_mixer && terminals[video_mixer] && (streams[stream_id].video.subscribers.indexOf(video_mixer) >= 0)) { @@ -1813,6 +1959,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { } } else if ((track === 'audio' || track === 'av') && streams[stream_id] && streams[stream_id].audio) { streams[stream_id].audio.status = status; + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); for (var view in mix_views) { var audio_mixer = mix_views[view].audio.mixer; if (audio_mixer && terminals[audio_mixer] && (streams[stream_id].audio.subscribers.indexOf(audio_mixer) >= 0)) { @@ -1959,8 +2106,11 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { spreadStream(stream_id, target_node, 'axcoder', function() { if (terminals[audio_selector] && streams[stream_id]) { terminals[audio_selector].subscribed[spread_id] = {audio: stream_id}; - (streams[stream_id].audio.subscribers.indexOf(audio_selector) < 0) && + redisClient.updateToRedis("add", "roomcontroller", "terminals", audio_selector, terminals[audio_selector]); + if (streams[stream_id].audio.subscribers.indexOf(audio_selector) < 0) { streams[stream_id].audio.subscribers.push(audio_selector); + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + } on_ok(); } else { shrinkStream(stream_id, target_node); @@ -2007,6 +2157,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { mixer: vmixerId, supported_formats: supportedVideo.codecs }; + redisClient.updateToRedis("add", "roomcontroller", "mixviews", view, mix_views[view]); } // Enable AV coordination if specified @@ -2066,6 +2217,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { terminals[t_id] && shrinkStream(st_id, terminals[t_id].locality.node); }); delete streams[st_id]; + redisClient.updateToRedis("delete", "roomcontroller", "roomstreams", st_id, streams[st_id]); } }); terminals[vmixerId].published = []; @@ -2114,6 +2266,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { streams[stream_id].video.subscribers = streams[stream_id].video.subscribers || []; streams[stream_id].video.subscribers.push(t_id); terminals[t_id].subscribed[sub_id].video = stream_id; + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", t_id, terminals[t_id]); }, function (reason) { log.warn('Failed in resuming video subscription. reason:', reason); }); @@ -2168,6 +2322,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { terminals[t_id] && shrinkStream(st_id, terminals[t_id].locality.node); }); delete streams[st_id]; + redisClient.updateToRedis("delete", "roomcontroller", "roomstreams", st_id, streams[st_id]); } }); terminals[vxcoderId].published = []; @@ -2203,6 +2358,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { streams[stream_id].video.subscribers = streams[stream_id].video.subscribers || []; streams[stream_id].video.subscribers.push(t_id); terminals[t_id].subscribed[sub_id].video = stream_id; + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", t_id, terminals[t_id]); }, function (reason) { log.warn('Failed in resuming video subscription. reason:', reason); }); @@ -2241,6 +2398,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { mixer: amixerId, supported_formats: supportedAudio.codecs }; + redisClient.updateToRedis("add", "roomcontroller", "mixviews", view, mix_views[view]); } // Enable AV coordination if specified @@ -2285,6 +2443,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { }); outputs.push(backup); delete streams[st_id]; + redisClient.updateToRedis("delete", "roomcontroller", "roomstreams", st_id, streams[st_id]); } }); terminals[amixerId].published = []; @@ -2334,6 +2493,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { streams[stream_id].audio.subscribers = streams[stream_id].audio.subscribers || []; streams[stream_id].audio.subscribers.push(t_id); terminals[t_id].subscribed[sub_id].audio = stream_id; + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", t_id, terminals[t_id]); }, function (reason) { log.warn('Failed in resuming video subscription. reason:', reason); }); @@ -2386,6 +2547,7 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { terminals[t_id] && shrinkStream(st_id, terminals[t_id].locality.node); }); delete streams[st_id]; + redisClient.updateToRedis("delete", "roomcontroller", "roomstreams", st_id, streams[st_id]); } }); terminals[axcoderId].published = []; @@ -2421,6 +2583,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { streams[stream_id].audio.subscribers = streams[stream_id].audio.subscribers || []; streams[stream_id].audio.subscribers.push(t_id); terminals[t_id].subscribed[sub_id].audio = stream_id; + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", stream_id, streams[stream_id]); + redisClient.updateToRedis("add", "roomcontroller", "terminals", t_id, terminals[t_id]); }, function (reason) { log.warn('Failed in resuming audio subscription. reason:', reason); }); @@ -2529,12 +2693,14 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { for (const rid in streams[streamId].video.simulcast) { const simInfo = streams[streamId].video.simulcast[rid]; delete streams[simInfo.id]; + redisClient.updateToRedis("delete", "roomcontroller", "roomstreams", simInfo.id, streams[simInfo.id]); } }; } } else if (update.firstrid && streams[streamId].video) { streams[streamId].video.rid = update.firstrid; } + redisClient.updateToRedis("add", "roomcontroller", "roomstreams", streamId, streams[streamId]); log.debug('updated stream info', JSON.stringify(streams[streamId])); } }; diff --git a/source/agent/conference/rtcController.js b/source/agent/conference/rtcController.js index a938f51d9..85b7c0ae1 100644 --- a/source/agent/conference/rtcController.js +++ b/source/agent/conference/rtcController.js @@ -27,6 +27,18 @@ class Transport { this.locality = locality; this.state = PENDING; } + + getInfo() { + var info = { + id: this.id, + owner: this.owner, + origin: this.origin, + locality: this.locality, + state: this.state + } + + return info; + } } class Track { @@ -93,6 +105,20 @@ class Operation { } return opTrack; } + + getInfo() { + var info = { + id: this.id, + transport: this.transport, + transportId: this.transportId, + direction: this.direction, + tracks: this.tracks, + legacy: this.legacy, + attributes: this.attributes + } + + return info; + } } /* Events @@ -111,7 +137,7 @@ class RtcController extends EventEmitter { * onTransportSignaling, mediaOnOff, sendMsg * } */ - constructor(roomId, rpcReq, roomRpcId, clusterRpcId) { + constructor(roomId, rpcReq, roomRpcId, clusterRpcId, redis) { log.debug(`constructor ${roomId}, ${roomRpcId}, ${clusterRpcId}`); super(); this.roomId = roomId; @@ -124,6 +150,91 @@ class RtcController extends EventEmitter { this.operations = new Map(); // Map {publicTrackId => Track} this.tracks = new Map(); + + this.redis = redis; + } + + loadData() { + var operationKey = this.roomId + "rtcoperations"; + this.redis.subscribeChannel(operationKey); + this.redis.getItems(operationKey) + .then((streamList) => { + if (Object.keys(streamList).length > 0) { + for (let item in streamList) { + const op = new Operation(streamList[item].id, streamList[item].transport, streamList[item].direction, streamList[item].tracks, streamList[item].legacy, streamList[item].attributes); + this.operations.set(streamList[item].id, op); + } + } + return Promise.resolve('ok'); + }) + .then(() => { + var transportKey = this.roomId + "rtctransports"; + this.redis.subscribeChannel(transportKey); + return this.redis.getItems(transportKey) + .then((streamList) => { + if (Object.keys(streamList).length > 0) { + for (let item in streamList) { + const op = new Transport(streamList[item].id, streamList[item].owner, streamList[item].origin); + op.setup(streamList[item].locality); + this.transports.set(streamList[item].id, op); + } + } + return Promise.resolve('ok'); + }) + }) + .then(() => { + var trackKey = this.roomId + "rtctracks"; + this.redis.subscribeChannel(trackKey); + return this.redis.getItems(trackKey) + .then((streamList) => { + if (Object.keys(streamList).length > 0) { + for (let item in streamList) { + this.tracks.set(streamList[item].id, streamList[item]); + } + } + }) + }) + .catch(function(err) { + log.error('Load data from redis failed, reason:', err); + }); + } + + updateFromRedis(channel, msg) { + var name = channel.substring(this.roomId.length); + switch (name) { + case 'rtctransports': + if (msg.type === 'add') { + const op = new Transport(msg.data.id, msg.data.owner, msg.data.origin); + op.setup(msg.data.locality); + this.transports.set(msg.data.id, op); + } else { + if (this.transports.has(msg.id)) { + this.transports.delete(msg.id); + } + } + break + case 'rtcoperations': + if (msg.type === 'add') { + const op = new Operation(msg.data.id, msg.data.transport, msg.data.direction, msg.data.tracks, msg.data.legacy, msg.data.attributes); + this.operations.set(msg.data.id, op); + } else { + if (this.operations.has(msg.id)) { + this.operations.delete(msg.id); + } + } + break; + case 'rtctracks': + if (msg.type === 'add') { + this.tracks.set(msg.id, msg.data); + } else { + if (this.tracks.has(msg.id)) { + this.tracks.delete(msg.id); + } + } + break; + default: + log.info('Invalid roomControl update items'); + } } // Return Transport @@ -169,6 +280,8 @@ class RtcController extends EventEmitter { const locality = transport.locality; log.debug(`to recycleWorkerNode: ${locality} task:, ${transportId}`); const taskConfig = {room: this.roomId, task: transportId}; + this.transports.delete(transportId); + this.redis.updateToRedis("delete", "rtccontroller", "rtctransports", transportId); return this.rpcReq.recycleWorkerNode(locality.agent, locality.node, taskConfig) }).catch((e) => log.debug(`Failed to recycleWorkerNode ${locality}`)); } else { @@ -216,6 +329,7 @@ class RtcController extends EventEmitter { const newTrack = operation.updateTrack(info); if (newTrack) { this.tracks.set(info.trackId, newTrack); + this.redis.updateToRedis("add", "rtccontroller", "rtctracks", info.trackId, newTrack); } if (operation.state === COMPLETED) { log.warn('Unexpected order for track-added event'); @@ -239,6 +353,7 @@ class RtcController extends EventEmitter { // this.emit('session-aborted', operationId, abortData); // } this.tracks.delete(info.trackId); + this.redis.updateToRedis("delete", "rtccontroller", "rtctracks", info.trackId); } } else if (info.type === 'tracks-complete') { const operation = this.operations.get(info.operationId); @@ -258,8 +373,10 @@ class RtcController extends EventEmitter { log.debug(`getWorkerNode ${this.clusterRpcId}, ${taskConfig}, ${origin}`); const locality = await this.rpcReq.getWorkerNode( this.clusterRpcId, 'webrtc', taskConfig, origin); - this.transports.set(tId, new Transport(tId, ownerId, origin)); + var transport = new Transport(tId, ownerId, origin) + this.transports.set(tId, transport) this.transports.get(tId).setup(locality); + this.redis.updateToRedis("add", "rtccontroller", "rtctransports", tId, transport.getInfo()); } return this.transports.get(tId); } @@ -284,6 +401,7 @@ class RtcController extends EventEmitter { } const op = new Operation(sessionId, transport, direction, tracks, legacy, attributes); this.operations.set(sessionId, op); + this.redis.updateToRedis("add", "rtccontroller", "rtcoperations", sessionId, op.getInfo()); // Return promise for this operation const options = {transportId, tracks, controller: this.roomRpcId, owner: ownerId}; return this.rpcReq.initiate(locality.node, sessionId, 'webrtc', direction, options); @@ -307,6 +425,7 @@ class RtcController extends EventEmitter { const abortData = { direction: operation.direction, owner, reason }; this.emit('session-aborted', sessionId, abortData); this.operations.delete(sessionId); + this.redis.updateToRedis("delete", "rtccontroller", "rtcoperations", sessionId); } }) .catch(reason => { diff --git a/source/agent/conference/stream.js b/source/agent/conference/stream.js index 5d32bc622..2c4f7d958 100644 --- a/source/agent/conference/stream.js +++ b/source/agent/conference/stream.js @@ -84,6 +84,32 @@ class Stream { this.data = data; } + getStreamInfo() { + var stream = { + id: this.id, + type: this.type, + info: this.info, + media: this.media, + data: this.data + } + + if (this.locality) { + stream.locality = this.locality; + } + return stream; + } + + updateStream(streamInfo) { + this.id = streamInfo.id; + this.type = streamInfo.type; + this.info = streamInfo.info; + this.media = streamInfo.media; + this.data = streamInfo.data; + if (streamInfo.locality) { + this.locality = streamInfo.locality; + } + } + _upgradeMediaIfNeeded(media) { if (!media.tracks) { /* diff --git a/source/agent/conference/subscription.js b/source/agent/conference/subscription.js index e5271440a..5f751ca55 100644 --- a/source/agent/conference/subscription.js +++ b/source/agent/conference/subscription.js @@ -39,6 +39,28 @@ class Subscription { this.origin = null; } + getSubscriptionInfo() { + var subscription = { + id: this.id, + info: this.info, + locality: this.locality, + media: this.media, + data: this.data, + origin: this.origin + } + + return subscription; + } + + updateSubscription(spec) { + this.id = spec.id; + this.info = spec.info; + this.locality = spec.locality; + this.media = spec.media; + this.data = spec.data; + this.origin = spec.data; + } + _upgradeMediaIfNeeded(media) { if (!media) { return media; diff --git a/source/agent/connections.js b/source/agent/connections.js index fd0fd82d2..e0bfd7ea1 100644 --- a/source/agent/connections.js +++ b/source/agent/connections.js @@ -158,8 +158,12 @@ module.exports = function Connections () { if (message.purpose === 'conference') { for (var conn_id in connections) { if ((message.type === 'node' && message.id === connections[conn_id].controller) || (message.type === 'worker' && connections[conn_id].controller.startsWith(message.id))) { - log.error('Fault detected on controller (type:', message.type, 'id:', message.id, ') of connection:', conn_id , 'and remove it'); - that.removeConnection(conn_id); + if (message.tasks && message.tasks.length > 0) { + connections[conn_id].controller = message.tasks[Math.floor(Math.random() * message.tasks.length)]; + } else { + log.error('Fault detected on controller (type:', message.type, 'id:', message.id, ') of connection:', conn_id , 'and remove it'); + that.removeConnection(conn_id); + } } } } diff --git a/source/agent/index.js b/source/agent/index.js index def7c9c8b..424564ae4 100644 --- a/source/agent/index.js +++ b/source/agent/index.js @@ -55,6 +55,7 @@ var nodeManager = require('./nodeManager'); var amqper = require('./amqpClient')(); var rpcClient; var monitoringTarget; +var redisClient; var worker; var manager; @@ -88,6 +89,11 @@ var joinCluster = function (on_ok) { } }; + if (myPurpose === 'conference') { + var redis = require('./redisUtil'); + redisClient = redis.create(false); + } + worker = clusterWorker({ rpcClient: rpcClient, purpose: myPurpose, @@ -133,14 +139,13 @@ var init_manager = () => { prerunNodeNum: config.agent.prerunProcesses, maxNodeNum: config.agent.maxProcesses, reuseNode: reuseNode, + redis: redisClient, consumeNodeByRoom: consumeNodeByRoom }, spawnOptions, (nodeId, tasks) => { - monitoringTarget && monitoringTarget.notify('abnormal', {purpose: myPurpose, id: nodeId, type: 'node'}); - tasks.forEach(() => { - worker && worker.removeTask(task); - }); + log.info("monitor abnormal purpose:", myPurpose, " id:", nodeId, " updateId:", tasks, " parentId:", myId); + monitoringTarget && monitoringTarget.notify('abnormal', {purpose: myPurpose, id: nodeId, tasks: tasks, type: 'node'}); }, (task) => { worker && worker.addTask(task); @@ -223,7 +228,9 @@ amqper.connect(config.rabbit, function () { ['SIGINT', 'SIGTERM'].map(function (sig) { process.on(sig, async function () { log.warn('Exiting on', sig); - manager && manager.dropAllNodes(true); + if (manager) { + await manager.dropAllNodes(true); + } worker && worker.quit(); try { await amqper.disconnect(); diff --git a/source/agent/nodeManager.js b/source/agent/nodeManager.js index 45f0bc6a7..e115292c9 100644 --- a/source/agent/nodeManager.js +++ b/source/agent/nodeManager.js @@ -77,12 +77,23 @@ module.exports = function (spec, spawnOptions, onNodeAbnormallyQuit, onTaskAdded child.on('close', function (code, signal) { log.debug('Node', id, 'exited with code:', code, 'signal:', signal); if (code !== 0) { - log.info('Node', id, 'is closed on unexpected code:', code); + log.info('Node', id, 'is closed on unexpected code:', code, ", signal:", signal); } if (processes[id]) { + if (spec.redis) { + var roomid = Object.keys(tasks[id])[0]; + spec.redis.deleteList(roomid, id); + spec.redis.getList(roomid) + .then(function(data) { + log.info("Get room list", data); + onNodeAbnormallyQuit && onNodeAbnormallyQuit(id, data); + cleanupNode(id); + }); + } else { onNodeAbnormallyQuit && onNodeAbnormallyQuit(id, tasksOnNode(id)); cleanupNode(id); + } } try { @@ -117,9 +128,19 @@ module.exports = function (spec, spawnOptions, onNodeAbnormallyQuit, onTaskAdded child.alive_count = 0; child.check_alive_interval = setInterval(function() { if (child.READY && (child.alive_count === 0)) { - log.info('Node(', id, ') is no longer responsive!'); + log.info('Node(', id, ') is no longer responsive!, report tasks keys:', tasks); + if (spec.redis) { + var roomid = Object.keys(tasks[id])[0]; + spec.redis.deleteList(roomid, id); + spec.redis.getList(roomid) + .then(function(data) { + onNodeAbnormallyQuit && onNodeAbnormallyQuit(id, data); + cleanupNode(id); + }); + } else { onNodeAbnormallyQuit && onNodeAbnormallyQuit(id, tasksOnNode(id)); - dropNode(id); + cleanupNode(id); + } } child.alive_count = 0; }, 3000); @@ -314,10 +335,29 @@ module.exports = function (spec, spawnOptions, onNodeAbnormallyQuit, onTaskAdded that.dropAllNodes = function(quietly) { spawn_failed = true; - Object.keys(processes).map(function (k) { - !quietly && onNodeAbnormallyQuit && onNodeAbnormallyQuit(k, tasksOnNode(k)); + var count = 0; + var length = Object.keys(processes).length; + return Promise.all( + Object.keys(processes).map(async (k) => { + if(Object.keys(tasks[k]).length > 0) { + if (spec.redis) { + var roomid = Object.keys(tasks[k])[0]; + log.info("Get room id:", roomid); + spec.redis.deleteList(roomid, k); + const data = await spec.redis.getList(roomid); + log.info("Get room list", data); + onNodeAbnormallyQuit && onNodeAbnormallyQuit(k, data); + if (data.length == 0) { + //Clean redis data for room since there is no connected conference node + spec.redis.cleanRoomdata(roomid); + } + } else { + !quietly && onNodeAbnormallyQuit && onNodeAbnormallyQuit(k, tasksOnNode(k)); + } + } dropNode(k); - }); + }) + ) }; fillNodes(); diff --git a/source/agent/sip/index.js b/source/agent/sip/index.js index a48654087..ca62ecaf6 100644 --- a/source/agent/sip/index.js +++ b/source/agent/sip/index.js @@ -976,11 +976,15 @@ module.exports = function (rpcC, selfRpcId, parentRpcId, clusterWorkerIP) { for (var client_id in calls) { if (calls[client_id].conference_controller && ((message.type === 'node' && message.id === calls[client_id].conference_controller) || (message.type === 'worker' && calls[client_id].conference_controller.startsWith(message.id)))){ - log.error('Fault detected on conference_controller:', message.id, 'of call:', client_id , ', terminate it'); - gateway.hangup(calls[client_id].peerURI); - teardownCall(client_id); - calls[client_id].conn && calls[client_id].conn.close(); - delete calls[client_id]; + if (message.tasks && message.tasks.length > 0) { + calls[client_id].conference_controller = message.tasks[Math.floor(Math.random() * message.tasks.length)]; + } else { + log.error('Fault detected on conference_controller:', message.id, 'of call:', client_id , ', terminate it'); + gateway.hangup(calls[client_id].peerURI); + teardownCall(client_id); + calls[client_id].conn && calls[client_id].conn.close(); + delete calls[client_id]; + } } } } diff --git a/source/agent/video/index.js b/source/agent/video/index.js index 0e1be5951..007bcb6a3 100644 --- a/source/agent/video/index.js +++ b/source/agent/video/index.js @@ -440,9 +440,13 @@ function VMixer(rpcClient, clusterIP) { if (message.purpose === 'conference' && controller) { if ((message.type === 'node' && message.id === controller) || (message.type === 'worker' && controller.startsWith(message.id))) { - log.error('Conference controller (type:', message.type, 'id:', message.id, ') fault is detected, exit.'); - that.deinit(); - process.exit(); + if (message.tasks && message.tasks.length > 0) { + controller = message.tasks[Math.floor(Math.random() * message.tasks.length)]; + } else { + log.error('Conference controller (type:', message.type, 'id:', message.id, ') fault is detected, exit.'); + that.deinit(); + process.exit(); + } } } }; @@ -909,9 +913,13 @@ function VTranscoder(rpcClient, clusterIP) { if (message.purpose === 'conference' && controller) { if ((message.type === 'node' && message.id === controller) || (message.type === 'worker' && controller.startsWith(message.id))) { - log.error('Conference controller (type:', message.type, 'id:', message.id, ') fault is detected, exit.'); - that.deinit(); - process.exit(); + if (message.tasks && message.tasks.length > 0) { + controller = message.tasks[Math.floor(Math.random() * message.tasks.length)]; + } else { + log.error('Conference controller (type:', message.type, 'id:', message.id, ') fault is detected, exit.'); + that.deinit(); + process.exit(); + } } } }; diff --git a/source/cluster_manager/clusterManager.js b/source/cluster_manager/clusterManager.js index 1e414f82d..e307581c5 100644 --- a/source/cluster_manager/clusterManager.js +++ b/source/cluster_manager/clusterManager.js @@ -58,8 +58,11 @@ var ClusterManager = function (clusterName, selfId, spec) { var workerQuit = function (worker) { log.debug('workerQuit, worker:', worker); if (workers[worker] && schedulers[workers[worker].purpose]) { + var task = schedulers[workers[worker].purpose].getInfo(worker).tasks; + //var task = Object.keys(tasks); + log.info("WorkerQuit, worker:", worker, " task:",task); schedulers[workers[worker].purpose].remove(worker); - monitoringTarget && monitoringTarget.notify('quit', {purpose: workers[worker].purpose, id: worker, type: 'worker'}); + //monitoringTarget && monitoringTarget.notify('quit', {purpose: workers[worker].purpose, task: task, id: worker, type: 'worker'}); delete workers[worker]; data_synchronizer && data_synchronizer({type: 'worker_quit', payload: {worker: worker}}); } @@ -90,16 +93,16 @@ var ClusterManager = function (clusterName, selfId, spec) { data_synchronizer && data_synchronizer({type: 'worker_pickup', payload: {worker: worker, tasks: tasks}}); }; - var layDownTask = function (worker, task) { - workers[worker] && schedulers[workers[worker].purpose] && schedulers[workers[worker].purpose].layDownTask(worker, task); - data_synchronizer && data_synchronizer({type: 'worker_laydown', payload: {worker: worker, task: task}}); + var layDownTask = function (worker, task, time) { + workers[worker] && schedulers[workers[worker].purpose] && schedulers[workers[worker].purpose].layDownTask(worker, task, time); + data_synchronizer && data_synchronizer({type: 'worker_laydown', payload: {worker: worker, task: task, time: time}}); }; var schedule = function (purpose, task, preference, reserveTime, on_ok, on_error) { log.debug('schedule, purpose:', purpose, 'task:', task, ', preference:', preference, 'reserveTime:', reserveTime, 'while state:', state); if (state === 'in-service') { if (schedulers[purpose]) { - schedulers[purpose].schedule(task, preference, reserveTime, function(worker, info) { + schedulers[purpose].schedule(purpose, task, preference, reserveTime, function(worker, info) { log.debug('schedule OK, got worker', worker); on_ok(worker, info); data_synchronizer && data_synchronizer({type: 'scheduled', payload: {purpose: purpose, task: task, worker: worker, reserve_time: reserveTime}}); @@ -159,6 +162,14 @@ var ClusterManager = function (clusterName, selfId, spec) { } }; + var getScheduledWorkers = function (purpose, task, on_ok) { + if (schedulers[purpose]) { + schedulers[purpose].getScheduledWorkers(task, on_ok); + } else { + on_ok([]); + } + } + var getTasks = function (worker, on_ok) { return workers[worker] && schedulers[workers[worker].purpose] ? schedulers[workers[worker].purpose].getTasks(worker) : []; }; @@ -304,6 +315,11 @@ var ClusterManager = function (clusterName, selfId, spec) { }, function (error_reason) { callback('callback', 'error', error_reason); }); + }, + getScheduledWorkers: function (purpose, task, callback) { + getScheduledWorkers(purpose, task, function (workerList) { + callback('callback', workerList); + }); } }; diff --git a/source/cluster_manager/scheduler.js b/source/cluster_manager/scheduler.js index 13d7f2f66..16b34e187 100644 --- a/source/cluster_manager/scheduler.js +++ b/source/cluster_manager/scheduler.js @@ -33,43 +33,58 @@ exports.Scheduler = function(spec) { schedule_reserve_time = spec.scheduleReserveTime; var reserveWorkerForTask = function (task, worker, time) { + log.info("reserveWorkerForTask task is:", task, " worker is:", worker); if (tasks[task]) { - tasks[task].reserve_timer && clearTimeout(tasks[task].reserve_timer); + if(tasks[task][worker]) { + tasks[task][worker].reserve_timer && clearTimeout(tasks[task][worker].reserve_timer); + } + } else { + tasks[task] = {}; } - tasks[task] = {worker: worker, - reserve_time: time, - reserve_timer: setTimeout(function () {repealTask(task);}, time)}; + tasks[task][worker] = { + id: worker, + reserve_time: time, + reserve_timer: setTimeout(function () {repealTask(task, worker);}, time)}; + log.info("reserveWorkerForTask tasks is:", tasks); }; - var repealTask = function (task) { - if (tasks[task]) { - tasks[task].reserve_timer && clearTimeout(tasks[task].reserve_timer); - delete tasks[task]; + var repealTask = function (task, worker) { + if (tasks[task] && tasks[task][worker]) { + log.info("repeal task:", task, " worker:", worker, " tasks:", tasks, " tasks[task][worker]:", tasks[task][worker]); + tasks[task][worker].reserve_timer && clearTimeout(tasks[task][worker].reserve_timer); + delete tasks[task][worker]; } }; var executeTask = function (worker, task) { + log.info("executeTask task is:", task, " worker is:", worker); + log.info("executeTask before tasks is:", tasks); if (workers[worker]) { if (workers[worker].tasks.indexOf(task) === -1) { workers[worker].tasks.push(task); } if (tasks[task]) { - tasks[task].reserve_timer && clearTimeout(tasks[task].reserve_timer); - - if (tasks[task].worker !== worker) { - log.warn('Worker conflicts for task:', task, 'and update to worker:', worker); - tasks[task].worker = worker; + if(tasks[task][worker]) { + tasks[task][worker].reserve_timer && clearTimeout(tasks[task][worker].reserve_timer); + } else{ + tasks[task][worker] = { + id: worker, + reserve_time: schedule_reserve_time}; } } else { - tasks[task] = {reserve_time: schedule_reserve_time, - worker: worker}; + tasks[task] = {}; + tasks[task][worker] = { + id: worker, + reserve_time: schedule_reserve_time}; } + + log.info("executeTask after tasks is:", tasks); } }; - var cancelExecution = function (worker, task) { + var cancelExecution = function (worker, task, time) { var needToReserve = false; if (workers[worker]) { @@ -81,16 +96,21 @@ exports.Scheduler = function(spec) { } if (tasks[task]) { + if (!time) { + repealTask(task, worker); + } else { if (needToReserve) { + log.info("cancel execution worker:", worker, " task:", task); reserveWorkerForTask(task, worker, tasks[task].reserve_time); } else { - repealTask(task); + repealTask(task, worker); } + } } }; var isTaskInExecution = function (task, worker) { - return tasks[task] && workers[worker] && worker === tasks[task].worker && workers[worker].tasks.indexOf(task) !== -1; + return tasks[task] && workers[worker] && tasks[task][worker] && workers[worker].tasks.indexOf(task) !== -1; }; that.add = function (worker, info) { @@ -109,8 +129,8 @@ exports.Scheduler = function(spec) { if (workers[worker]) { workers[worker].tasks.map(function (task) { - if (tasks[task] && tasks[task].worker === worker) { - repealTask(task); + if (tasks[task] && tasks[task][worker]) { + delete tasks[task][worker]; } }); delete workers[worker]; @@ -131,26 +151,30 @@ exports.Scheduler = function(spec) { } }; - that.layDownTask = function (worker, task) { + that.layDownTask = function (worker, task, time) { + log.info("lay down task:", worker, " task:", task, " time:", time); if (workers[worker]) { - cancelExecution(worker, task); + cancelExecution(worker, task, time); + } }; - that.schedule = function (task, preference, reserveTime, on_ok, on_error) { - if (tasks[task]) { - var newReserveTime = reserveTime && tasks[task].reserve_time < reserveTime ? reserveTime : tasks[task].reserve_time, - worker = tasks[task].worker; - - if (workers[worker]) { - if (isTaskInExecution(task, worker)) { - tasks[task].reserve_time = newReserveTime; + that.schedule = function (purpose, task, preference, reserveTime, on_ok, on_error) { + if(purpose !== 'conference') { + if (tasks[task]) { + var newReserveTime = reserveTime && tasks[task].reserve_time < reserveTime ? reserveTime : tasks[task].reserve_time, + worker = Object.keys(tasks[task])[0]; + + if (workers[worker]) { + if (isTaskInExecution(task, worker)) { + tasks[task][worker].reserve_time = newReserveTime; + } else { + reserveWorkerForTask(task, worker, newReserveTime); + } + return on_ok(worker, workers[worker].info); } else { - reserveWorkerForTask(task, worker, newReserveTime); + repealTask(task, worker); } - return on_ok(worker, workers[worker].info); - } else { - repealTask(task); } } @@ -171,15 +195,17 @@ exports.Scheduler = function(spec) { return on_error('No worker matches the preference.'); } else { strategy.allocate(workers, candidates, function (worker) { - reserveWorkerForTask(task, worker, (reserveTime && reserveTime > 0 ? reserveTime : schedule_reserve_time)); + if (!isTaskInExecution(task, worker)) { + reserveWorkerForTask(task, worker, (reserveTime && reserveTime > 0 ? reserveTime : schedule_reserve_time)); + } on_ok(worker, workers[worker].info); }, on_error); } }; that.unschedule = function (worker, task) { - if (tasks[task] && tasks[task].worker === worker) { - repealTask(task); + if (tasks[task] && tasks[task][worker]) { + repealTask(task, worker); } }; @@ -187,20 +213,40 @@ exports.Scheduler = function(spec) { return workers[worker]; }; + that.getScheduledWorkers = function (task, on_ok) { + var worker = []; + if(tasks[task]) { + worker = Object.keys(tasks[task]); + } + log.info("get scheduled workers:", worker, " for task:", task); + on_ok(worker); + }; + that.getScheduled = function (task, on_ok, on_error) { - tasks[task] ? on_ok(tasks[task].worker) : on_error('No such a task'); + if(tasks[task]) { + var worker = Object.keys(tasks[task]); + if(worker.length >1) { + var index = Math.floor(Math.random() * worker.length); + on_ok(tasks[task][worker[index]].id); + } else { + var key = worker[0]; + log.info("getScheduled tasks:", tasks, " key:", key); + key && on_ok(tasks[task][key].id); + } + + } else { + on_error('No such a task'); + } }; that.setScheduled = function (task, worker, reserveTime) { - tasks[task] = {worker: worker, - reserve_time: reserveTime}; + tasks[task][worker] = {reserve_time: reserveTime}; }; that.getData = function () { var data = {workers: workers, tasks: {}}; for (var task in tasks) { - data.tasks[task] = {reserve_time: tasks[task].reserve_time, - worker: tasks[task].worker}; + data.tasks[task] = tasks[task]; } return data; }; @@ -216,7 +262,8 @@ exports.Scheduler = function(spec) { for (var task in tasks) { var worker = tasks[task].worker; if (workers[worker] && (workers[worker].tasks.indexOf(task) === -1)) { - reserveWorkerForTask(task, worker, tasks[task].reserve_time); + log.info("serve worker:", worker, " task:", task); + reserveWorkerForTask(task, worker, tasks[task][worker].reserve_time); } } }; diff --git a/source/common/amqpClient.js b/source/common/amqpClient.js index de09f0745..6db5baf73 100644 --- a/source/common/amqpClient.js +++ b/source/common/amqpClient.js @@ -359,7 +359,7 @@ class Monitor { log.debug('Received monitoring message:', msg); this.onMessage && this.onMessage(msg); } catch (error) { - log.error('Error processing monitored message:', msg, 'and error:', error); + log.error('Error processing monitored message:', rawMessage, 'and error:', error); } }, {noAck: true}); }).then((ok) => { diff --git a/source/portal/index.js b/source/portal/index.js index ad32d1e37..80b2dd897 100644 --- a/source/portal/index.js +++ b/source/portal/index.js @@ -260,13 +260,17 @@ amqper.connect(config.rabbit, function () { if (data.reason === 'abnormal' || data.reason === 'error' || data.reason === 'quit') { if (portal !== undefined) { if (data.message.purpose === 'conference') { - return portal.getParticipantsByController(data.message.type, data.message.id) - .then(function (impactedParticipants) { - impactedParticipants.forEach(function(participantId) { - log.error('Fault on conference controller(type:', data.message.type, 'id:', data.message.id, ') of participant', participantId, 'was detected, drop it.'); - socketio_server && socketio_server.drop(participantId); + if(data.message.tasks.length > 0) { + return portal.updateRoomcontroller(data.message); + } else { + return portal.getParticipantsByController(data.message.type, data.message.id) + .then(function (impactedParticipants) { + impactedParticipants.forEach(function(participantId) { + log.error('Fault on conference controller(type:', data.message.type, 'id:', data.message.id, ') of participant', participantId, 'was detected, drop it.'); + socketio_server && socketio_server.drop(participantId); + }); }); - }); + } } } } diff --git a/source/portal/portal.js b/source/portal/portal.js index a6a78d644..fc731f44c 100644 --- a/source/portal/portal.js +++ b/source/portal/portal.js @@ -202,7 +202,7 @@ var Portal = function(spec, rpcReq) { }; that.unsubscribe = function(participantId, subscriptionId) { - log.debug('unsubscribe, participantId:', participantId, 'subscriptionId:', subscriptionId); + log.debug('unsubscribe, participantId:', participantId, 'subscriptionId:', subscriptionId, " controller:", participants[participantId].controller); if (participants[participantId] === undefined) { return Promise.reject('Participant has NOT joined'); } @@ -253,6 +253,18 @@ var Portal = function(spec, rpcReq) { return Promise.resolve(result); }; + that.updateRoomcontroller = function (message) { + for (var participant_id in participants) { + log.info("participant:", participant_id, "message:", message); + if ((message.type === 'node' && message.id === participants[participant_id].controller) || (message.type === 'worker' && participants[participant_id].controller.startsWith(message.id))) { + var newcontroller = message.tasks[Math.floor(Math.random() * message.tasks.length)]; + log.info("Replace with newcontroller:, ", newcontroller); + participants[participant_id].controller = newcontroller; + } + } + return Promise.resolve('ok'); + } + return that; };