diff --git a/meet/server/.eslintrc.json b/meet/server/.eslintrc.json index 5193d1cb..652f7f40 100644 --- a/meet/server/.eslintrc.json +++ b/meet/server/.eslintrc.json @@ -1,25 +1,25 @@ { "env": { "es6": true, "node": true }, "extends": [ "eslint:recommended" ], "settings": {}, "parserOptions": { "ecmaVersion": 2018, "sourceType": "module", "ecmaFeatures": { "impliedStrict": true } }, "rules": { - // "brace-style": ["error", "1tbs"] - // "indent": ["error", 4] + "brace-style": ["error", "1tbs"], + "indent": ["error", 4] } } diff --git a/meet/server/lib/Logger.js b/meet/server/lib/Logger.js index 73901899..0f0ce752 100644 --- a/meet/server/lib/Logger.js +++ b/meet/server/lib/Logger.js @@ -1,53 +1,44 @@ const debug = require('debug'); const APP_NAME = 'kolabmeet-server'; -class Logger -{ - constructor(prefix) - { - if (prefix) - { +class Logger { + constructor(prefix) { + if (prefix) { this._debug = debug(`${APP_NAME}:${prefix}`); this._info = debug(`${APP_NAME}:INFO:${prefix}`); this._warn = debug(`${APP_NAME}:WARN:${prefix}`); this._error = debug(`${APP_NAME}:ERROR:${prefix}`); - } - else - { + } else { this._debug = debug(APP_NAME); this._info = debug(`${APP_NAME}:INFO`); this._warn = debug(`${APP_NAME}:WARN`); this._error = debug(`${APP_NAME}:ERROR`); } /* eslint-disable no-console */ this._debug.log = console.info.bind(console); this._info.log = console.info.bind(console); this._warn.log = console.warn.bind(console); this._error.log = console.error.bind(console); /* eslint-enable no-console */ } - get debug() - { + get debug() { return this._debug; } - get info() - { + get info() { return this._info; } - get warn() - { + get warn() { return this._warn; } - get error() - { + get error() { return this._error; } } module.exports = Logger; diff --git a/meet/server/lib/Peer.js b/meet/server/lib/Peer.js index 05813a2a..dc4da5f8 100644 --- a/meet/server/lib/Peer.js +++ b/meet/server/lib/Peer.js @@ -1,292 +1,249 @@ const EventEmitter = require('events').EventEmitter; const Logger = require('./Logger'); const Roles = require('./userRoles'); const logger = new Logger('Peer'); -class Peer extends EventEmitter -{ - constructor({ id, roomId }) - { +class Peer extends EventEmitter { + constructor({ id, roomId }) { logger.info('constructor() [id:"%s"]', id); super(); this._id = id; this._roomId = roomId; this._socket = null; this._closed = false; this._role = 0; this._nickname = false; this._picture = null; this._routerId = null; this._rtpCapabilities = null; this._raisedHand = false; this._transports = new Map(); this._producers = new Map(); this._consumers = new Map(); } - close() - { + close() { logger.info('close()'); this._closed = true; // Iterate and close all mediasoup Transport associated to this Peer, so all // its Producers and Consumers will also be closed. - for (const transport of this.transports.values()) - { + for (const transport of this.transports.values()) { transport.close(); } if (this.socket) this.socket.disconnect(true); this.emit('close'); } - get id() - { + get id() { return this._id; } - set id(id) - { + set id(id) { this._id = id; } - get roomId() - { + get roomId() { return this._roomId; } - set roomId(roomId) - { + set roomId(roomId) { this._roomId = roomId; } - get socket() - { + get socket() { return this._socket; } - set socket(socket) - { + set socket(socket) { this._socket = socket; - if (this.socket) - { - this.socket.on('disconnect', () => - { + if (this.socket) { + this.socket.on('disconnect', () => { if (this.closed) return; logger.debug('"disconnect" event [id:%s]', this.id); this.close(); }); } } - get closed() - { + get closed() { return this._closed; } - get role() - { + get role() { return this._role; } - get nickname() - { + get nickname() { return this._nickname; } - set nickname(nickname) - { - if (nickname !== this._nickname) - { + set nickname(nickname) { + if (nickname !== this._nickname) { this._nickname = nickname; this.emit('nicknameChanged', {}); } } - get picture() - { + get picture() { return this._picture; } - set picture(picture) - { - if (picture !== this._picture) - { + set picture(picture) { + if (picture !== this._picture) { const oldPicture = this._picture; this._picture = picture; this.emit('pictureChanged', { oldPicture }); } } - get routerId() - { + get routerId() { return this._routerId; } - set routerId(routerId) - { + set routerId(routerId) { this._routerId = routerId; } - get rtpCapabilities() - { + get rtpCapabilities() { return this._rtpCapabilities; } - set rtpCapabilities(rtpCapabilities) - { + set rtpCapabilities(rtpCapabilities) { this._rtpCapabilities = rtpCapabilities; } - get raisedHand() - { + get raisedHand() { return this._raisedHand; } - set raisedHand(raisedHand) - { + set raisedHand(raisedHand) { this._raisedHand = raisedHand; } - get transports() - { + get transports() { return this._transports; } - get producers() - { + get producers() { return this._producers; } - get consumers() - { + get consumers() { return this._consumers; } - setRole(newRole) - { + setRole(newRole) { if (this._role != newRole) { // It is either screen sharing or publisher/subscriber if (newRole & Roles.SCREEN) { if (newRole & Roles.PUBLISHER) { newRole ^= Roles.PUBLISHER; } if (newRole & Roles.SUBSCRIBER) { newRole ^= Roles.SUBSCRIBER; } } this._role = newRole; this.emit('gotRole', { newRole }); } } - isValidRole(newRole) - { + isValidRole(newRole) { Object.keys(Roles).forEach(roleId => { const role = Roles[roleId] if (newRole & role) { newRole = newRole ^ role; } }) return newRole == 0; } - hasRole(role) - { + hasRole(role) { return !!(this._role & role); } - addTransport(id, transport) - { + addTransport(id, transport) { this.transports.set(id, transport); } - getTransport(id) - { + getTransport(id) { return this.transports.get(id); } - getConsumerTransport() - { + getConsumerTransport() { return Array.from(this.transports.values()) .find((t) => t.appData.consuming); } - removeTransport(id) - { + removeTransport(id) { this.transports.delete(id); } - addProducer(id, producer) - { + addProducer(id, producer) { this.producers.set(id, producer); } - getProducer(id) - { + getProducer(id) { return this.producers.get(id); } - removeProducer(id) - { + removeProducer(id) { this.producers.delete(id); } - addConsumer(id, consumer) - { + addConsumer(id, consumer) { this.consumers.set(id, consumer); } - getConsumer(id) - { + getConsumer(id) { return this.consumers.get(id); } - removeConsumer(id) - { + removeConsumer(id) { this.consumers.delete(id); } - get peerInfo() - { + get peerInfo() { const peerInfo = { id: this.id, nickname: this.nickname, // picture: this.picture, role: this.role, raisedHand: this.raisedHand }; return peerInfo; } } module.exports = Peer; diff --git a/meet/server/lib/Room.js b/meet/server/lib/Room.js index 52d44b24..fdcd34bf 100644 --- a/meet/server/lib/Room.js +++ b/meet/server/lib/Room.js @@ -1,1269 +1,1162 @@ const EventEmitter = require('events').EventEmitter; const AwaitQueue = require('awaitqueue'); const axios = require('axios'); const Logger = require('./Logger'); const { SocketTimeoutError } = require('./errors'); const Roles = require('./userRoles'); const config = require('../config/config'); const logger = new Logger('Room'); const ROUTER_SCALE_SIZE = config.routerScaleSize || 40; -class Room extends EventEmitter -{ +class Room extends EventEmitter { - static calculateLoads(mediasoupWorkers, peers, mediasoupRouters) - { + static calculateLoads(mediasoupWorkers, peers, mediasoupRouters) { const routerLoads = new Map(); const workerLoads = new Map(); const pipedRoutersIds = new Set(); // Calculate router loads by adding up peers per router, and collected piped routers - for (const peer of peers) - { + for (const peer of peers) { const routerId = peer.routerId; - if (routerId) - { - if (mediasoupRouters.has(routerId)) - { + if (routerId) { + if (mediasoupRouters.has(routerId)) { pipedRoutersIds.add(routerId); } - if (routerLoads.has(routerId)) - { + if (routerLoads.has(routerId)) { routerLoads.set(routerId, routerLoads.get(routerId) + 1); - } - else - { + } else { routerLoads.set(routerId, 1); } } } // Calculate worker loads by adding up router loads per worker - for (const worker of mediasoupWorkers) - { - for (const router of worker._routers) - { + for (const worker of mediasoupWorkers) { + for (const router of worker._routers) { const routerId = router._internal.routerId; - if (workerLoads.has(worker._pid)) - { + if (workerLoads.has(worker._pid)) { workerLoads.set(worker._pid, workerLoads.get(worker._pid) + (routerLoads.has(routerId)?routerLoads.get(routerId):0)); - } - else - { + } else { workerLoads.set(worker._pid, (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } } } return {routerLoads, workerLoads, pipedRoutersIds}; } /* * Find a router that is on a worker that is least loaded. * * A worker with a router that we are already piping to is preferred. */ - static getLeastLoadedRouter(mediasoupWorkers, peers, mediasoupRouters) - { + static getLeastLoadedRouter(mediasoupWorkers, peers, mediasoupRouters) { const {routerLoads, workerLoads, pipedRoutersIds} = Room.calculateLoads(mediasoupWorkers, peers.values(), mediasoupRouters); const sortedWorkerLoads = new Map([ ...workerLoads.entries() ].sort( (a, b) => a[1] - b[1])); // we don't care about if router is piped, just choose the least loaded worker if (pipedRoutersIds.size === 0 || - pipedRoutersIds.size === mediasoupRouters.size) - { + pipedRoutersIds.size === mediasoupRouters.size) { const workerId = sortedWorkerLoads.keys().next().value; - for (const worker of mediasoupWorkers) - { - if (worker._pid === workerId) - { - for (const router of worker._routers) - { + for (const worker of mediasoupWorkers) { + if (worker._pid === workerId) { + for (const router of worker._routers) { const routerId = router._internal.routerId; - if (mediasoupRouters.has(routerId)) - { + if (mediasoupRouters.has(routerId)) { return routerId; } } } } - } - else - { + } else { // find if there is a piped router that is on a worker that is below limit - for (const [ workerId, workerLoad ] of sortedWorkerLoads.entries()) - { - for (const worker of mediasoupWorkers) - { - if (worker._pid === workerId) - { - for (const router of worker._routers) - { + for (const [ workerId, workerLoad ] of sortedWorkerLoads.entries()) { + for (const worker of mediasoupWorkers) { + if (worker._pid === workerId) { + for (const router of worker._routers) { const routerId = router._internal.routerId; // on purpose we check if the worker load is below the limit, // as in reality the worker load is imortant, // not the router load if (mediasoupRouters.has(routerId) && pipedRoutersIds.has(routerId) && - workerLoad < ROUTER_SCALE_SIZE) - { + workerLoad < ROUTER_SCALE_SIZE) { return routerId; } } } } } // no piped router found, we need to return router from least loaded worker const workerId = sortedWorkerLoads.keys().next().value; - for (const worker of mediasoupWorkers) - { - if (worker._pid === workerId) - { - for (const router of worker._routers) - { + for (const worker of mediasoupWorkers) { + if (worker._pid === workerId) { + for (const router of worker._routers) { const routerId = router._internal.routerId; - if (mediasoupRouters.has(routerId)) - { + if (mediasoupRouters.has(routerId)) { return routerId; } } } } } } /** * Factory function that creates and returns Room instance. * * @async * * @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new * mediasoup Router must be created. * @param {String} roomId - Id of the Room instance. */ - static async create({ mediasoupWorkers, roomId, peers }) - { + static async create({ mediasoupWorkers, roomId, peers }) { logger.info('create() [roomId:"%s"]', roomId); // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediasoupRouters = new Map(); - for (const worker of mediasoupWorkers) - { + for (const worker of mediasoupWorkers) { const router = await worker.createRouter({ mediaCodecs }); mediasoupRouters.set(router.id, router); } const firstRouter = mediasoupRouters.get(Room.getLeastLoadedRouter( mediasoupWorkers, peers, mediasoupRouters)); // Create a mediasoup AudioLevelObserver on first router const audioLevelObserver = await firstRouter.createAudioLevelObserver( { maxEntries : 1, threshold : -80, interval : 800 }); return new Room({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers }); } constructor({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers - }) - { + }) { logger.info('constructor() [roomId:"%s"]', roomId); super(); this.setMaxListeners(Infinity); // this._uuid = uuidv4(); this._mediasoupWorkers = mediasoupWorkers; // Room ID. this._roomId = roomId; // Closed flag. this._closed = false; // Joining queue this._queue = new AwaitQueue(); this._peers = peers; this._selfDestructTimeout = null; // Array of mediasoup Router instances. this._mediasoupRouters = mediasoupRouters; } - dumpStats() - { + dumpStats() { const peers = this.getPeers(); const {routerLoads, workerLoads, pipedRoutersIds} = Room.calculateLoads(this._mediasoupWorkers, peers, this._mediasoupRouters); let stats = { numberOfWorkers: this._mediasoupWorkers.length, numberOfRouters: this._mediasoupRouters.size, numberOfPeers: peers.length, routerLoads: routerLoads, workerLoads: workerLoads, pipedRoutersIds: pipedRoutersIds, }; console.log(stats); } - close() - { + close() { logger.debug('close()'); this._closed = true; this._queue.close(); this._queue = null; if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = null; // Close the peers. for (const peer of this._peers.values()) { if (!peer.closed) peer.close(); } this._peers = {}; // Close the mediasoup Routers. for (const router of this._mediasoupRouters.values()) { router.close(); } this._mediasoupWorkers = null; this._mediasoupRouters.clear(); this._audioLevelObserver = null; // Emit 'close' event. this.emit('close'); } - handlePeer({ peer }) - { + handlePeer({ peer }) { logger.info('handlePeer() [peer:"%s", role:%s]', peer.id, peer.role); // Should not happen - if (this._peers[peer.id]) - { + if (this._peers[peer.id]) { logger.warn( 'handleConnection() | there is already a peer with same peerId [peer:"%s"]', peer.id); } this._peerJoining(peer); } - logStatus() - { + logStatus() { logger.info( 'logStatus() [room id:"%s", peers:"%s"]', this._roomId, Object.keys(this._peers).length ); } - dump() - { + dump() { return { roomId : this._roomId, peers : Object.keys(this._peers).length }; } - get id() - { + get id() { return this._roomId; } - selfDestructCountdown() - { + selfDestructCountdown() { logger.debug('selfDestructCountdown() started'); clearTimeout(this._selfDestructTimeout); - this._selfDestructTimeout = setTimeout(() => - { + this._selfDestructTimeout = setTimeout(() => { if (this._closed) return; - if (this.checkEmpty()) - { + if (this.checkEmpty()) { logger.info( 'Room deserted for some time, closing the room [roomId:"%s"]', this._roomId); this.close(); - } - else + } else logger.debug('selfDestructCountdown() aborted; room is not empty!'); }, 10000); } - checkEmpty() - { + checkEmpty() { return Object.keys(this._peers).length === 0; } - _peerJoining(peer) - { - this._queue.push(async () => - { + _peerJoining(peer) { + this._queue.push(async () => { peer.socket.join(this._roomId); this._peers[peer.id] = peer; // Assign routerId peer.routerId = await this._getRouterId(); this._handlePeer(peer); let iceServers; - if ('turnAPIURI' in config) - { - try - { + if ('turnAPIURI' in config) { + try { const { data } = await axios.get( config.turnAPIURI, { timeout : config.turnAPITimeout || 2000, params : { ...config.turnAPIparams, 'api_key' : config.turnAPIKey, 'ip' : peer.socket.request.connection.remoteAddress } }); iceServers = [ { urls : data.uris, username : data.username, credential : data.password } ]; - } - catch (error) - { + } catch (error) { if ('backupTurnServers' in config && config.backupTurnServers.length) iceServers = config.backupTurnServers; logger.error('_peerJoining() | error on REST turn [error:"%o"]', error); } - } - else if ('backupTurnServers' in config && config.backupTurnServers.length) - { + } else if ('backupTurnServers' in config && config.backupTurnServers.length) { iceServers = config.backupTurnServers; } this._notification(peer.socket, 'roomReady', { iceServers }); }) - .catch((error) => - { + .catch((error) => { logger.error('_peerJoining() [error:"%o"]', error); }); } - _handlePeer(peer) - { + _handlePeer(peer) { logger.debug('_handlePeer() [peer:"%s"]', peer.id); - peer.on('close', () => - { + peer.on('close', () => { this._handlePeerClose(peer); }); - peer.on('nicknameChanged', () => - { + peer.on('nicknameChanged', () => { // Spread to others this._notification(peer.socket, 'changeNickname', { peerId: peer.id, nickname: peer.nickname }, true); }); - peer.on('gotRole', ({ newRole }) => - { + peer.on('gotRole', ({ newRole }) => { // Spread to others this._notification(peer.socket, 'changeRole', { peerId: peer.id, role: newRole }, true, true); }); - peer.socket.on('request', (request, cb) => - { + peer.socket.on('request', (request, cb) => { logger.debug( 'Peer "request" event [method:"%s", peerId:"%s"]', request.method, peer.id); this._handleSocketRequest(peer, request, cb) - .catch((error) => - { + .catch((error) => { logger.error('"request" failed [error:"%o"]', error); cb(error); }); }); // Peer left before we were done joining if (peer.closed) this._handlePeerClose(peer); } - _handlePeerClose(peer) - { + _handlePeerClose(peer) { logger.debug('_handlePeerClose() [peer:"%s"]', peer.id); if (this._closed) return; this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); delete this._peers[peer.id]; // If this is the last Peer in the room close the room after a while. if (this.checkEmpty()) this.selfDestructCountdown(); } - async _handleSocketRequest(peer, request, cb) - { + async _handleSocketRequest(peer, request, cb) { const router = this._mediasoupRouters.get(peer.routerId); console.log(request.method); - switch (request.method) + switch (request.method) { + case 'getRouterRtpCapabilities': { - case 'getRouterRtpCapabilities': - { - cb(null, router.rtpCapabilities); - break; - } + cb(null, router.rtpCapabilities); + break; + } - case 'dumpStats': - { - this.dumpStats() + case 'dumpStats': + { + this.dumpStats() - cb(null); - break; - } + cb(null); + break; + } - case 'join': - { - const { - nickname, - picture, - rtpCapabilities - } = request.data; - - // Store client data into the Peer data object. - peer.nickname = nickname; - peer.picture = picture; - peer.rtpCapabilities = rtpCapabilities; - - // Tell the new Peer about already joined Peers. - const otherPeers = this.getPeers(peer); - - const peerInfos = otherPeers.map(otherPeer => otherPeer.peerInfo); - - cb(null, { - id: peer.id, - role: peer.role, - peers: peerInfos, - }); + case 'join': + { + const { + nickname, + picture, + rtpCapabilities + } = request.data; + + // Store client data into the Peer data object. + peer.nickname = nickname; + peer.picture = picture; + peer.rtpCapabilities = rtpCapabilities; + + // Tell the new Peer about already joined Peers. + const otherPeers = this.getPeers(peer); + + const peerInfos = otherPeers.map(otherPeer => otherPeer.peerInfo); + + cb(null, { + id: peer.id, + role: peer.role, + peers: peerInfos, + }); - // Create Consumers for existing Producers. - for (const otherPeer of otherPeers) { - for (const producer of otherPeer.producers.values()) { - this._createConsumer({ - consumerPeer: peer, - producerPeer: otherPeer, - producer - }); - } + // Create Consumers for existing Producers. + for (const otherPeer of otherPeers) { + for (const producer of otherPeer.producers.values()) { + this._createConsumer({ + consumerPeer: peer, + producerPeer: otherPeer, + producer + }); } + } - // Notify the new Peer to all other Peers. - this._notification(peer.socket, 'newPeer', peer.peerInfo, true); + // Notify the new Peer to all other Peers. + this._notification(peer.socket, 'newPeer', peer.peerInfo, true); - logger.debug( - 'peer joined [peer: "%s", nickname: "%s", picture: "%s"]', - peer.id, nickname, picture); + logger.debug( + 'peer joined [peer: "%s", nickname: "%s", picture: "%s"]', + peer.id, nickname, picture); - break; - } - case 'createPlainTransport': - { - const { producing, consuming } = request.data; + break; + } + case 'createPlainTransport': + { + const { producing, consuming } = request.data; - const transport = await router.createPlainTransport( - { - //When consuming we manually connect using connectPlainTransport, - //otherwise we let the port autodetection work. - comedia: producing, - // FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP) - rtcpMux: false, - listenIp: { ip: "127.0.0.1", announcedIp: null }, - appData : { producing, consuming } - } - ); + const transport = await router.createPlainTransport( + { + //When consuming we manually connect using connectPlainTransport, + //otherwise we let the port autodetection work. + comedia: producing, + // FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP) + rtcpMux: false, + listenIp: { ip: "127.0.0.1", announcedIp: null }, + appData : { producing, consuming } + } + ); // await transport.enableTraceEvent([ "probation", "bwe" ]); // transport.on("trace", (trace) => { // console.log(trace); // }); - peer.addTransport(transport.id, transport); + peer.addTransport(transport.id, transport); - cb( - null, - { - id : transport.id, - ip : transport.tuple.localIp, - port : transport.tuple.localPort, - rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined - }); + cb( + null, + { + id : transport.id, + ip : transport.tuple.localIp, + port : transport.tuple.localPort, + rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined + }); - break; - } + break; + } - case 'connectPlainTransport': - { - const { transportId, ip, port, rtcpPort } = request.data; - const transport = peer.getTransport(transportId); + case 'connectPlainTransport': + { + const { transportId, ip, port, rtcpPort } = request.data; + const transport = peer.getTransport(transportId); - if (!transport) - throw new Error(`transport with id "${transportId}" not found`); + if (!transport) + throw new Error(`transport with id "${transportId}" not found`); - await transport.connect({ - ip: ip, - port: port, - rtcpPort: rtcpPort, - }); + await transport.connect({ + ip: ip, + port: port, + rtcpPort: rtcpPort, + }); - cb(); + cb(); - break; - } + break; + } - case 'createWebRtcTransport': - { - // NOTE: Don't require that the Peer is joined here, so the client can - // initiate mediasoup Transports and be ready when he later joins. + case 'createWebRtcTransport': + { + // NOTE: Don't require that the Peer is joined here, so the client can + // initiate mediasoup Transports and be ready when he later joins. - const { forceTcp, producing, consuming } = request.data; + const { forceTcp, producing, consuming } = request.data; - const webRtcTransportOptions = + const webRtcTransportOptions = { ...config.mediasoup.webRtcTransport, appData : { producing, consuming } }; - webRtcTransportOptions.enableTcp = true; + webRtcTransportOptions.enableTcp = true; - if (forceTcp) - webRtcTransportOptions.enableUdp = false; - else - { - webRtcTransportOptions.enableUdp = true; - webRtcTransportOptions.preferUdp = true; - } + if (forceTcp) + webRtcTransportOptions.enableUdp = false; + else { + webRtcTransportOptions.enableUdp = true; + webRtcTransportOptions.preferUdp = true; + } - const transport = await router.createWebRtcTransport( - webRtcTransportOptions - ); + const transport = await router.createWebRtcTransport( + webRtcTransportOptions + ); - transport.on('dtlsstatechange', (dtlsState) => { - if (dtlsState === 'failed' || dtlsState === 'closed') { - logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); - } - }); + transport.on('dtlsstatechange', (dtlsState) => { + if (dtlsState === 'failed' || dtlsState === 'closed') { + logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); + } + }); - // Store the WebRtcTransport into the Peer data Object. - peer.addTransport(transport.id, transport); + // Store the WebRtcTransport into the Peer data Object. + peer.addTransport(transport.id, transport); - cb( - null, - { - id : transport.id, - iceParameters : transport.iceParameters, - iceCandidates : transport.iceCandidates, - dtlsParameters : transport.dtlsParameters - }); + cb( + null, + { + id : transport.id, + iceParameters : transport.iceParameters, + iceCandidates : transport.iceCandidates, + dtlsParameters : transport.dtlsParameters + }); - const { maxIncomingBitrate } = config.mediasoup.webRtcTransport; + const { maxIncomingBitrate } = config.mediasoup.webRtcTransport; - // If set, apply max incoming bitrate limit. - if (maxIncomingBitrate) - { - try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); } - catch (error) { - logger.info("Setting the incoming bitrate failed") - } + // If set, apply max incoming bitrate limit. + if (maxIncomingBitrate) { + try { + await transport.setMaxIncomingBitrate(maxIncomingBitrate); + } catch (error) { + logger.info("Setting the incoming bitrate failed") } - - break; } - case 'connectWebRtcTransport': - { - const { transportId, dtlsParameters } = request.data; - const transport = peer.getTransport(transportId); + break; + } - if (!transport) - throw new Error(`transport with id "${transportId}" not found`); + case 'connectWebRtcTransport': + { + const { transportId, dtlsParameters } = request.data; + const transport = peer.getTransport(transportId); - await transport.connect({ dtlsParameters }); + if (!transport) + throw new Error(`transport with id "${transportId}" not found`); - cb(); + await transport.connect({ dtlsParameters }); - break; - } -/* + cb(); + + break; + } + /* case 'restartIce': { const { transportId } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); const iceParameters = await transport.restartIce(); cb(null, iceParameters); break; } */ - case 'produce': - { - let { appData } = request.data; + case 'produce': + { + let { appData } = request.data; - if (!appData.source || ![ 'mic', 'webcam', 'screen' ].includes(appData.source)) - throw new Error('invalid producer source'); + if (!appData.source || ![ 'mic', 'webcam', 'screen' ].includes(appData.source)) + throw new Error('invalid producer source'); - if (appData.source === 'mic' && !peer.hasRole(Roles.PUBLISHER)) - throw new Error('peer not authorized'); + if (appData.source === 'mic' && !peer.hasRole(Roles.PUBLISHER)) + throw new Error('peer not authorized'); - if (appData.source === 'webcam' && !peer.hasRole(Roles.PUBLISHER)) - throw new Error('peer not authorized'); + if (appData.source === 'webcam' && !peer.hasRole(Roles.PUBLISHER)) + throw new Error('peer not authorized'); - if (appData.source === 'screen' && !peer.hasRole(Roles.PUBLISHER)) - throw new Error('peer not authorized'); + if (appData.source === 'screen' && !peer.hasRole(Roles.PUBLISHER)) + throw new Error('peer not authorized'); - const { transportId, kind, rtpParameters } = request.data; - const transport = peer.getTransport(transportId); + const { transportId, kind, rtpParameters } = request.data; + const transport = peer.getTransport(transportId); - if (!transport) - throw new Error(`transport with id "${transportId}" not found`); + if (!transport) + throw new Error(`transport with id "${transportId}" not found`); - // Add peerId into appData to later get the associated Peer during - // the 'loudest' event of the audioLevelObserver. - appData = { ...appData, peerId: peer.id }; + // Add peerId into appData to later get the associated Peer during + // the 'loudest' event of the audioLevelObserver. + appData = { ...appData, peerId: peer.id }; - const producer = + const producer = await transport.produce({ kind, rtpParameters, appData }); - const pipeRouters = this._getRoutersToPipeTo(peer.routerId); + const pipeRouters = this._getRoutersToPipeTo(peer.routerId); - for (const [ routerId, destinationRouter ] of this._mediasoupRouters) - { - if (pipeRouters.includes(routerId)) - { - await router.pipeToRouter({ - producerId : producer.id, - router : destinationRouter - }); - } + for (const [ routerId, destinationRouter ] of this._mediasoupRouters) { + if (pipeRouters.includes(routerId)) { + await router.pipeToRouter({ + producerId : producer.id, + router : destinationRouter + }); } + } - // Store the Producer into the Peer data Object. - peer.addProducer(producer.id, producer); - - producer.on('videoorientationchange', (videoOrientation) => - { - logger.debug( - 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]', - producer.id, videoOrientation); - }); - - // Trace individual packets for debugging - // await producer.enableTraceEvent([ "rtp", "pli", "keyframe", "nack" ]); - // producer.on("trace", (trace) => { - // console.log(`Trace on ${producer.id}`, trace); - // }); + // Store the Producer into the Peer data Object. + peer.addProducer(producer.id, producer); - cb(null, { id: producer.id }); + producer.on('videoorientationchange', (videoOrientation) => { + logger.debug( + 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]', + producer.id, videoOrientation); + }); - // Optimization: Create a server-side Consumer for each Peer. - for (const otherPeer of this.getPeers(peer)) - { - this._createConsumer( - { - consumerPeer : otherPeer, - producerPeer : peer, - producer - }); - } + // Trace individual packets for debugging + // await producer.enableTraceEvent([ "rtp", "pli", "keyframe", "nack" ]); + // producer.on("trace", (trace) => { + // console.log(`Trace on ${producer.id}`, trace); + // }); - // Add into the audioLevelObserver. - if (kind === 'audio') - { - this._audioLevelObserver.addProducer({ producerId: producer.id }) - .catch(() => {}); - } + cb(null, { id: producer.id }); - break; + // Optimization: Create a server-side Consumer for each Peer. + for (const otherPeer of this.getPeers(peer)) { + this._createConsumer( + { + consumerPeer : otherPeer, + producerPeer : peer, + producer + }); } - case 'closeProducer': - { - const { producerId } = request.data; - const producer = peer.getProducer(producerId); + // Add into the audioLevelObserver. + if (kind === 'audio') { + this._audioLevelObserver.addProducer({ producerId: producer.id }) + .catch(() => {}); + } - if (!producer) - throw new Error(`producer with id "${producerId}" not found`); + break; + } - producer.close(); + case 'closeProducer': + { + const { producerId } = request.data; + const producer = peer.getProducer(producerId); - // Remove from its map. - peer.removeProducer(producer.id); + if (!producer) + throw new Error(`producer with id "${producerId}" not found`); - cb(); + producer.close(); - break; - } + // Remove from its map. + peer.removeProducer(producer.id); - case 'pauseProducer': - { - const { producerId } = request.data; - const producer = peer.getProducer(producerId); + cb(); - if (!producer) - throw new Error(`producer with id "${producerId}" not found`); + break; + } - await producer.pause(); + case 'pauseProducer': + { + const { producerId } = request.data; + const producer = peer.getProducer(producerId); - cb(); + if (!producer) + throw new Error(`producer with id "${producerId}" not found`); - break; - } + await producer.pause(); - case 'resumeProducer': - { - const { producerId } = request.data; - const producer = peer.getProducer(producerId); + cb(); - if (!producer) - throw new Error(`producer with id "${producerId}" not found`); + break; + } - await producer.resume(); + case 'resumeProducer': + { + const { producerId } = request.data; + const producer = peer.getProducer(producerId); - cb(); + if (!producer) + throw new Error(`producer with id "${producerId}" not found`); - break; - } + await producer.resume(); - case 'pauseConsumer': - { - const { consumerId } = request.data; - const consumer = peer.getConsumer(consumerId); + cb(); - if (!consumer) - throw new Error(`consumer with id "${consumerId}" not found`); + break; + } - await consumer.pause(); + case 'pauseConsumer': + { + const { consumerId } = request.data; + const consumer = peer.getConsumer(consumerId); - cb(); + if (!consumer) + throw new Error(`consumer with id "${consumerId}" not found`); - break; - } + await consumer.pause(); - case 'resumeConsumer': - { - const { consumerId } = request.data; - const consumer = peer.getConsumer(consumerId); + cb(); - if (!consumer) - throw new Error(`consumer with id "${consumerId}" not found`); + break; + } - await consumer.resume(); + case 'resumeConsumer': + { + const { consumerId } = request.data; + const consumer = peer.getConsumer(consumerId); - cb(); + if (!consumer) + throw new Error(`consumer with id "${consumerId}" not found`); - break; - } + await consumer.resume(); - case 'changeNickname': - { - const { nickname } = request.data; + cb(); - peer.nickname = nickname; + break; + } - // This will be spread through events from the peer object + case 'changeNickname': + { + const { nickname } = request.data; - // Return no error - cb(); + peer.nickname = nickname; - break; - } + // This will be spread through events from the peer object - case 'chatMessage': - { - const { message } = request.data; + // Return no error + cb(); - // Spread to others - this._notification(peer.socket, 'chatMessage', { - peerId: peer.id, - nickname: peer.nickname, - message: message - }, true, true); + break; + } - // Return no error - cb(); + case 'chatMessage': + { + const { message } = request.data; - break; - } + // Spread to others + this._notification(peer.socket, 'chatMessage', { + peerId: peer.id, + nickname: peer.nickname, + message: message + }, true, true); - case 'moderator:addRole': - { - if (!peer.hasRole(Roles.MODERATOR)) - throw new Error('peer not authorized'); + // Return no error + cb(); - const { peerId, role } = request.data; + break; + } - const rolePeer = this._peers[peerId]; + case 'moderator:addRole': + { + if (!peer.hasRole(Roles.MODERATOR)) + throw new Error('peer not authorized'); - if (!rolePeer) - throw new Error(`peer with id "${peerId}" not found`); + const { peerId, role } = request.data; - if (!rolePeer.isValidRole(role)) - throw new Error('invalid role'); + const rolePeer = this._peers[peerId]; - if (!rolePeer.hasRole(role)) { - // This will propagate the event automatically - rolePeer.setRole(rolePeer.role | role); - } + if (!rolePeer) + throw new Error(`peer with id "${peerId}" not found`); - // Return no error - cb(); + if (!rolePeer.isValidRole(role)) + throw new Error('invalid role'); - break; + if (!rolePeer.hasRole(role)) { + // This will propagate the event automatically + rolePeer.setRole(rolePeer.role | role); } - case 'moderator:removeRole': - { - if (!peer.hasRole(Roles.MODERATOR)) - throw new Error('peer not authorized'); + // Return no error + cb(); - const { peerId, role } = request.data; + break; + } - const rolePeer = this._peers[peerId]; + case 'moderator:removeRole': + { + if (!peer.hasRole(Roles.MODERATOR)) + throw new Error('peer not authorized'); - if (!rolePeer) - throw new Error(`peer with id "${peerId}" not found`); + const { peerId, role } = request.data; - if (!rolePeer.isValidRole(role)) - throw new Error('invalid role'); + const rolePeer = this._peers[peerId]; - if (rolePeer.hasRole(role)) { - // This will propagate the event automatically - rolePeer.setRole(rolePeer.role ^ role); - } + if (!rolePeer) + throw new Error(`peer with id "${peerId}" not found`); - // Return no error - cb(); + if (!rolePeer.isValidRole(role)) + throw new Error('invalid role'); - break; + if (rolePeer.hasRole(role)) { + // This will propagate the event automatically + rolePeer.setRole(rolePeer.role ^ role); } - case 'moderator:closeRoom': - { - if (!peer.hasRole(Roles.OWNER)) - throw new Error('peer not authorized'); + // Return no error + cb(); - this._notification(peer.socket, 'moderator:closeRoom', null, true); + break; + } - cb(); + case 'moderator:closeRoom': + { + if (!peer.hasRole(Roles.OWNER)) + throw new Error('peer not authorized'); - // Close the room - this.close(); + this._notification(peer.socket, 'moderator:closeRoom', null, true); - // TODO: remove the room? + cb(); - break; - } + // Close the room + this.close(); - case 'moderator:kickPeer': - { - if (!peer.hasRole(Roles.MODERATOR)) - throw new Error('peer not authorized'); + // TODO: remove the room? - const { peerId } = request.data; + break; + } - const kickPeer = this._peers[peerId]; + case 'moderator:kickPeer': + { + if (!peer.hasRole(Roles.MODERATOR)) + throw new Error('peer not authorized'); - if (!kickPeer) - throw new Error(`peer with id "${peerId}" not found`); + const { peerId } = request.data; - this._notification(kickPeer.socket, 'moderator:kickPeer'); + const kickPeer = this._peers[peerId]; - kickPeer.close(); + if (!kickPeer) + throw new Error(`peer with id "${peerId}" not found`); - cb(); + this._notification(kickPeer.socket, 'moderator:kickPeer'); - break; - } + kickPeer.close(); - case 'raisedHand': - { - const { raisedHand } = request.data; + cb(); - peer.raisedHand = raisedHand; + break; + } - // Spread to others - this._notification(peer.socket, 'raisedHand', { - peerId: peer.id, - raisedHand: raisedHand, - }, true); + case 'raisedHand': + { + const { raisedHand } = request.data; - // Return no error - cb(); + peer.raisedHand = raisedHand; - break; - } + // Spread to others + this._notification(peer.socket, 'raisedHand', { + peerId: peer.id, + raisedHand: raisedHand, + }, true); - default: - { - logger.error('unknown request.method "%s"', request.method); + // Return no error + cb(); - cb(500, `unknown request.method "${request.method}"`); - } + break; + } + + default: + { + logger.error('unknown request.method "%s"', request.method); + + cb(500, `unknown request.method "${request.method}"`); + } } } /** * Creates a mediasoup Consumer for the given mediasoup Producer. * * @async */ - async _createConsumer({ consumerPeer, producerPeer, producer }) - { + async _createConsumer({ consumerPeer, producerPeer, producer }) { logger.debug( '_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]', consumerPeer.id, producerPeer.id, producer.id ); const router = this._mediasoupRouters.get(producerPeer.routerId); // Optimization: // - Create the server-side Consumer. If video, do it paused. // - Tell its Peer about it and wait for its response. // - Upon receipt of the response, resume the server-side Consumer. // - If video, this will mean a single key frame requested by the // server-side Consumer (when resuming it). // NOTE: Don't create the Consumer if the remote Peer cannot consume it. if ( !consumerPeer.rtpCapabilities || !router.canConsume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities }) - ) - { + ) { return; } // Must take the Transport the remote Peer is using for consuming. const transport = consumerPeer.getConsumerTransport(); // This should not happen. - if (!transport) - { + if (!transport) { logger.warn('_createConsumer() | Transport for consuming not found'); return; } // Create the Consumer in paused mode. let consumer; - try - { + try { consumer = await transport.consume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities, paused : producer.kind === 'video' }); if (producer.kind === 'audio') await consumer.setPriority(255); - } - catch (error) - { + } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); return; } // Trace individual packets for debugging // await consumer.enableTraceEvent([ "rtp", "pli", "fir" ]); // consumer.on("trace", (trace) => { // console.log(`Trace on ${consumer.id}`, trace); // }); // Store the Consumer into the consumerPeer data Object. consumerPeer.addConsumer(consumer.id, consumer); // Set Consumer events. - consumer.on('transportclose', () => - { + consumer.on('transportclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); }); - consumer.on('producerclose', () => - { + consumer.on('producerclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id }); }); - consumer.on('producerpause', () => - { + consumer.on('producerpause', () => { this._notification(consumerPeer.socket, 'consumerPaused', { consumerId: consumer.id }); }); - consumer.on('producerresume', () => - { + consumer.on('producerresume', () => { this._notification(consumerPeer.socket, 'consumerResumed', { consumerId: consumer.id }); }); // Send a request to the remote Peer with Consumer parameters. - try - { + try { await this._request( consumerPeer.socket, 'newConsumer', { peerId : producerPeer.id, kind : consumer.kind, producerId : producer.id, id : consumer.id, rtpParameters : consumer.rtpParameters, type : consumer.type, appData : producer.appData, producerPaused : consumer.producerPaused } ); // Now that we got the positive response from the remote Peer and, if // video, resume the Consumer to ask for an efficient key frame. await consumer.resume(); - } - catch (error) - { + } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); } } /** * Get the list of peers. */ - getPeers(excludePeer = undefined) - { + getPeers(excludePeer = undefined) { return Object.values(this._peers) .filter((peer) => peer !== excludePeer); } - _timeoutCallback(callback) - { + _timeoutCallback(callback) { let called = false; const interval = setTimeout( - () => - { + () => { if (called) return; called = true; callback(new SocketTimeoutError('Request timed out')); }, config.requestTimeout || 20000 ); - return (...args) => - { + return (...args) => { if (called) return; called = true; clearTimeout(interval); callback(...args); }; } - _sendRequest(socket, method, data = {}) - { - return new Promise((resolve, reject) => - { + _sendRequest(socket, method, data = {}) { + return new Promise((resolve, reject) => { socket.emit( 'request', { method, data }, - this._timeoutCallback((err, response) => - { - if (err) - { + this._timeoutCallback((err, response) => { + if (err) { reject(err); - } - else - { + } else { resolve(response); } }) ); }); } - async _request(socket, method, data) - { + async _request(socket, method, data) { logger.debug('_request() [method:"%s", data:"%o"]', method, data); const { requestRetries = 3 } = config; - for (let tries = 0; tries < requestRetries; tries++) - { - try - { + for (let tries = 0; tries < requestRetries; tries++) { + try { return await this._sendRequest(socket, method, data); - } - catch (error) - { + } catch (error) { if ( error instanceof SocketTimeoutError && tries < requestRetries ) logger.warn('_request() | timeout, retrying [attempt:"%s"]', tries); else throw error; } } } - _notification(socket, method, data = {}, broadcast = false, includeSender = false) - { - if (broadcast) - { + _notification(socket, method, data = {}, broadcast = false, includeSender = false) { + if (broadcast) { socket.broadcast.to(this._roomId).emit( 'notification', { method, data } ); if (includeSender) socket.emit('notification', { method, data }); - } - else - { + } else { socket.emit('notification', { method, data }); } } /* * Pipe producers of peers that are running under another router to this router. */ - async _pipeProducersToRouter(routerId) - { + async _pipeProducersToRouter(routerId) { const router = this._mediasoupRouters.get(routerId); // All peers that have a different router const peersToPipe = Object.values(this._peers) .filter((peer) => peer.routerId !== routerId && peer.routerId !== null); - for (const peer of peersToPipe) - { + for (const peer of peersToPipe) { const srcRouter = this._mediasoupRouters.get(peer.routerId); - for (const producerId of peer.producers.keys()) - { - if (router._producers.has(producerId)) - { + for (const producerId of peer.producers.keys()) { + if (router._producers.has(producerId)) { continue; } await srcRouter.pipeToRouter({ producerId : producerId, router : router }); } } } - async _getRouterId() - { + async _getRouterId() { const routerId = Room.getLeastLoadedRouter( this._mediasoupWorkers, this._peers, this._mediasoupRouters); await this._pipeProducersToRouter(routerId); return routerId; } // Returns an array of router ids we need to pipe to: // The combined set of routers of all peers, exluding the router of the peer itself. - _getRoutersToPipeTo(originRouterId) - { + _getRoutersToPipeTo(originRouterId) { return Object.values(this._peers) .map((peer) => peer.routerId) .filter((routerId, index, self) => routerId !== originRouterId && self.indexOf(routerId) === index ); } } module.exports = Room; diff --git a/meet/server/lib/errors.js b/meet/server/lib/errors.js index a353a6ed..cb71ad77 100644 --- a/meet/server/lib/errors.js +++ b/meet/server/lib/errors.js @@ -1,23 +1,21 @@ /** * Error produced when a socket request has a timeout. */ -class SocketTimeoutError extends Error -{ - constructor(message) - { +class SocketTimeoutError extends Error { + constructor(message) { super(message); this.name = 'SocketTimeoutError'; // eslint-disable-next-line no-prototype-builtins if (Error.hasOwnProperty('captureStackTrace')) // Just in V8. Error.captureStackTrace(this, SocketTimeoutError); else this.stack = (new Error(message)).stack; } } module.exports = { SocketTimeoutError }; diff --git a/meet/server/lib/interactiveClient.js b/meet/server/lib/interactiveClient.js index 2aa38e78..6f96e35e 100644 --- a/meet/server/lib/interactiveClient.js +++ b/meet/server/lib/interactiveClient.js @@ -1,27 +1,25 @@ const net = require('net'); const os = require('os'); const path = require('path'); const SOCKET_PATH_UNIX = '/tmp/kolabmeet-server.sock'; const SOCKET_PATH_WIN = path.join('\\\\?\\pipe', process.cwd(), 'kolabmeet-server'); const SOCKET_PATH = os.platform() === 'win32'? SOCKET_PATH_WIN : SOCKET_PATH_UNIX; -module.exports = async function() -{ +module.exports = async function() { const socket = net.connect(SOCKET_PATH); process.stdin.pipe(socket); socket.pipe(process.stdout); socket.on('connect', () => process.stdin.setRawMode(true)); socket.on('close', () => process.exit(0)); socket.on('exit', () => socket.end()); - if (process.argv && process.argv[2] === '--stats') - { + if (process.argv && process.argv[2] === '--stats') { await socket.write('stats\n'); socket.end(); } }; diff --git a/meet/server/lib/interactiveServer.js b/meet/server/lib/interactiveServer.js index 31027169..847278ec 100644 --- a/meet/server/lib/interactiveServer.js +++ b/meet/server/lib/interactiveServer.js @@ -1,693 +1,610 @@ const os = require('os'); const path = require('path'); const repl = require('repl'); const readline = require('readline'); const net = require('net'); const fs = require('fs'); const mediasoup = require('mediasoup'); const colors = require('colors/safe'); const pidusage = require('pidusage'); const SOCKET_PATH_UNIX = '/tmp/kolabmeet-server.sock'; const SOCKET_PATH_WIN = path.join('\\\\?\\pipe', process.cwd(), 'kolabmeet-server'); const SOCKET_PATH = os.platform() === 'win32' ? SOCKET_PATH_WIN : SOCKET_PATH_UNIX; // Maps to store all mediasoup objects. const workers = new Map(); const routers = new Map(); const transports = new Map(); const producers = new Map(); const consumers = new Map(); const dataProducers = new Map(); const dataConsumers = new Map(); -class Interactive -{ - constructor(socket) - { +class Interactive { + constructor(socket) { this._socket = socket; this._isTerminalOpen = false; } - openCommandConsole() - { + openCommandConsole() { const cmd = readline.createInterface( { input : this._socket, output : this._socket, terminal : true }); - cmd.on('close', () => - { + cmd.on('close', () => { if (this._isTerminalOpen) return; this.log('\nexiting...'); this._socket.end(); }); - const readStdin = () => - { - cmd.question('cmd> ', async (input) => - { + const readStdin = () => { + cmd.question('cmd> ', async (input) => { const params = input.split(/[\s\t]+/); const command = params.shift(); - switch (command) + switch (command) { + case '': { - case '': - { - readStdin(); - break; - } - - case 'h': - case 'help': - { - this.log(''); - this.log('available commands:'); - this.log('- h, help : show this message'); - this.log('- usage : show CPU and memory usage of the Node.js and mediasoup-worker processes'); - this.log('- logLevel level : changes logLevel in all mediasoup Workers'); - this.log('- logTags [tag] [tag] : changes logTags in all mediasoup Workers (values separated by space)'); - this.log('- dumpRooms : dump all rooms'); - this.log('- dumpPeers : dump all peers'); - this.log('- dw, dumpWorkers : dump mediasoup Workers'); - this.log('- dr, dumpRouter [id] : dump mediasoup Router with given id (or the latest created one)'); - this.log('- dt, dumpTransport [id] : dump mediasoup Transport with given id (or the latest created one)'); - this.log('- dp, dumpProducer [id] : dump mediasoup Producer with given id (or the latest created one)'); - this.log('- dc, dumpConsumer [id] : dump mediasoup Consumer with given id (or the latest created one)'); - this.log('- st, statsTransport [id] : get stats for mediasoup Transport with given id (or all)'); - this.log('- sp, statsProducer [id] : get stats for mediasoup Producer with given id (or all)'); - this.log('- sc, statsConsumer [id] : get stats for mediasoup Consumer with given id (or all)'); - this.log('- ddp, dumpDataProducer [id] : dump mediasoup DataProducer with given id (or the latest created one)'); - this.log('- ddc, dumpDataConsumer [id] : dump mediasoup DataConsumer with given id (or the latest created one)'); - this.log('- sdp, statsDataProducer [id] : get stats for mediasoup DataProducer with given id (or the latest created one)'); - this.log('- sdc, statsDataConsumer [id] : get stats for mediasoup DataConsumer with given id (or the latest created one)'); - this.log('- t, terminal : open Node REPL Terminal'); - this.log(''); - readStdin(); - - break; - } + readStdin(); + break; + } - case 'u': - case 'usage': - { - let usage = await pidusage(process.pid); + case 'h': + case 'help': + { + this.log(''); + this.log('available commands:'); + this.log('- h, help : show this message'); + this.log('- usage : show CPU and memory usage of the Node.js and mediasoup-worker processes'); + this.log('- logLevel level : changes logLevel in all mediasoup Workers'); + this.log('- logTags [tag] [tag] : changes logTags in all mediasoup Workers (values separated by space)'); + this.log('- dumpRooms : dump all rooms'); + this.log('- dumpPeers : dump all peers'); + this.log('- dw, dumpWorkers : dump mediasoup Workers'); + this.log('- dr, dumpRouter [id] : dump mediasoup Router with given id (or the latest created one)'); + this.log('- dt, dumpTransport [id] : dump mediasoup Transport with given id (or the latest created one)'); + this.log('- dp, dumpProducer [id] : dump mediasoup Producer with given id (or the latest created one)'); + this.log('- dc, dumpConsumer [id] : dump mediasoup Consumer with given id (or the latest created one)'); + this.log('- st, statsTransport [id] : get stats for mediasoup Transport with given id (or all)'); + this.log('- sp, statsProducer [id] : get stats for mediasoup Producer with given id (or all)'); + this.log('- sc, statsConsumer [id] : get stats for mediasoup Consumer with given id (or all)'); + this.log('- ddp, dumpDataProducer [id] : dump mediasoup DataProducer with given id (or the latest created one)'); + this.log('- ddc, dumpDataConsumer [id] : dump mediasoup DataConsumer with given id (or the latest created one)'); + this.log('- sdp, statsDataProducer [id] : get stats for mediasoup DataProducer with given id (or the latest created one)'); + this.log('- sdc, statsDataConsumer [id] : get stats for mediasoup DataConsumer with given id (or the latest created one)'); + this.log('- t, terminal : open Node REPL Terminal'); + this.log(''); + readStdin(); + + break; + } - this.log(`Node.js process [pid:${process.pid}]:\n${JSON.stringify(usage, null, ' ')}`); + case 'u': + case 'usage': + { + let usage = await pidusage(process.pid); - for (const worker of workers.values()) - { - usage = await pidusage(worker.pid); + this.log(`Node.js process [pid:${process.pid}]:\n${JSON.stringify(usage, null, ' ')}`); - this.log(`mediasoup-worker process [pid:${worker.pid}]:\n${JSON.stringify(usage, null, ' ')}`); - } + for (const worker of workers.values()) { + usage = await pidusage(worker.pid); - break; + this.log(`mediasoup-worker process [pid:${worker.pid}]:\n${JSON.stringify(usage, null, ' ')}`); } - case 'logLevel': - { - const level = params[0]; - const promises = []; + break; + } - for (const worker of workers.values()) - { - promises.push(worker.updateSettings({ logLevel: level })); - } + case 'logLevel': + { + const level = params[0]; + const promises = []; - try - { - await Promise.all(promises); + for (const worker of workers.values()) { + promises.push(worker.updateSettings({ logLevel: level })); + } - this.log('done'); - } - catch (error) - { - this.error(String(error)); - } + try { + await Promise.all(promises); - break; + this.log('done'); + } catch (error) { + this.error(String(error)); } - case 'logTags': - { - const tags = params; - const promises = []; + break; + } - for (const worker of workers.values()) - { - promises.push(worker.updateSettings({ logTags: tags })); - } + case 'logTags': + { + const tags = params; + const promises = []; - try - { - await Promise.all(promises); + for (const worker of workers.values()) { + promises.push(worker.updateSettings({ logTags: tags })); + } - this.log('done'); - } - catch (error) - { - this.error(String(error)); - } + try { + await Promise.all(promises); - break; + this.log('done'); + } catch (error) { + this.error(String(error)); } - case 'stats': - { - this.log(`rooms:${global.rooms.size}\npeers:${global.peers.size}`); + break; + } - break; - } + case 'stats': + { + this.log(`rooms:${global.rooms.size}\npeers:${global.peers.size}`); - case 'dumpRooms': - { - for (const room of global.rooms.values()) - { - try - { - const dump = await room.dump(); - - this.log(`room.dump():\n${JSON.stringify(dump, null, ' ')}`); - } - catch (error) - { - this.error(`room.dump() failed: ${error}`); - } - } + break; + } - break; - } + case 'dumpRooms': + { + for (const room of global.rooms.values()) { + try { + const dump = await room.dump(); - case 'dumpPeers': - { - for (const peer of global.peers.values()) - { - try - { - const dump = await peer.peerInfo; - - this.log(`peer.peerInfo():\n${JSON.stringify(dump, null, ' ')}`); - } - catch (error) - { - this.error(`peer.peerInfo() failed: ${error}`); - } + this.log(`room.dump():\n${JSON.stringify(dump, null, ' ')}`); + } catch (error) { + this.error(`room.dump() failed: ${error}`); } - - break; } - case 'dw': - case 'dumpWorkers': - { - for (const worker of workers.values()) - { - try - { - const dump = await worker.dump(); - - this.log(`worker.dump():\n${JSON.stringify(dump, null, ' ')}`); - } - catch (error) - { - this.error(`worker.dump() failed: ${error}`); - } - } + break; + } - break; + case 'dumpPeers': + { + for (const peer of global.peers.values()) { + try { + const dump = await peer.peerInfo; + + this.log(`peer.peerInfo():\n${JSON.stringify(dump, null, ' ')}`); + } catch (error) { + this.error(`peer.peerInfo() failed: ${error}`); + } } - case 'dr': - case 'dumpRouter': - { - const id = params[0] || Array.from(routers.keys()).pop(); - const router = routers.get(id); + break; + } - if (!router) - { - this.error('Router not found'); + case 'dw': + case 'dumpWorkers': + { + for (const worker of workers.values()) { + try { + const dump = await worker.dump(); - break; + this.log(`worker.dump():\n${JSON.stringify(dump, null, ' ')}`); + } catch (error) { + this.error(`worker.dump() failed: ${error}`); } + } - try - { - const dump = await router.dump(); + break; + } - this.log(`router.dump():\n${JSON.stringify(dump, null, ' ')}`); - } - catch (error) - { - this.error(`router.dump() failed: ${error}`); - } + case 'dr': + case 'dumpRouter': + { + const id = params[0] || Array.from(routers.keys()).pop(); + const router = routers.get(id); + + if (!router) { + this.error('Router not found'); break; } - case 'dt': - case 'dumpTransport': - { - const id = params[0] || Array.from(transports.keys()).pop(); - const transport = transports.get(id); + try { + const dump = await router.dump(); - if (!transport) - { - this.error('Transport not found'); + this.log(`router.dump():\n${JSON.stringify(dump, null, ' ')}`); + } catch (error) { + this.error(`router.dump() failed: ${error}`); + } - break; - } + break; + } - try - { - const dump = await transport.dump(); + case 'dt': + case 'dumpTransport': + { + const id = params[0] || Array.from(transports.keys()).pop(); + const transport = transports.get(id); - this.log(`transport.dump():\n${JSON.stringify(dump, null, ' ')}`); - } - catch (error) - { - this.error(`transport.dump() failed: ${error}`); - } + if (!transport) { + this.error('Transport not found'); break; } - case 'dp': - case 'dumpProducer': - { - const id = params[0] || Array.from(producers.keys()).pop(); - const producer = producers.get(id); + try { + const dump = await transport.dump(); - if (!producer) - { - this.error('Producer not found'); + this.log(`transport.dump():\n${JSON.stringify(dump, null, ' ')}`); + } catch (error) { + this.error(`transport.dump() failed: ${error}`); + } - break; - } + break; + } - try - { - const dump = await producer.dump(); + case 'dp': + case 'dumpProducer': + { + const id = params[0] || Array.from(producers.keys()).pop(); + const producer = producers.get(id); - this.log(`producer.dump():\n${JSON.stringify(dump, null, ' ')}`); - } - catch (error) - { - this.error(`producer.dump() failed: ${error}`); - } + if (!producer) { + this.error('Producer not found'); break; } - case 'dc': - case 'dumpConsumer': - { - const id = params[0] || Array.from(consumers.keys()).pop(); - const consumer = consumers.get(id); + try { + const dump = await producer.dump(); - if (!consumer) - { - this.error('Consumer not found'); + this.log(`producer.dump():\n${JSON.stringify(dump, null, ' ')}`); + } catch (error) { + this.error(`producer.dump() failed: ${error}`); + } - break; - } + break; + } - try - { - const dump = await consumer.dump(); + case 'dc': + case 'dumpConsumer': + { + const id = params[0] || Array.from(consumers.keys()).pop(); + const consumer = consumers.get(id); - this.log(`consumer.dump():\n${JSON.stringify(dump, null, ' ')}`); - } - catch (error) - { - this.error(`consumer.dump() failed: ${error}`); - } + if (!consumer) { + this.error('Consumer not found'); break; } - case 'ddp': - case 'dumpDataProducer': - { - const id = params[0] || Array.from(dataProducers.keys()).pop(); - const dataProducer = dataProducers.get(id); + try { + const dump = await consumer.dump(); - if (!dataProducer) - { - this.error('DataProducer not found'); + this.log(`consumer.dump():\n${JSON.stringify(dump, null, ' ')}`); + } catch (error) { + this.error(`consumer.dump() failed: ${error}`); + } - break; - } + break; + } - try - { - const dump = await dataProducer.dump(); + case 'ddp': + case 'dumpDataProducer': + { + const id = params[0] || Array.from(dataProducers.keys()).pop(); + const dataProducer = dataProducers.get(id); - this.log(`dataProducer.dump():\n${JSON.stringify(dump, null, ' ')}`); - } - catch (error) - { - this.error(`dataProducer.dump() failed: ${error}`); - } + if (!dataProducer) { + this.error('DataProducer not found'); break; } - case 'ddc': - case 'dumpDataConsumer': - { - const id = params[0] || Array.from(dataConsumers.keys()).pop(); - const dataConsumer = dataConsumers.get(id); + try { + const dump = await dataProducer.dump(); - if (!dataConsumer) - { - this.error('DataConsumer not found'); + this.log(`dataProducer.dump():\n${JSON.stringify(dump, null, ' ')}`); + } catch (error) { + this.error(`dataProducer.dump() failed: ${error}`); + } - break; - } + break; + } - try - { - const dump = await dataConsumer.dump(); + case 'ddc': + case 'dumpDataConsumer': + { + const id = params[0] || Array.from(dataConsumers.keys()).pop(); + const dataConsumer = dataConsumers.get(id); - this.log(`dataConsumer.dump():\n${JSON.stringify(dump, null, ' ')}`); - } - catch (error) - { - this.error(`dataConsumer.dump() failed: ${error}`); - } + if (!dataConsumer) { + this.error('DataConsumer not found'); break; } - case 'st': - case 'statsTransport': - { - const list = params[0] ? [transports.get(params[0])] : transports.values(); - - for (const transport of list) { - if (!transport) { - this.error('Producer not found'); - break; - } - try { - const stats = await transport.getStats(); - - this.log(`transport.getStats():\n${JSON.stringify(stats, null, ' ')}`); - } - catch (error) { - this.error(`transport.getStats() failed: ${error}`); - } - } + try { + const dump = await dataConsumer.dump(); - break; + this.log(`dataConsumer.dump():\n${JSON.stringify(dump, null, ' ')}`); + } catch (error) { + this.error(`dataConsumer.dump() failed: ${error}`); } - case 'sp': - case 'statsProducer': - { - const list = params[0] ? [producers.get(params[0])] : producers.values(); - - for (const producer of list) { - if (!producer) { - this.error('Producer not found'); - break; - } - - try { - const stats = await producer.getStats(); - - this.log(`producer.getStats():\n${JSON.stringify(stats, null, ' ')}`); - } - catch (error) { - this.error(`producer.getStats() failed: ${error}`); - } - } + break; + } - break; - } + case 'st': + case 'statsTransport': + { + const list = params[0] ? [transports.get(params[0])] : transports.values(); - case 'sc': - case 'statsConsumer': - { - const list = params[0] ? [consumers.get(params[0])] : consumers.values(); - - for (const consumer of list) { - if (!consumer) { - this.error('consumerr not found'); - break; - } - - try { - const stats = await consumer.getStats(); - this.log(`consumer.getStats():\n${JSON.stringify(stats, null, ' ')}`); - } - catch (error) { - this.error(`consumer.getStats() failed: ${error}`); - } + for (const transport of list) { + if (!transport) { + this.error('Producer not found'); + break; } + try { + const stats = await transport.getStats(); - break; + this.log(`transport.getStats():\n${JSON.stringify(stats, null, ' ')}`); + } catch (error) { + this.error(`transport.getStats() failed: ${error}`); + } } - case 'sdp': - case 'statsDataProducer': - { - const id = params[0] || Array.from(dataProducers.keys()).pop(); - const dataProducer = dataProducers.get(id); + break; + } - if (!dataProducer) - { - this.error('DataProducer not found'); + case 'sp': + case 'statsProducer': + { + const list = params[0] ? [producers.get(params[0])] : producers.values(); + for (const producer of list) { + if (!producer) { + this.error('Producer not found'); break; } - try - { - const stats = await dataProducer.getStats(); + try { + const stats = await producer.getStats(); - this.log(`dataProducer.getStats():\n${JSON.stringify(stats, null, ' ')}`); + this.log(`producer.getStats():\n${JSON.stringify(stats, null, ' ')}`); + } catch (error) { + this.error(`producer.getStats() failed: ${error}`); } - catch (error) - { - this.error(`dataProducer.getStats() failed: ${error}`); - } - - break; } - case 'sdc': - case 'statsDataConsumer': - { - const id = params[0] || Array.from(dataConsumers.keys()).pop(); - const dataConsumer = dataConsumers.get(id); + break; + } - if (!dataConsumer) - { - this.error('DataConsumer not found'); + case 'sc': + case 'statsConsumer': + { + const list = params[0] ? [consumers.get(params[0])] : consumers.values(); + for (const consumer of list) { + if (!consumer) { + this.error('consumerr not found'); break; } - try - { - const stats = await dataConsumer.getStats(); - - this.log(`dataConsumer.getStats():\n${JSON.stringify(stats, null, ' ')}`); - } - catch (error) - { - this.error(`dataConsumer.getStats() failed: ${error}`); + try { + const stats = await consumer.getStats(); + this.log(`consumer.getStats():\n${JSON.stringify(stats, null, ' ')}`); + } catch (error) { + this.error(`consumer.getStats() failed: ${error}`); } + } + + break; + } + + case 'sdp': + case 'statsDataProducer': + { + const id = params[0] || Array.from(dataProducers.keys()).pop(); + const dataProducer = dataProducers.get(id); + + if (!dataProducer) { + this.error('DataProducer not found'); break; } - case 't': - case 'terminal': - { - this._isTerminalOpen = true; + try { + const stats = await dataProducer.getStats(); - cmd.close(); - this.openTerminal(); + this.log(`dataProducer.getStats():\n${JSON.stringify(stats, null, ' ')}`); + } catch (error) { + this.error(`dataProducer.getStats() failed: ${error}`); + } + + break; + } + + case 'sdc': + case 'statsDataConsumer': + { + const id = params[0] || Array.from(dataConsumers.keys()).pop(); + const dataConsumer = dataConsumers.get(id); - return; + if (!dataConsumer) { + this.error('DataConsumer not found'); + + break; } - default: - { - this.error(`unknown command '${command}'`); - this.log('press \'h\' or \'help\' to get the list of available commands'); + try { + const stats = await dataConsumer.getStats(); + + this.log(`dataConsumer.getStats():\n${JSON.stringify(stats, null, ' ')}`); + } catch (error) { + this.error(`dataConsumer.getStats() failed: ${error}`); } + + break; + } + + case 't': + case 'terminal': + { + this._isTerminalOpen = true; + + cmd.close(); + this.openTerminal(); + + return; + } + + default: + { + this.error(`unknown command '${command}'`); + this.log('press \'h\' or \'help\' to get the list of available commands'); + } } readStdin(); }); }; readStdin(); } - openTerminal() - { + openTerminal() { this.log('\n[opening Node REPL Terminal...]'); this.log('here you have access to workers, routers, transports, producers, consumers, dataProducers and dataConsumers ES6 maps'); const terminal = repl.start( { input : this._socket, output : this._socket, terminal : true, prompt : 'terminal> ', useColors : true, useGlobal : true, ignoreUndefined : false }); this._isTerminalOpen = true; - terminal.on('exit', () => - { + terminal.on('exit', () => { this.log('\n[exiting Node REPL Terminal...]'); this._isTerminalOpen = false; this.openCommandConsole(); }); } - log(msg) - { - try - { + log(msg) { + try { this._socket.write(`${colors.green(msg)}\n`); - } - catch (error) { + } catch (error) { //Do nothing } } - error(msg) - { - try - { + error(msg) { + try { this._socket.write(`${colors.red.bold('ERROR: ')}${colors.red(msg)}\n`); - } - catch (error) { + } catch (error) { //Do nothing } } } -function runMediasoupObserver() -{ - mediasoup.observer.on('newworker', (worker) => - { +function runMediasoupObserver() { + mediasoup.observer.on('newworker', (worker) => { // Store the latest worker in a global variable. global.worker = worker; workers.set(worker.pid, worker); worker.observer.on('close', () => workers.delete(worker.pid)); - worker.observer.on('newrouter', (router) => - { + worker.observer.on('newrouter', (router) => { // Store the latest router in a global variable. global.router = router; routers.set(router.id, router); router.observer.on('close', () => routers.delete(router.id)); - router.observer.on('newtransport', (transport) => - { + router.observer.on('newtransport', (transport) => { // Store the latest transport in a global variable. global.transport = transport; transports.set(transport.id, transport); transport.observer.on('close', () => transports.delete(transport.id)); - transport.observer.on('newproducer', (producer) => - { + transport.observer.on('newproducer', (producer) => { // Store the latest producer in a global variable. global.producer = producer; producers.set(producer.id, producer); producer.observer.on('close', () => producers.delete(producer.id)); }); - transport.observer.on('newconsumer', (consumer) => - { + transport.observer.on('newconsumer', (consumer) => { // Store the latest consumer in a global variable. global.consumer = consumer; consumers.set(consumer.id, consumer); consumer.observer.on('close', () => consumers.delete(consumer.id)); }); - transport.observer.on('newdataproducer', (dataProducer) => - { + transport.observer.on('newdataproducer', (dataProducer) => { // Store the latest dataProducer in a global variable. global.dataProducer = dataProducer; dataProducers.set(dataProducer.id, dataProducer); dataProducer.observer.on('close', () => dataProducers.delete(dataProducer.id)); }); - transport.observer.on('newdataconsumer', (dataConsumer) => - { + transport.observer.on('newdataconsumer', (dataConsumer) => { // Store the latest dataConsumer in a global variable. global.dataConsumer = dataConsumer; dataConsumers.set(dataConsumer.id, dataConsumer); dataConsumer.observer.on('close', () => dataConsumers.delete(dataConsumer.id)); }); }); }); }); } -module.exports = async function(rooms, peers) -{ - try - { +module.exports = async function(rooms, peers) { + try { // Run the mediasoup observer API. runMediasoupObserver(); // Make maps global so they can be used during the REPL terminal. global.rooms = rooms; global.peers = peers; global.workers = workers; global.routers = routers; global.transports = transports; global.producers = producers; global.consumers = consumers; global.dataProducers = dataProducers; global.dataConsumers = dataConsumers; - const server = net.createServer((socket) => - { + const server = net.createServer((socket) => { const interactive = new Interactive(socket); interactive.openCommandConsole(); }); - await new Promise((resolve) => - { - try { fs.unlinkSync(SOCKET_PATH); } - catch (error) { + await new Promise((resolve) => { + try { + fs.unlinkSync(SOCKET_PATH); + } catch (error) { //Do nothing } server.listen(SOCKET_PATH, resolve); }); - } - catch (error) { + } catch (error) { //Do nothing } }; diff --git a/meet/server/lib/promExporter.js b/meet/server/lib/promExporter.js index e5864d70..08139265 100644 --- a/meet/server/lib/promExporter.js +++ b/meet/server/lib/promExporter.js @@ -1,286 +1,244 @@ const { Resolver } = require('dns').promises; const express = require('express'); const mediasoup = require('mediasoup'); const prom = require('prom-client'); const Logger = require('./Logger'); const logger = new Logger('prom'); const resolver = new Resolver(); const workers = new Map(); const labelNames = [ 'pid', 'room_id', 'peer_id', 'display_name', 'user_agent', 'transport_id', 'proto', 'local_addr', 'remote_addr', 'id', 'kind', 'codec', 'type' ]; const metadata = { 'byteCount' : { metricType: prom.Counter, unit: 'bytes' }, 'score' : { metricType: prom.Gauge } }; -module.exports = async function(rooms, peers, config) -{ - const collect = async function(registry) - { - const newMetrics = function(subsystem) - { +module.exports = async function(rooms, peers, config) { + const collect = async function(registry) { + const newMetrics = function(subsystem) { const namespace = 'mediasoup'; const metrics = new Map(); - for (const key in metadata) - { - if (Object.prototype.hasOwnProperty.call(metadata, key)) - { + for (const key in metadata) { + if (Object.prototype.hasOwnProperty.call(metadata, key)) { const value = metadata[key]; const name = key.split(/(?=[A-Z])/).join('_') .toLowerCase(); const unit = value.unit; const metricType = value.metricType; let s = `${namespace}_${subsystem}_${name}`; - if (unit) - { + if (unit) { s += `_${unit}`; } const m = new metricType({ name : s, help : `${subsystem}.${key}`, labelNames : labelNames, registers : [ registry ] }); metrics.set(key, m); } } return metrics; }; - const commonLabels = function(both, fn) - { - for (const roomId of rooms.keys()) - { - for (const [ peerId, peer ] of peers) - { - if (fn(peer)) - { + const commonLabels = function(both, fn) { + for (const roomId of rooms.keys()) { + for (const [ peerId, peer ] of peers) { + if (fn(peer)) { const displayName = peer._displayName; const userAgent = peer._socket.client.request.headers['user-agent']; const kind = both.kind; const codec = both.rtpParameters.codecs[0].mimeType.split('/')[1]; return { roomId, peerId, displayName, userAgent, kind, codec }; } } } throw new Error('cannot find common labels'); }; - const addr = async function(ip, port) - { - if (config.deidentify) - { + const addr = async function(ip, port) { + if (config.deidentify) { const a = ip.split('.'); - for (let i = 0; i < a.length - 2; i++) - { + for (let i = 0; i < a.length - 2; i++) { a[i] = 'xx'; } return `${a.join('.')}:${port}`; - } - else if (config.numeric) - { + } else if (config.numeric) { return `${ip}:${port}`; - } - else - { - try - { + } else { + try { const a = await resolver.reverse(ip); ip = a[0]; - } - catch (err) - { + } catch (err) { logger.error(`reverse DNS query failed: ${ip} ${err.code}`); } return `${ip}:${port}`; } }; - const quiet = function(s) - { + const quiet = function(s) { return config.quiet ? '' : s; }; - const setValue = function(key, m, labels, v) - { + const setValue = function(key, m, labels, v) { logger.debug(`setValue key=${key} v=${v}`); - switch (metadata[key].metricType) - { - case prom.Counter: - m.inc(labels, v); - break; - case prom.Gauge: - m.set(labels, v); - break; - default: - throw new Error(`unexpected metric: ${m}`); + switch (metadata[key].metricType) { + case prom.Counter: + m.inc(labels, v); + break; + case prom.Gauge: + m.set(labels, v); + break; + default: + throw new Error(`unexpected metric: ${m}`); } }; logger.debug('collect'); const mRooms = new prom.Gauge({ name: 'kolabmeet_rooms', help: '#rooms', registers: [ registry ] }); mRooms.set(rooms.size); const mPeers = new prom.Gauge({ name: 'kolabmeet_peers', help: '#peers', labelNames: [ 'room_id' ], registers: [ registry ] }); - for (const [ roomId, room ] of rooms) - { + for (const [ roomId, room ] of rooms) { mPeers.labels(roomId).set(Object.keys(room._peers).length); } const mConsumer = newMetrics('consumer'); const mProducer = newMetrics('producer'); - for (const [ pid, worker ] of workers) - { + for (const [ pid, worker ] of workers) { logger.debug(`visiting worker ${pid}`); - for (const router of worker._routers) - { + for (const router of worker._routers) { logger.debug(`visiting router ${router.id}`); - for (const [ transportId, transport ] of router._transports) - { + for (const [ transportId, transport ] of router._transports) { logger.debug(`visiting transport ${transportId}`); const transportJson = await transport.dump(); - if (transportJson.iceState != 'completed') - { + if (transportJson.iceState != 'completed') { logger.debug(`skipping transport ${transportId}}: ${transportJson.iceState}`); continue; } const iceSelectedTuple = transportJson.iceSelectedTuple; const proto = iceSelectedTuple.protocol; const localAddr = await addr(iceSelectedTuple.localIp, iceSelectedTuple.localPort); const remoteAddr = await addr(iceSelectedTuple.remoteIp, iceSelectedTuple.remotePort); - for (const [ producerId, producer ] of transport._producers) - { + for (const [ producerId, producer ] of transport._producers) { logger.debug(`visiting producer ${producerId}`); const { roomId, peerId, displayName, userAgent, kind, codec } = commonLabels(producer, (peer) => peer._producers.has(producerId)); const a = await producer.getStats(); - for (const x of a) - { + for (const x of a) { const type = x.type; const labels = { 'pid' : pid, 'room_id' : roomId, 'peer_id' : peerId, 'display_name' : displayName, 'user_agent' : userAgent, 'transport_id' : quiet(transportId), 'proto' : proto, 'local_addr' : localAddr, 'remote_addr' : remoteAddr, 'id' : quiet(producerId), 'kind' : kind, 'codec' : codec, 'type' : type }; - for (const [ key, m ] of mProducer) - { + for (const [ key, m ] of mProducer) { setValue(key, m, labels, x[key]); } } } - for (const [ consumerId, consumer ] of transport._consumers) - { + for (const [ consumerId, consumer ] of transport._consumers) { logger.debug(`visiting consumer ${consumerId}`); const { roomId, peerId, displayName, userAgent, kind, codec } = commonLabels(consumer, (peer) => peer._consumers.has(consumerId)); const a = await consumer.getStats(); - for (const x of a) - { - if (x.type == 'inbound-rtp') - { + for (const x of a) { + if (x.type == 'inbound-rtp') { continue; } const type = x.type; const labels = { 'pid' : pid, 'room_id' : roomId, 'peer_id' : peerId, 'display_name' : displayName, 'user_agent' : userAgent, 'transport_id' : quiet(transportId), 'proto' : proto, 'local_addr' : localAddr, 'remote_addr' : remoteAddr, 'id' : quiet(consumerId), 'kind' : kind, 'codec' : codec, 'type' : type }; - for (const [ key, m ] of mConsumer) - { + for (const [ key, m ] of mConsumer) { setValue(key, m, labels, x[key]); } } } } } } }; - try - { + try { logger.debug(`config.deidentify=${config.deidentify}`); logger.debug(`config.listen=${config.listen}`); logger.debug(`config.numeric=${config.numeric}`); logger.debug(`config.port=${config.port}`); logger.debug(`config.quiet=${config.quiet}`); - mediasoup.observer.on('newworker', (worker) => - { + mediasoup.observer.on('newworker', (worker) => { logger.debug(`observing newworker ${worker.pid} #${workers.size}`); workers.set(worker.pid, worker); - worker.observer.on('close', () => - { + worker.observer.on('close', () => { logger.debug(`observing close worker ${worker.pid} #${workers.size - 1}`); workers.delete(worker.pid); }); }); const app = express(); - app.get('/', async (req, res) => - { + app.get('/', async (req, res) => { logger.debug(`GET ${req.originalUrl}`); const registry = new prom.Registry(); await collect(registry); res.set('Content-Type', registry.contentType); const data = registry.metrics(); res.end(data); }); const server = app.listen(config.port || 8889, - config.listen || undefined, () => - { + config.listen || undefined, () => { const address = server.address(); logger.info(`listening ${address.address}:${address.port}`); }); - } - catch (err) - { + } catch (err) { logger.error(err); } }; diff --git a/meet/server/server.js b/meet/server/server.js index 17b7abc5..e320ac77 100755 --- a/meet/server/server.js +++ b/meet/server/server.js @@ -1,423 +1,399 @@ #!/usr/bin/env node process.title = 'kolabmeet-server'; const config = require('./config/config'); const fs = require('fs'); const http = require('http'); const spdy = require('spdy'); const express = require('express'); const bodyParser = require('body-parser'); const cookieParser = require('cookie-parser'); const compression = require('compression'); const mediasoup = require('mediasoup'); const AwaitQueue = require('awaitqueue'); const Logger = require('./lib/Logger'); const Room = require('./lib/Room'); const Peer = require('./lib/Peer'); const helmet = require('helmet'); // auth const redis = require('redis'); const expressSession = require('express-session'); const RedisStore = require('connect-redis')(expressSession); const sharedSession = require('express-socket.io-session'); const interactiveServer = require('./lib/interactiveServer'); const promExporter = require('./lib/promExporter'); const { v4: uuidv4 } = require('uuid'); /* eslint-disable no-console */ console.log('- process.env.DEBUG:', process.env.DEBUG); console.log('- config.mediasoup.worker.logLevel:', config.mediasoup.worker.logLevel); console.log('- config.mediasoup.worker.logTags:', config.mediasoup.worker.logTags); /* eslint-enable no-console */ const logger = new Logger(); const queue = new AwaitQueue(); let statusLogger = null; if ('StatusLogger' in config) statusLogger = new config.StatusLogger(); // mediasoup Workers. // @type {Array} const mediasoupWorkers = []; // Map of Room instances indexed by roomId. const rooms = new Map(); // Map of Peer instances indexed by peerId. const peers = new Map(); // TLS server configuration. const tls = { cert : fs.readFileSync(config.tls.cert), key : fs.readFileSync(config.tls.key), secureOptions : 'tlsv12', ciphers : [ 'ECDHE-ECDSA-AES128-GCM-SHA256', 'ECDHE-RSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES256-GCM-SHA384', 'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-CHACHA20-POLY1305', 'ECDHE-RSA-CHACHA20-POLY1305', 'DHE-RSA-AES128-GCM-SHA256', 'DHE-RSA-AES256-GCM-SHA384' ].join(':'), honorCipherOrder : true }; const app = express(); app.use(helmet.hsts()); const sharedCookieParser=cookieParser(); app.use(sharedCookieParser); app.use(bodyParser.json({ limit: '5mb' })); app.use(bodyParser.urlencoded({ limit: '5mb', extended: true })); const session = expressSession({ secret : config.cookieSecret, name : config.cookieName, resave : true, saveUninitialized : true, store : config.redisOptions.host != 'none' ? new RedisStore({ client: redis.createClient(config.redisOptions) }) : null, cookie : { secure : true, httpOnly : true, maxAge : 60 * 60 * 1000 // Expire after 1 hour since last request from user } }); -if (config.trustProxy) -{ +if (config.trustProxy) { app.set('trust proxy', config.trustProxy); } app.use(session); let mainListener; let io; -async function run() -{ - try - { +async function run() { + try { // Open the interactive server. await interactiveServer(rooms, peers); // start Prometheus exporter - if (config.prometheus) - { + if (config.prometheus) { await promExporter(rooms, peers, config.prometheus); } // Run a mediasoup Worker. await runMediasoupWorkers(); // Run HTTPS server. await runHttpsServer(); // Run WebSocketServer. await runWebSocketServer(); // eslint-disable-next-line no-unused-vars - const errorHandler = (err, req, res, next) => - { + const errorHandler = (err, req, res, next) => { const trackingId = uuidv4(); res.status(500).send( `

Internal Server Error

If you report this error, please also report this tracking ID which makes it possible to locate your session in the logs which are available to the system administrator: ${trackingId}

` ); logger.error( 'Express error handler dump with tracking ID: %s, error dump: %o', trackingId, err); }; // eslint-disable-next-line no-unused-vars app.use(errorHandler); - } - catch (error) - { + } catch (error) { logger.error('run() [error:"%o"]', error); } app.emit('ready'); } -function statusLog() -{ - if (statusLogger) - { +function statusLog() { + if (statusLogger) { statusLogger.log({ rooms : rooms, peers : peers }); } } -async function runHttpsServer() -{ +async function runHttpsServer() { app.use(compression()); app.get(`${config.pathPrefix}/api/ping`, function (req, res, /*next*/) { res.send('PONG') }) app.get(`${config.pathPrefix}/api/sessions`, function (req, res, /*next*/) { //TODO json.stringify res.json({ - id : "testId" - }) + id : "testId" + }) }) //Check if the room exists app.get(`${config.pathPrefix}/api/sessions/:session_id`, function (req, res, /*next*/) { console.warn("Checking for room") let room = rooms.get(req.params.session_id); if (!room) { console.warn("doesn't exist") res.status(404).send() } else { console.warn("exist") res.status(200).send() } }) // Create room and return id app.post(`${config.pathPrefix}/api/sessions`, async function (req, res, /*next*/) { console.warn("Creating new room", req.body.mediaMode, req.body.recordingMode) //FIXME we're truncating because of kolab4 database layout (should be fixed instead) const roomId = uuidv4().substring(0, 16) await getOrCreateRoom({ roomId }); res.json({ - id : roomId - }) + id : roomId + }) }) app.post(`${config.pathPrefix}/api/signal`, async function (req, res, /*next*/) { let data = req.body; const roomId = data.session; // const signalType = data.type; // const payload = data.data; const peers = data.to; if (peers) { for (const peerId of peers) { let peer = peers.get(peerId); peer.socket.emit( 'signal', data ); } } else { io.to(roomId).emit( 'signal', data ); } res.json({}) }); // Create connection in room (just wait for websocket instead? // $post = [ // 'json' => [ // 'role' => self::OV_ROLE_PUBLISHER, // 'data' => json_encode(['role' => $role]) // ] // ]; app.post(`${config.pathPrefix}/api/sessions/:session_id/connection`, function (req, res, /*next*/) { console.warn("Creating connection in session", req.params.session_id) let roomId = req.params.session_id let data = req.body; //FIXME we're truncating because of kolab4 database layout (should be fixed instnead) const peerId = uuidv4().substring(0, 16) //TODO create room already? let peer = new Peer({ id: peerId, roomId }); peers.set(peerId, peer); peer.on('close', () => { peers.delete(peerId); statusLog(); }); peer.nickname = "Display Name"; // peer.picture = picture; peer.email = "email@test.com"; if ('role' in data) peer.setRole(data.role); const proto = config.publicDomain.includes('localhost') || config.publicDomain.includes('127.0.0.1') ? 'ws' : 'wss'; res.json({ id: peerId, // When the below get's passed to the socket.io client we end up with something like (depending on the socket.io path) // wss://${publicDomain}/meetmedia/signaling/?peerId=peer1&roomId=room1&EIO=3&transport=websocket, token: `${proto}://${config.publicDomain}/?peerId=${peerId}&roomId=${roomId}` }) }) - if (config.httpOnly === true) - { + if (config.httpOnly === true) { // http mainListener = http.createServer(app); - } - else - { + } else { // https mainListener = spdy.createServer(tls, app); // http const redirectListener = http.createServer(app); if (config.listeningHost) redirectListener.listen(config.listeningRedirectPort, config.listeningHost); else redirectListener.listen(config.listeningRedirectPort); } console.info(`Listening on ${config.listeningPort} ${config.listeningHost}`) // https or http if (config.listeningHost) mainListener.listen(config.listeningPort, config.listeningHost); else mainListener.listen(config.listeningPort); } /** * Create a WebSocketServer to allow WebSocket connections from browsers. */ -async function runWebSocketServer() -{ +async function runWebSocketServer() { io = require('socket.io')(mainListener, { path: `${config.pathPrefix}/signaling`, cookie: false }); io.use( sharedSession(session, sharedCookieParser, { autoSave: true }) ); // Handle connections from clients. - io.on('connection', (socket) => - { + io.on('connection', (socket) => { logger.info("websocket connection") const { roomId, peerId } = socket.handshake.query; - if (!roomId || !peerId) - { + if (!roomId || !peerId) { logger.warn('connection request without roomId and/or peerId'); socket.disconnect(true); return; } logger.info( 'connection request [roomId:"%s", peerId:"%s"]', roomId, peerId); - queue.push(async () => - { + queue.push(async () => { const room = await getOrCreateRoom({ roomId }); let peer = peers.get(peerId); if (!peer) { logger.warn("Peer does not exist %s", peerId); socket.disconnect(true); return; } peer.socket = socket; room.handlePeer({ peer }); statusLog(); }) - .catch((error) => - { + .catch((error) => { logger.error('room creation or room joining failed [error:"%o"]', error); if (socket) socket.disconnect(true); return; }); }); } /** * Launch as many mediasoup Workers as given in the configuration file. */ -async function runMediasoupWorkers() -{ +async function runMediasoupWorkers() { const { numWorkers } = config.mediasoup; logger.info('running %d mediasoup Workers...', numWorkers); - for (let i = 0; i < numWorkers; ++i) - { + for (let i = 0; i < numWorkers; ++i) { const worker = await mediasoup.createWorker( { logLevel : config.mediasoup.worker.logLevel, logTags : config.mediasoup.worker.logTags, rtcMinPort : config.mediasoup.worker.rtcMinPort, rtcMaxPort : config.mediasoup.worker.rtcMaxPort }); - worker.on('died', () => - { + worker.on('died', () => { logger.error( 'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid); setTimeout(() => process.exit(1), 2000); }); mediasoupWorkers.push(worker); } } /** * Get a Room instance (or create one if it does not exist). */ -async function getOrCreateRoom({ roomId }) -{ +async function getOrCreateRoom({ roomId }) { let room = rooms.get(roomId); // If the Room does not exist create a new one. - if (!room) - { + if (!room) { logger.info('creating a new Room [roomId:"%s"]', roomId); room = await Room.create({ mediasoupWorkers, roomId, peers }); rooms.set(roomId, room); statusLog(); - room.on('close', () => - { + room.on('close', () => { rooms.delete(roomId); statusLog(); }); } return room; } run(); module.exports = app; // export for testing diff --git a/meet/server/test.js b/meet/server/test.js index 56543a16..e35cd12c 100755 --- a/meet/server/test.js +++ b/meet/server/test.js @@ -1,82 +1,79 @@ #!/usr/bin/env node const io = require("socket.io-client"); const axios = require('axios') const roomId = "room1"; axios - .post('http://127.0.0.1:12443/api/sessions/${roomId}/connection', { + .post('http://127.0.0.1:12443/api/sessions/${roomId}/connection', { // todo: 'Buy the milk' - }) - .then(res => { - console.log(`statusCode: ${res.status}`) - // console.log(res) - // - let data = res.data; - console.log(data) - const peerId = data['id']; - - const _signalingUrl = `ws://127.0.0.1:12443/?peerId=${peerId}&roomId=${roomId}`; - console.warn(`${_signalingUrl}`); - - let _signalingSocket = io(_signalingUrl, { transports: ["websocket"], rejectUnauthorized: false }); - - _signalingSocket.on('connect', () => - { - console.debug('signaling Peer "connect" event'); - _signalingSocket.emit("hello", { a: "b", c: [] }); - - axios - .post('http://127.0.0.1:12443/api/signal', { - session: roomId, - type: "sometype", - data: { - }, - //optional - // 'to' => [$connections] - }) - .then(res => { - console.log(`statusCode: ${res.status}`) - }); + }) + .then(res => { + console.log(`statusCode: ${res.status}`) + // console.log(res) + // + let data = res.data; + console.log(data) + const peerId = data['id']; + + const _signalingUrl = `ws://127.0.0.1:12443/?peerId=${peerId}&roomId=${roomId}`; + console.warn(`${_signalingUrl}`); + + let _signalingSocket = io(_signalingUrl, { transports: ["websocket"], rejectUnauthorized: false }); + + _signalingSocket.on('connect', () => { + console.debug('signaling Peer "connect" event'); + _signalingSocket.emit("hello", { a: "b", c: [] }); + + axios + .post('http://127.0.0.1:12443/api/signal', { + session: roomId, + type: "sometype", + data: { + }, + //optional + // 'to' => [$connections] + }) + .then(res => { + console.log(`statusCode: ${res.status}`) + }); - }); + }); - _signalingSocket.on('disconnect', (reason) => - { - console.warn('signaling Peer "disconnect" event [reason:"%s"]', reason); - }); + _signalingSocket.on('disconnect', (reason) => { + console.warn('signaling Peer "disconnect" event [reason:"%s"]', reason); + }); - _signalingSocket.on('signal', (reason) => - { - console.warn('Received signal "%s"', reason); - }); + _signalingSocket.on('signal', (reason) => { + console.warn('Received signal "%s"', reason); + }); - _signalingSocket.on("error", (error) => { - console.warn('error %s', error); - }); + _signalingSocket.on("error", (error) => { + console.warn('error %s', error); + }); - _signalingSocket.on("reconnect_failed", () => { - console.warn('reconnect failed'); - }); - //_signalingSocket.connect(); + _signalingSocket.on("reconnect_failed", () => { + console.warn('reconnect failed'); + }); + //_signalingSocket.connect(); - console.warn('done'); + console.warn('done'); - }) - .catch(error => { - console.error(error) - }) + }) + .catch(error => { + console.error(error) + }) //const delay = ms => new Promise(resolve => setTimeout(resolve, ms)) //await delay(1000) /// waiting 1 second. //console.warn('done done');