diff --git a/meet/server/.eslintrc.json b/meet/server/.eslintrc.json index 46be4d9f..9cc10d24 100644 --- a/meet/server/.eslintrc.json +++ b/meet/server/.eslintrc.json @@ -1,177 +1,24 @@ { "env": { "es6": true, "node": true }, "extends": [ "eslint:recommended" ], "settings": {}, "parserOptions": { "ecmaVersion": 2018, "sourceType": "module", "ecmaFeatures": { "impliedStrict": true } }, - "rules": - { - "array-bracket-spacing": [ 2, "always", - { - "objectsInArrays": true, - "arraysInArrays": true - }], - "arrow-parens": [ 2, "always" ], - "arrow-spacing": 2, - "block-spacing": [ 2, "always" ], - "brace-style": [ 2, "allman", { "allowSingleLine": true } ], - "camelcase": 2, - "comma-dangle": 2, - "comma-spacing": [ 2, { "before": false, "after": true } ], - "comma-style": 2, - "computed-property-spacing": 2, - "constructor-super": 2, - "func-call-spacing": 2, - "generator-star-spacing": 2, - "guard-for-in": 2, - "indent": [ 4, "space", { "SwitchCase": 1 } ], - "key-spacing": [ 2, - { - "singleLine": - { - "beforeColon": false, - "afterColon": true - }, - "multiLine": - { - "beforeColon": true, - "afterColon": true, - "align": "colon" - } - }], - "keyword-spacing": 2, - "linebreak-style": [ 2, "unix" ], - "lines-around-comment": [ 2, - { - "allowBlockStart": true, - "allowObjectStart": true, - "beforeBlockComment": true, - "beforeLineComment": false - }], - "max-len": [ 2, 90, - { - "tabWidth": 2, - "comments": 90, - "ignoreUrls": true, - "ignoreStrings": true, - "ignoreTemplateLiterals": true, - "ignoreRegExpLiterals": true - }], - "newline-after-var": 2, - "newline-before-return": 2, - "newline-per-chained-call": 2, - "no-alert": 2, - "no-caller": 2, - "no-case-declarations": 2, - "no-catch-shadow": 2, - "no-class-assign": 2, - "no-confusing-arrow": 2, - "no-console": 2, - "no-const-assign": 2, - "no-debugger": 2, - "no-dupe-args": 2, - "no-dupe-keys": 2, - "no-duplicate-case": 2, - "no-div-regex": 2, - "no-empty": [ 2, { "allowEmptyCatch": true } ], - "no-empty-pattern": 2, - "no-else-return": 0, - "no-eval": 2, - "no-extend-native": 2, - "no-ex-assign": 2, - "no-extra-bind": 2, - "no-extra-boolean-cast": 2, - "no-extra-label": 2, - "no-extra-semi": 2, - "no-fallthrough": 2, - "no-func-assign": 2, - "no-global-assign": 2, - "no-implicit-coercion": 2, - "no-implicit-globals": 2, - "no-inner-declarations": 2, - "no-invalid-regexp": 2, - "no-invalid-this": 2, - "no-irregular-whitespace": 2, - "no-trailing-spaces": [ - "error", - { - "ignoreComments": true - } - ], - "no-lonely-if": 2, - "no-mixed-operators": 2, - "no-mixed-spaces-and-tabs": 2, - "no-multi-spaces": 2, - "no-multi-str": 2, - "no-multiple-empty-lines": [ 1, { "max": 1, "maxEOF": 0, "maxBOF": 0 } ], - "no-native-reassign": 2, - "no-negated-in-lhs": 2, - "no-new": 2, - "no-new-func": 2, - "no-new-wrappers": 2, - "no-obj-calls": 2, - "no-proto": 2, - "no-prototype-builtins": 0, - "no-redeclare": 2, - "no-regex-spaces": 2, - "no-restricted-imports": 2, - "no-return-assign": 2, - "no-self-assign": 2, - "no-self-compare": 2, - "no-sequences": 2, - "no-shadow": 2, - "no-shadow-restricted-names": 2, - "no-spaced-func": 2, - "no-sparse-arrays": 2, - "no-this-before-super": 2, - "no-throw-literal": 2, - "no-undef": 2, - "no-unexpected-multiline": 2, - "no-unmodified-loop-condition": 2, - "no-unreachable": 2, - "no-unused-vars": [ 1, { "vars": "all", "args": "after-used" }], - "no-use-before-define": [ 2, { "functions": false } ], - "no-useless-call": 2, - "no-useless-computed-key": 2, - "no-useless-concat": 2, - "no-useless-rename": 2, - "no-var": 2, - "no-whitespace-before-property": 2, - "object-curly-newline": 0, - "object-curly-spacing": [ 2, "always" ], - "object-property-newline": [ 2, { "allowMultiplePropertiesPerLine": true } ], - "prefer-const": 2, - "prefer-rest-params": 2, - "prefer-spread": 2, - "prefer-template": 2, - "quotes": [ 2, "single", { "avoidEscape": true } ], - "semi": [ 2, "always" ], - "semi-spacing": 2, - "space-before-blocks": 2, - "space-before-function-paren": [ 2, - { - "anonymous" : "never", - "named" : "never", - "asyncArrow" : "always" - }], - "space-in-parens": [ 2, "never" ], - "spaced-comment": [ 2, "always" ], - "strict": 2, - "valid-typeof": 2, - "yoda": 2 - } -} \ No newline at end of file + "rules": { + + } +} diff --git a/meet/server/lib/Room.js b/meet/server/lib/Room.js index 5d222e93..0f117952 100644 --- a/meet/server/lib/Room.js +++ b/meet/server/lib/Room.js @@ -1,1408 +1,1410 @@ const EventEmitter = require('events').EventEmitter; const AwaitQueue = require('awaitqueue'); const axios = require('axios'); const Logger = require('./Logger'); const { SocketTimeoutError } = require('./errors'); const { v4: uuidv4 } = require('uuid'); const jwt = require('jsonwebtoken'); const Roles = require('../userRoles'); const config = require('../config/config'); const logger = new Logger('Room'); const ROUTER_SCALE_SIZE = config.routerScaleSize || 40; class Room extends EventEmitter { static getLeastLoadedRouter(mediasoupWorkers, peers, mediasoupRouters) { const routerLoads = new Map(); const workerLoads = new Map(); const pipedRoutersIds = new Set(); for (const peer of peers.values()) { const routerId = peer.routerId; if (routerId) { if (mediasoupRouters.has(routerId)) { pipedRoutersIds.add(routerId); } if (routerLoads.has(routerId)) { routerLoads.set(routerId, routerLoads.get(routerId) + 1); } else { routerLoads.set(routerId, 1); } } } for (const worker of mediasoupWorkers) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (workerLoads.has(worker._pid)) { workerLoads.set(worker._pid, workerLoads.get(worker._pid) + (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } else { workerLoads.set(worker._pid, (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } } } const sortedWorkerLoads = new Map([ ...workerLoads.entries() ].sort( (a, b) => a[1] - b[1])); // we don't care about if router is piped, just choose the least loaded worker if (pipedRoutersIds.size === 0 || pipedRoutersIds.size === mediasoupRouters.size) { const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } else { // find if there is a piped router that is on a worker that is below limit for (const [ workerId, workerLoad ] of sortedWorkerLoads.entries()) { for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; // on purpose we check if the worker load is below the limit, // as in reality the worker load is imortant, // not the router load if (mediasoupRouters.has(routerId) && pipedRoutersIds.has(routerId) && workerLoad < ROUTER_SCALE_SIZE) { return routerId; } } } } } // no piped router found, we need to return router from least loaded worker const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } } /** * Factory function that creates and returns Room instance. * * @async * * @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new * mediasoup Router must be created. * @param {String} roomId - Id of the Room instance. */ static async create({ mediasoupWorkers, roomId, peers }) { logger.info('create() [roomId:"%s"]', roomId); // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediasoupRouters = new Map(); for (const worker of mediasoupWorkers) { const router = await worker.createRouter({ mediaCodecs }); mediasoupRouters.set(router.id, router); } const firstRouter = mediasoupRouters.get(Room.getLeastLoadedRouter( mediasoupWorkers, peers, mediasoupRouters)); // Create a mediasoup AudioLevelObserver on first router const audioLevelObserver = await firstRouter.createAudioLevelObserver( { maxEntries : 1, threshold : -80, interval : 800 }); return new Room({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers }); } constructor({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers }) { logger.info('constructor() [roomId:"%s"]', roomId); super(); this.setMaxListeners(Infinity); this._uuid = uuidv4(); this._mediasoupWorkers = mediasoupWorkers; this._allPeers = peers; // Room ID. this._roomId = roomId; // Closed flag. this._closed = false; // Joining queue this._queue = new AwaitQueue(); // Locked flag. this._locked = false; this._chatHistory = []; this._fileHistory = []; this._lastN = []; this._peers = {}; this._selfDestructTimeout = null; // Array of mediasoup Router instances. this._mediasoupRouters = mediasoupRouters; // mediasoup AudioLevelObserver. this._audioLevelObserver = audioLevelObserver; // Current active speaker. this._currentActiveSpeaker = null; this._handleAudioLevelObserver(); } isLocked() { return this._locked; } close() { logger.debug('close()'); this._closed = true; this._queue.close(); this._queue = null; if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = null; this._chatHistory = null; this._fileHistory = null; // Close the peers. for (const peer in this._peers) { if (!this._peers[peer].closed) this._peers[peer].close(); } this._peers = null; // Close the mediasoup Routers. for (const router of this._mediasoupRouters.values()) { router.close(); } this._allPeers = null; this._mediasoupWorkers = null; this._mediasoupRouters.clear(); this._audioLevelObserver = null; // Emit 'close' event. this.emit('close'); } handlePeer({ peer, returning }) { logger.info('handlePeer() [peer:"%s", role:%s, returning:"%s"]', peer.id, peer.role, returning); // Should not happen if (this._peers[peer.id]) { logger.warn( 'handleConnection() | there is already a peer with same peerId [peer:"%s"]', peer.id); } // Returning user if (returning) this._peerJoining(peer, true); else this._peerJoining(peer); } _handleOverRoomLimit(peer) { this._notification(peer.socket, 'overRoomLimit'); } _handleGuest(peer) { if (config.activateOnHostJoin && !this.checkEmpty()) this._peerJoining(peer); else { this._parkPeer(peer); this._notification(peer.socket, 'signInRequired'); } } _handleAudioLevelObserver() { /* // Set audioLevelObserver events. this._audioLevelObserver.on('volumes', (volumes) => { const { producer, volume } = volumes[0]; // Notify all Peers. for (const peer of this.getPeers()) { this._notification( peer.socket, 'activeSpeaker', { peerId : producer.appData.peerId, volume : volume }); } }); this._audioLevelObserver.on('silence', () => { // Notify all Peers. for (const peer of this.getPeers()) { this._notification( peer.socket, 'activeSpeaker', { peerId: null } ); } }); */ } logStatus() { logger.info( 'logStatus() [room id:"%s", peers:"%s"]', this._roomId, Object.keys(this._peers).length ); } dump() { return { roomId : this._roomId, peers : Object.keys(this._peers).length }; } get id() { return this._roomId; } selfDestructCountdown() { logger.debug('selfDestructCountdown() started'); if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = setTimeout(() => { if (this._closed) return; if (this.checkEmpty()) { logger.info( 'Room deserted for some time, closing the room [roomId:"%s"]', this._roomId); this.close(); } else logger.debug('selfDestructCountdown() aborted; room is not empty!'); }, 10000); } checkEmpty() { return Object.keys(this._peers).length === 0; } _peerJoining(peer, returning = false) { this._queue.push(async () => { peer.socket.join(this._roomId); // If we don't have this peer, add to end !this._lastN.includes(peer.id) && this._lastN.push(peer.id); this._peers[peer.id] = peer; // Assign routerId peer.routerId = await this._getRouterId(); this._handlePeer(peer); if (returning) { this._notification(peer.socket, 'roomBack'); } else { const token = jwt.sign({ id: peer.id }, this._uuid, { noTimestamp: true }); peer.socket.handshake.session.token = token; peer.socket.handshake.session.save(); let turnServers; if ('turnAPIURI' in config) { try { const { data } = await axios.get( config.turnAPIURI, { timeout : config.turnAPITimeout || 2000, params : { ...config.turnAPIparams, 'api_key' : config.turnAPIKey, 'ip' : peer.socket.request.connection.remoteAddress } }); turnServers = [ { urls : data.uris, username : data.username, credential : data.password } ]; } catch (error) { if ('backupTurnServers' in config) turnServers = config.backupTurnServers; logger.error('_peerJoining() | error on REST turn [error:"%o"]', error); } } else if ('backupTurnServers' in config) { turnServers = config.backupTurnServers; } this._notification(peer.socket, 'roomReady', { turnServers }); } }) .catch((error) => { logger.error('_peerJoining() [error:"%o"]', error); }); } _handlePeer(peer) { logger.debug('_handlePeer() [peer:"%s"]', peer.id); peer.on('close', () => { this._handlePeerClose(peer); }); peer.on('nicknameChanged', () => { // Spread to others this._notification(peer.socket, 'changeNickname', { peerId: peer.id, nickame: peer.nickname }, true); }); peer.on('pictureChanged', () => { // Spread to others this._notification(peer.socket, 'changePicture', { peerId : peer.id, picture : peer.picture }, true); }); peer.on('gotRole', ({ newRole }) => { // Spread to others this._notification(peer.socket, 'gotRole', { peerId: peer.id, role: newRole }, true, true); }); peer.socket.on('request', (request, cb) => { logger.debug( 'Peer "request" event [method:"%s", peerId:"%s"]', request.method, peer.id); this._handleSocketRequest(peer, request, cb) .catch((error) => { logger.error('"request" failed [error:"%o"]', error); cb(error); }); }); // Peer left before we were done joining if (peer.closed) this._handlePeerClose(peer); } _handlePeerClose(peer) { logger.debug('_handlePeerClose() [peer:"%s"]', peer.id); if (this._closed) return; this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); // Remove from lastN this._lastN = this._lastN.filter((id) => id !== peer.id); delete this._peers[peer.id]; // If this is the last Peer in the room close the room after a while. if (this.checkEmpty()) this.selfDestructCountdown(); } async _handleSocketRequest(peer, request, cb) { const router = this._mediasoupRouters.get(peer.routerId); console.log(request.method); switch (request.method) { case 'getRouterRtpCapabilities': { cb(null, router.rtpCapabilities); break; } case 'join': { const { nickname, picture, rtpCapabilities } = request.data; // Store client data into the Peer data object. peer.nickname = nickname; peer.picture = picture; peer.rtpCapabilities = rtpCapabilities; // Tell the new Peer about already joined Peers. // And also create Consumers for existing Producers. const joinedPeers = this.getPeers(peer); const peerInfos = joinedPeers .map((joinedPeer) => (joinedPeer.peerInfo)); cb(null, { id: peer.id, role: peer.role, peers: peerInfos, //chatHistory : this._chatHistory, //fileHistory : this._fileHistory, //lastNHistory : this._lastN, //locked : this._locked, }); for (const joinedPeer of joinedPeers) { // Create Consumers for existing Producers. for (const producer of joinedPeer.producers.values()) { this._createConsumer( { consumerPeer : peer, producerPeer : joinedPeer, producer }); } } // Notify the new Peer to all other Peers. for (const otherPeer of this.getPeers(peer)) { this._notification( otherPeer.socket, 'newPeer', peer.peerInfo ); } logger.debug( 'peer joined [peer: "%s", nickname: "%s", picture: "%s"]', peer.id, nickname, picture); break; } case 'createWebRtcTransport': { // NOTE: Don't require that the Peer is joined here, so the client can // initiate mediasoup Transports and be ready when he later joins. const { forceTcp, producing, consuming } = request.data; const webRtcTransportOptions = { ...config.mediasoup.webRtcTransport, appData : { producing, consuming } }; webRtcTransportOptions.enableTcp = true; if (forceTcp) webRtcTransportOptions.enableUdp = false; else { webRtcTransportOptions.enableUdp = true; webRtcTransportOptions.preferUdp = true; } const transport = await router.createWebRtcTransport( webRtcTransportOptions ); - transport.on('dtlsstatechange', (dtlsState) => - { - if (dtlsState === 'failed' || dtlsState === 'closed') + transport.on('dtlsstatechange', (dtlsState) => { + if (dtlsState === 'failed' || dtlsState === 'closed') { logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); + } }); // Store the WebRtcTransport into the Peer data Object. peer.addTransport(transport.id, transport); cb( null, { id : transport.id, iceParameters : transport.iceParameters, iceCandidates : transport.iceCandidates, dtlsParameters : transport.dtlsParameters }); const { maxIncomingBitrate } = config.mediasoup.webRtcTransport; // If set, apply max incoming bitrate limit. if (maxIncomingBitrate) { try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); } - catch (error) {} + catch (error) { + logger.info("Setting the incoming bitrate failed") + } } break; } case 'connectWebRtcTransport': { const { transportId, dtlsParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ dtlsParameters }); cb(); break; } case 'restartIce': { const { transportId } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); const iceParameters = await transport.restartIce(); cb(null, iceParameters); break; } case 'produce': { let { appData } = request.data; if ( !appData.source || ![ 'mic', 'webcam', 'screen', 'extravideo' ] .includes(appData.source) ) throw new Error('invalid producer source'); if ( appData.source === 'mic' && !this._hasPermission(peer, Roles.PUBLISHER) ) throw new Error('peer not authorized'); if ( appData.source === 'webcam' && !this._hasPermission(peer, Roles.PUBLISHER) ) throw new Error('peer not authorized'); if ( appData.source === 'screen' && !this._hasPermission(peer, Roles.PUBLISHER) ) throw new Error('peer not authorized'); const { transportId, kind, rtpParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); // Add peerId into appData to later get the associated Peer during // the 'loudest' event of the audioLevelObserver. appData = { ...appData, peerId: peer.id }; const producer = await transport.produce({ kind, rtpParameters, appData }); const pipeRouters = this._getRoutersToPipeTo(peer.routerId); for (const [ routerId, destinationRouter ] of this._mediasoupRouters) { if (pipeRouters.includes(routerId)) { await router.pipeToRouter({ producerId : producer.id, router : destinationRouter }); } } // Store the Producer into the Peer data Object. peer.addProducer(producer.id, producer); /* // Set Producer events. producer.on('score', (score) => { this._notification(peer.socket, 'producerScore', { producerId: producer.id, score }); }); */ producer.on('videoorientationchange', (videoOrientation) => { logger.debug( 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]', producer.id, videoOrientation); }); cb(null, { id: producer.id }); // Optimization: Create a server-side Consumer for each Peer. for (const otherPeer of this.getPeers(peer)) { this._createConsumer( { consumerPeer : otherPeer, producerPeer : peer, producer }); } // Add into the audioLevelObserver. if (kind === 'audio') { this._audioLevelObserver.addProducer({ producerId: producer.id }) .catch(() => {}); } break; } case 'closeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); producer.close(); // Remove from its map. peer.removeProducer(producer.id); cb(); break; } case 'pauseProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.pause(); cb(); break; } case 'resumeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.resume(); cb(); break; } case 'pauseConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.pause(); cb(); break; } case 'resumeConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.resume(); cb(); break; } /* case 'requestConsumerKeyFrame': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.requestKeyFrame(); cb(); break; } case 'getTransportStats': { const { transportId } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); const stats = await transport.getStats(); cb(null, stats); break; } case 'getProducerStats': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); const stats = await producer.getStats(); cb(null, stats); break; } case 'getConsumerStats': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); const stats = await consumer.getStats(); cb(null, stats); break; } */ case 'changeNickname': { const { nickname } = request.data; peer.nickname = nickname; // This will be spread through events from the peer object // Return no error cb(); break; } /* case 'changePicture': { const { picture } = request.data; peer.picture = picture; // Spread to others this._notification(peer.socket, 'changePicture', { peerId : peer.id, picture : picture }, true); // Return no error cb(); break; } */ case 'chatMessage': { const { chatMessage } = request.data; this._chatHistory.push(chatMessage); // Spread to others this._notification(peer.socket, 'chatMessage', { peerId : peer.id, chatMessage : chatMessage }, true); // Return no error cb(); break; } case 'moderator:setRole': { if (!this._hasPermission(peer, Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const giveRolePeer = this._peers[peerId]; if (!giveRolePeer) throw new Error(`peer with id "${peerId}" not found`); // TODO: check if role is valid value // This will propagate the event automatically giveRolePeer.setRole(role); // Return no error cb(); break; } /* case 'moderator:clearChat': { if (!this._hasPermission(peer, Roles.MODERATOR)) throw new Error('peer not authorized'); this._chatHistory = []; // Spread to others this._notification(peer.socket, 'moderator:clearChat', null, true); // Return no error cb(); break; } */ case 'raisedHand': { const { raisedHand } = request.data; peer.raisedHand = raisedHand; // Spread to others this._notification(peer.socket, 'raisedHand', { peerId : peer.id, raisedHand : raisedHand, raisedHandTimestamp : peer.raisedHandTimestamp }, true); // Return no error cb(); break; } case 'moderator:closeMeeting': { if (!this._hasPermission(peer, Roles.MODERATOR)) throw new Error('peer not authorized'); this._notification(peer.socket, 'moderator:kick', null, true); cb(); // Close the room this.close(); break; } /* case 'moderator:kickPeer': { if (!this._hasPermission(peer, Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId } = request.data; const kickPeer = this._peers[peerId]; if (!kickPeer) throw new Error(`peer with id "${peerId}" not found`); this._notification(kickPeer.socket, 'moderator:kick'); kickPeer.close(); cb(); break; } case 'moderator:lowerHand': { if (!this._hasPermission(peer, Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId } = request.data; const lowerPeer = this._peers[peerId]; if (!lowerPeer) throw new Error(`peer with id "${peerId}" not found`); this._notification(lowerPeer.socket, 'moderator:lowerHand'); cb(); break; } */ default: { logger.error('unknown request.method "%s"', request.method); cb(500, `unknown request.method "${request.method}"`); } } } /** * Creates a mediasoup Consumer for the given mediasoup Producer. * * @async */ async _createConsumer({ consumerPeer, producerPeer, producer }) { logger.debug( '_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]', consumerPeer.id, producerPeer.id, producer.id ); const router = this._mediasoupRouters.get(producerPeer.routerId); // Optimization: // - Create the server-side Consumer. If video, do it paused. // - Tell its Peer about it and wait for its response. // - Upon receipt of the response, resume the server-side Consumer. // - If video, this will mean a single key frame requested by the // server-side Consumer (when resuming it). // NOTE: Don't create the Consumer if the remote Peer cannot consume it. if ( !consumerPeer.rtpCapabilities || !router.canConsume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities }) ) { return; } // Must take the Transport the remote Peer is using for consuming. const transport = consumerPeer.getConsumerTransport(); // This should not happen. if (!transport) { logger.warn('_createConsumer() | Transport for consuming not found'); return; } // Create the Consumer in paused mode. let consumer; try { consumer = await transport.consume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities, paused : producer.kind === 'video' }); if (producer.kind === 'audio') await consumer.setPriority(255); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); return; } // Store the Consumer into the consumerPeer data Object. consumerPeer.addConsumer(consumer.id, consumer); // Set Consumer events. consumer.on('transportclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); }); consumer.on('producerclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id }); }); consumer.on('producerpause', () => { this._notification(consumerPeer.socket, 'consumerPaused', { consumerId: consumer.id }); }); consumer.on('producerresume', () => { this._notification(consumerPeer.socket, 'consumerResumed', { consumerId: consumer.id }); }); // Send a request to the remote Peer with Consumer parameters. try { await this._request( consumerPeer.socket, 'newConsumer', { peerId : producerPeer.id, kind : consumer.kind, producerId : producer.id, id : consumer.id, rtpParameters : consumer.rtpParameters, type : consumer.type, appData : producer.appData, producerPaused : consumer.producerPaused } ); // Now that we got the positive response from the remote Peer and, if // video, resume the Consumer to ask for an efficient key frame. await consumer.resume(); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); } } _hasPermission(peer, role) { return !!(peer.role & role); } /** * Get the list of peers. */ getPeers(excludePeer = undefined) { return Object.values(this._peers) .filter((peer) => peer !== excludePeer); } _timeoutCallback(callback) { let called = false; const interval = setTimeout( () => { if (called) return; called = true; callback(new SocketTimeoutError('Request timed out')); }, config.requestTimeout || 20000 ); return (...args) => { if (called) return; called = true; clearTimeout(interval); callback(...args); }; } _sendRequest(socket, method, data = {}) { return new Promise((resolve, reject) => { socket.emit( 'request', { method, data }, this._timeoutCallback((err, response) => { if (err) { reject(err); } else { resolve(response); } }) ); }); } async _request(socket, method, data) { logger.debug('_request() [method:"%s", data:"%o"]', method, data); const { requestRetries = 3 } = config; for (let tries = 0; tries < requestRetries; tries++) { try { return await this._sendRequest(socket, method, data); } catch (error) { if ( error instanceof SocketTimeoutError && tries < requestRetries ) logger.warn('_request() | timeout, retrying [attempt:"%s"]', tries); else throw error; } } } _notification(socket, method, data = {}, broadcast = false, includeSender = false) { if (broadcast) { socket.broadcast.to(this._roomId).emit( 'notification', { method, data } ); if (includeSender) socket.emit('notification', { method, data }); } else { socket.emit('notification', { method, data }); } } async _pipeProducersToRouter(routerId) { const router = this._mediasoupRouters.get(routerId); const peersToPipe = Object.values(this._peers) .filter((peer) => peer.routerId !== routerId && peer.routerId !== null); for (const peer of peersToPipe) { const srcRouter = this._mediasoupRouters.get(peer.routerId); for (const producerId of peer.producers.keys()) { if (router._producers.has(producerId)) { continue; } await srcRouter.pipeToRouter({ producerId : producerId, router : router }); } } } async _getRouterId() { const routerId = Room.getLeastLoadedRouter( this._mediasoupWorkers, this._allPeers, this._mediasoupRouters); await this._pipeProducersToRouter(routerId); return routerId; } // Returns an array of router ids we need to pipe to _getRoutersToPipeTo(originRouterId) { return Object.values(this._peers) .map((peer) => peer.routerId) .filter((routerId, index, self) => routerId !== originRouterId && self.indexOf(routerId) === index ); } } module.exports = Room; diff --git a/meet/server/lib/errors.js b/meet/server/lib/errors.js index 4dea9226..a353a6ed 100644 --- a/meet/server/lib/errors.js +++ b/meet/server/lib/errors.js @@ -1,22 +1,23 @@ /** * Error produced when a socket request has a timeout. */ class SocketTimeoutError extends Error { constructor(message) { super(message); this.name = 'SocketTimeoutError'; + // eslint-disable-next-line no-prototype-builtins if (Error.hasOwnProperty('captureStackTrace')) // Just in V8. Error.captureStackTrace(this, SocketTimeoutError); else this.stack = (new Error(message)).stack; } } module.exports = { SocketTimeoutError -}; \ No newline at end of file +}; diff --git a/meet/server/lib/interactiveServer.js b/meet/server/lib/interactiveServer.js index d58352db..379390e2 100644 --- a/meet/server/lib/interactiveServer.js +++ b/meet/server/lib/interactiveServer.js @@ -1,688 +1,693 @@ const os = require('os'); const path = require('path'); const repl = require('repl'); const readline = require('readline'); const net = require('net'); const fs = require('fs'); const mediasoup = require('mediasoup'); const colors = require('colors/safe'); const pidusage = require('pidusage'); const SOCKET_PATH_UNIX = '/tmp/edumeet-server.sock'; const SOCKET_PATH_WIN = path.join('\\\\?\\pipe', process.cwd(), 'edumeet-server'); const SOCKET_PATH = os.platform() === 'win32' ? SOCKET_PATH_WIN : SOCKET_PATH_UNIX; // Maps to store all mediasoup objects. const workers = new Map(); const routers = new Map(); const transports = new Map(); const producers = new Map(); const consumers = new Map(); const dataProducers = new Map(); const dataConsumers = new Map(); class Interactive { constructor(socket) { this._socket = socket; this._isTerminalOpen = false; } openCommandConsole() { const cmd = readline.createInterface( { input : this._socket, output : this._socket, terminal : true }); cmd.on('close', () => { if (this._isTerminalOpen) return; this.log('\nexiting...'); this._socket.end(); }); const readStdin = () => { cmd.question('cmd> ', async (input) => { const params = input.split(/[\s\t]+/); const command = params.shift(); switch (command) { case '': { readStdin(); break; } case 'h': case 'help': { this.log(''); this.log('available commands:'); this.log('- h, help : show this message'); this.log('- usage : show CPU and memory usage of the Node.js and mediasoup-worker processes'); this.log('- logLevel level : changes logLevel in all mediasoup Workers'); this.log('- logTags [tag] [tag] : changes logTags in all mediasoup Workers (values separated by space)'); this.log('- dumpRooms : dump all rooms'); this.log('- dumpPeers : dump all peers'); this.log('- dw, dumpWorkers : dump mediasoup Workers'); this.log('- dr, dumpRouter [id] : dump mediasoup Router with given id (or the latest created one)'); this.log('- dt, dumpTransport [id] : dump mediasoup Transport with given id (or the latest created one)'); this.log('- dp, dumpProducer [id] : dump mediasoup Producer with given id (or the latest created one)'); this.log('- dc, dumpConsumer [id] : dump mediasoup Consumer with given id (or the latest created one)'); this.log('- st, statsTransport [id] : get stats for mediasoup Transport with given id (or all)'); this.log('- sp, statsProducer [id] : get stats for mediasoup Producer with given id (or all)'); this.log('- sc, statsConsumer [id] : get stats for mediasoup Consumer with given id (or all)'); this.log('- ddp, dumpDataProducer [id] : dump mediasoup DataProducer with given id (or the latest created one)'); this.log('- ddc, dumpDataConsumer [id] : dump mediasoup DataConsumer with given id (or the latest created one)'); this.log('- sdp, statsDataProducer [id] : get stats for mediasoup DataProducer with given id (or the latest created one)'); this.log('- sdc, statsDataConsumer [id] : get stats for mediasoup DataConsumer with given id (or the latest created one)'); this.log('- t, terminal : open Node REPL Terminal'); this.log(''); readStdin(); break; } case 'u': case 'usage': { let usage = await pidusage(process.pid); this.log(`Node.js process [pid:${process.pid}]:\n${JSON.stringify(usage, null, ' ')}`); for (const worker of workers.values()) { usage = await pidusage(worker.pid); this.log(`mediasoup-worker process [pid:${worker.pid}]:\n${JSON.stringify(usage, null, ' ')}`); } break; } case 'logLevel': { const level = params[0]; const promises = []; for (const worker of workers.values()) { promises.push(worker.updateSettings({ logLevel: level })); } try { await Promise.all(promises); this.log('done'); } catch (error) { this.error(String(error)); } break; } case 'logTags': { const tags = params; const promises = []; for (const worker of workers.values()) { promises.push(worker.updateSettings({ logTags: tags })); } try { await Promise.all(promises); this.log('done'); } catch (error) { this.error(String(error)); } break; } case 'stats': { this.log(`rooms:${global.rooms.size}\npeers:${global.peers.size}`); break; } case 'dumpRooms': { for (const room of global.rooms.values()) { try { const dump = await room.dump(); this.log(`room.dump():\n${JSON.stringify(dump, null, ' ')}`); } catch (error) { this.error(`room.dump() failed: ${error}`); } } break; } case 'dumpPeers': { for (const peer of global.peers.values()) { try { const dump = await peer.peerInfo; this.log(`peer.peerInfo():\n${JSON.stringify(dump, null, ' ')}`); } catch (error) { this.error(`peer.peerInfo() failed: ${error}`); } } break; } case 'dw': case 'dumpWorkers': { for (const worker of workers.values()) { try { const dump = await worker.dump(); this.log(`worker.dump():\n${JSON.stringify(dump, null, ' ')}`); } catch (error) { this.error(`worker.dump() failed: ${error}`); } } break; } case 'dr': case 'dumpRouter': { const id = params[0] || Array.from(routers.keys()).pop(); const router = routers.get(id); if (!router) { this.error('Router not found'); break; } try { const dump = await router.dump(); this.log(`router.dump():\n${JSON.stringify(dump, null, ' ')}`); } catch (error) { this.error(`router.dump() failed: ${error}`); } break; } case 'dt': case 'dumpTransport': { const id = params[0] || Array.from(transports.keys()).pop(); const transport = transports.get(id); if (!transport) { this.error('Transport not found'); break; } try { const dump = await transport.dump(); this.log(`transport.dump():\n${JSON.stringify(dump, null, ' ')}`); } catch (error) { this.error(`transport.dump() failed: ${error}`); } break; } case 'dp': case 'dumpProducer': { const id = params[0] || Array.from(producers.keys()).pop(); const producer = producers.get(id); if (!producer) { this.error('Producer not found'); break; } try { const dump = await producer.dump(); this.log(`producer.dump():\n${JSON.stringify(dump, null, ' ')}`); } catch (error) { this.error(`producer.dump() failed: ${error}`); } break; } case 'dc': case 'dumpConsumer': { const id = params[0] || Array.from(consumers.keys()).pop(); const consumer = consumers.get(id); if (!consumer) { this.error('Consumer not found'); break; } try { const dump = await consumer.dump(); this.log(`consumer.dump():\n${JSON.stringify(dump, null, ' ')}`); } catch (error) { this.error(`consumer.dump() failed: ${error}`); } break; } case 'ddp': case 'dumpDataProducer': { const id = params[0] || Array.from(dataProducers.keys()).pop(); const dataProducer = dataProducers.get(id); if (!dataProducer) { this.error('DataProducer not found'); break; } try { const dump = await dataProducer.dump(); this.log(`dataProducer.dump():\n${JSON.stringify(dump, null, ' ')}`); } catch (error) { this.error(`dataProducer.dump() failed: ${error}`); } break; } case 'ddc': case 'dumpDataConsumer': { const id = params[0] || Array.from(dataConsumers.keys()).pop(); const dataConsumer = dataConsumers.get(id); if (!dataConsumer) { this.error('DataConsumer not found'); break; } try { const dump = await dataConsumer.dump(); this.log(`dataConsumer.dump():\n${JSON.stringify(dump, null, ' ')}`); } catch (error) { this.error(`dataConsumer.dump() failed: ${error}`); } break; } case 'st': case 'statsTransport': { const list = params[0] ? [transports.get(params[0])] : transports.values(); for (const transport of list) { if (!transport) { this.error('Producer not found'); break; } try { const stats = await transport.getStats(); this.log(`transport.getStats():\n${JSON.stringify(stats, null, ' ')}`); } catch (error) { this.error(`transport.getStats() failed: ${error}`); } } break; } case 'sp': case 'statsProducer': { const list = params[0] ? [producers.get(params[0])] : producers.values(); for (const producer of list) { if (!producer) { this.error('Producer not found'); break; } try { const stats = await producer.getStats(); this.log(`producer.getStats():\n${JSON.stringify(stats, null, ' ')}`); } catch (error) { this.error(`producer.getStats() failed: ${error}`); } } break; } case 'sc': case 'statsConsumer': { const list = params[0] ? [consumers.get(params[0])] : consumers.values(); for (const consumer of list) { if (!consumer) { this.error('consumerr not found'); break; } try { const stats = await consumer.getStats(); this.log(`consumer.getStats():\n${JSON.stringify(stats, null, ' ')}`); } catch (error) { this.error(`consumer.getStats() failed: ${error}`); } } break; } case 'sdp': case 'statsDataProducer': { const id = params[0] || Array.from(dataProducers.keys()).pop(); const dataProducer = dataProducers.get(id); if (!dataProducer) { this.error('DataProducer not found'); break; } try { const stats = await dataProducer.getStats(); this.log(`dataProducer.getStats():\n${JSON.stringify(stats, null, ' ')}`); } catch (error) { this.error(`dataProducer.getStats() failed: ${error}`); } break; } case 'sdc': case 'statsDataConsumer': { const id = params[0] || Array.from(dataConsumers.keys()).pop(); const dataConsumer = dataConsumers.get(id); if (!dataConsumer) { this.error('DataConsumer not found'); break; } try { const stats = await dataConsumer.getStats(); this.log(`dataConsumer.getStats():\n${JSON.stringify(stats, null, ' ')}`); } catch (error) { this.error(`dataConsumer.getStats() failed: ${error}`); } break; } case 't': case 'terminal': { this._isTerminalOpen = true; cmd.close(); this.openTerminal(); return; } default: { this.error(`unknown command '${command}'`); this.log('press \'h\' or \'help\' to get the list of available commands'); } } readStdin(); }); }; readStdin(); } openTerminal() { this.log('\n[opening Node REPL Terminal...]'); this.log('here you have access to workers, routers, transports, producers, consumers, dataProducers and dataConsumers ES6 maps'); const terminal = repl.start( { input : this._socket, output : this._socket, terminal : true, prompt : 'terminal> ', useColors : true, useGlobal : true, ignoreUndefined : false }); this._isTerminalOpen = true; terminal.on('exit', () => { this.log('\n[exiting Node REPL Terminal...]'); this._isTerminalOpen = false; this.openCommandConsole(); }); } log(msg) { try { this._socket.write(`${colors.green(msg)}\n`); } - catch (error) - {} + catch (error) { + //Do nothing + } } error(msg) { try { this._socket.write(`${colors.red.bold('ERROR: ')}${colors.red(msg)}\n`); } - catch (error) - {} + catch (error) { + //Do nothing + } } } function runMediasoupObserver() { mediasoup.observer.on('newworker', (worker) => { // Store the latest worker in a global variable. global.worker = worker; workers.set(worker.pid, worker); worker.observer.on('close', () => workers.delete(worker.pid)); worker.observer.on('newrouter', (router) => { // Store the latest router in a global variable. global.router = router; routers.set(router.id, router); router.observer.on('close', () => routers.delete(router.id)); router.observer.on('newtransport', (transport) => { // Store the latest transport in a global variable. global.transport = transport; transports.set(transport.id, transport); transport.observer.on('close', () => transports.delete(transport.id)); transport.observer.on('newproducer', (producer) => { // Store the latest producer in a global variable. global.producer = producer; producers.set(producer.id, producer); producer.observer.on('close', () => producers.delete(producer.id)); }); transport.observer.on('newconsumer', (consumer) => { // Store the latest consumer in a global variable. global.consumer = consumer; consumers.set(consumer.id, consumer); consumer.observer.on('close', () => consumers.delete(consumer.id)); }); transport.observer.on('newdataproducer', (dataProducer) => { // Store the latest dataProducer in a global variable. global.dataProducer = dataProducer; dataProducers.set(dataProducer.id, dataProducer); dataProducer.observer.on('close', () => dataProducers.delete(dataProducer.id)); }); transport.observer.on('newdataconsumer', (dataConsumer) => { // Store the latest dataConsumer in a global variable. global.dataConsumer = dataConsumer; dataConsumers.set(dataConsumer.id, dataConsumer); dataConsumer.observer.on('close', () => dataConsumers.delete(dataConsumer.id)); }); }); }); }); } module.exports = async function(rooms, peers) { try { // Run the mediasoup observer API. runMediasoupObserver(); // Make maps global so they can be used during the REPL terminal. global.rooms = rooms; global.peers = peers; global.workers = workers; global.routers = routers; global.transports = transports; global.producers = producers; global.consumers = consumers; global.dataProducers = dataProducers; global.dataConsumers = dataConsumers; const server = net.createServer((socket) => { const interactive = new Interactive(socket); interactive.openCommandConsole(); }); await new Promise((resolve) => { try { fs.unlinkSync(SOCKET_PATH); } - catch (error) {} + catch (error) { + //Do nothing + } server.listen(SOCKET_PATH, resolve); }); } - catch (error) - {} + catch (error) { + //Do nothing + } }; diff --git a/meet/server/server.js b/meet/server/server.js index 1f440e47..2dc7e7f1 100755 --- a/meet/server/server.js +++ b/meet/server/server.js @@ -1,433 +1,431 @@ #!/usr/bin/env node process.title = 'edumeet-server'; -const bcrypt = require('bcrypt'); const config = require('./config/config'); const fs = require('fs'); const http = require('http'); const spdy = require('spdy'); const express = require('express'); const bodyParser = require('body-parser'); const cookieParser = require('cookie-parser'); const compression = require('compression'); const mediasoup = require('mediasoup'); const AwaitQueue = require('awaitqueue'); const Logger = require('./lib/Logger'); const Room = require('./lib/Room'); const Peer = require('./lib/Peer'); -const base64 = require('base-64'); const helmet = require('helmet'); // auth const redis = require('redis'); const redisClient = redis.createClient(config.redisOptions); const expressSession = require('express-session'); const RedisStore = require('connect-redis')(expressSession); const sharedSession = require('express-socket.io-session'); const interactiveServer = require('./lib/interactiveServer'); const promExporter = require('./lib/promExporter'); const { v4: uuidv4 } = require('uuid'); /* eslint-disable no-console */ console.log('- process.env.DEBUG:', process.env.DEBUG); console.log('- config.mediasoup.worker.logLevel:', config.mediasoup.worker.logLevel); console.log('- config.mediasoup.worker.logTags:', config.mediasoup.worker.logTags); /* eslint-enable no-console */ const logger = new Logger(); const queue = new AwaitQueue(); let statusLogger = null; if ('StatusLogger' in config) statusLogger = new config.StatusLogger(); // mediasoup Workers. // @type {Array} const mediasoupWorkers = []; // Map of Room instances indexed by roomId. const rooms = new Map(); // Map of Peer instances indexed by peerId. const peers = new Map(); // TLS server configuration. const tls = { cert : fs.readFileSync(config.tls.cert), key : fs.readFileSync(config.tls.key), secureOptions : 'tlsv12', ciphers : [ 'ECDHE-ECDSA-AES128-GCM-SHA256', 'ECDHE-RSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES256-GCM-SHA384', 'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-CHACHA20-POLY1305', 'ECDHE-RSA-CHACHA20-POLY1305', 'DHE-RSA-AES128-GCM-SHA256', 'DHE-RSA-AES256-GCM-SHA384' ].join(':'), honorCipherOrder : true }; const app = express(); app.use(helmet.hsts()); const sharedCookieParser=cookieParser(); app.use(sharedCookieParser); app.use(bodyParser.json({ limit: '5mb' })); app.use(bodyParser.urlencoded({ limit: '5mb', extended: true })); const session = expressSession({ secret : config.cookieSecret, name : config.cookieName, resave : true, saveUninitialized : true, store : new RedisStore({ client: redisClient }), cookie : { secure : true, httpOnly : true, maxAge : 60 * 60 * 1000 // Expire after 1 hour since last request from user } }); if (config.trustProxy) { app.set('trust proxy', config.trustProxy); } app.use(session); let mainListener; let io; async function run() { try { // Open the interactive server. await interactiveServer(rooms, peers); // start Prometheus exporter if (config.prometheus) { await promExporter(rooms, peers, config.prometheus); } // Run a mediasoup Worker. await runMediasoupWorkers(); // Run HTTPS server. await runHttpsServer(); // Run WebSocketServer. await runWebSocketServer(); // eslint-disable-next-line no-unused-vars const errorHandler = (err, req, res, next) => { const trackingId = uuidv4(); res.status(500).send( `

Internal Server Error

If you report this error, please also report this tracking ID which makes it possible to locate your session in the logs which are available to the system administrator: ${trackingId}

` ); logger.error( 'Express error handler dump with tracking ID: %s, error dump: %o', trackingId, err); }; // eslint-disable-next-line no-unused-vars app.use(errorHandler); } catch (error) { logger.error('run() [error:"%o"]', error); } } function statusLog() { if (statusLogger) { statusLogger.log({ rooms : rooms, peers : peers }); } } async function runHttpsServer() { app.use(compression()); - app.get(`${config.pathPrefix}/api/ping`, function (req, res, next) { + app.get(`${config.pathPrefix}/api/ping`, function (req, res, /*next*/) { res.send('PONG') }) - app.get(`${config.pathPrefix}/api/sessions`, function (req, res, next) { + app.get(`${config.pathPrefix}/api/sessions`, function (req, res, /*next*/) { //TODO json.stringify res.json({ id : "testId" }) }) //Check if the room exists - app.get(`${config.pathPrefix}/api/sessions/:session_id`, function (req, res, next) { + app.get(`${config.pathPrefix}/api/sessions/:session_id`, function (req, res, /*next*/) { console.warn("Checking for room") let room = rooms.get(req.params.session_id); if (!room) { console.warn("doesn't exist") res.status(404).send() } else { console.warn("exist") res.status(200).send() } }) // Create room and return id - app.post(`${config.pathPrefix}/api/sessions`, async function (req, res, next) { + app.post(`${config.pathPrefix}/api/sessions`, async function (req, res, /*next*/) { console.warn("Creating new room", req.body.mediaMode, req.body.recordingMode) //FIXME we're truncating because of kolab4 database layout (should be fixed instead) const roomId = uuidv4().substring(0, 16) - const room = await getOrCreateRoom({ roomId }); + await getOrCreateRoom({ roomId }); res.json({ id : roomId }) }) - app.post(`${config.pathPrefix}/api/signal`, async function (req, res, next) { + app.post(`${config.pathPrefix}/api/signal`, async function (req, res, /*next*/) { let data = req.body; const roomId = data.session; - const signalType = data.type; - const payload = data.data; + // const signalType = data.type; + // const payload = data.data; const peers = data.to; if (peers) { for (const peerId of peers) { let peer = peers.get(peerId); peer.socket.emit( 'signal', data ); } } else { io.to(roomId).emit( 'signal', data ); } res.json({}) }); // Create connection in room (just wait for websocket instead? // $post = [ // 'json' => [ // 'role' => self::OV_ROLE_PUBLISHER, // 'data' => json_encode(['role' => $role]) // ] // ]; - app.post(`${config.pathPrefix}/api/sessions/:session_id/connection`, function (req, res, next) { + app.post(`${config.pathPrefix}/api/sessions/:session_id/connection`, function (req, res, /*next*/) { console.warn("Creating connection in session", req.params.session_id) - roomId = req.params.session_id + let roomId = req.params.session_id let data = req.body; //FIXME we're truncating because of kolab4 database layout (should be fixed instnead) const peerId = uuidv4().substring(0, 16) //TODO create room already? - peer = new Peer({ id: peerId, roomId }); + let peer = new Peer({ id: peerId, roomId }); peers.set(peerId, peer); peer.on('close', () => { peers.delete(peerId); statusLog(); }); peer.nickname = "Display Name"; // peer.picture = picture; peer.email = "email@test.com"; if ('role' in data) peer.setRole(data.role); const proto = config.publicDomain.includes('localhost') || config.publicDomain.includes('127.0.0.1') ? 'ws' : 'wss'; res.json({ id: peerId, // When the below get's passed to the socket.io client we end up with something like (depending on the socket.io path) // wss://${publicDomain}/meetmedia/signaling/?peerId=peer1&roomId=room1&EIO=3&transport=websocket, token: `${proto}://${config.publicDomain}/?peerId=${peerId}&roomId=${roomId}` }) }) if (config.httpOnly === true) { // http mainListener = http.createServer(app); } else { // https mainListener = spdy.createServer(tls, app); // http const redirectListener = http.createServer(app); if (config.listeningHost) redirectListener.listen(config.listeningRedirectPort, config.listeningHost); else redirectListener.listen(config.listeningRedirectPort); } console.info(`Listening on ${config.listeningPort} ${config.listeningHost}`) // https or http if (config.listeningHost) mainListener.listen(config.listeningPort, config.listeningHost); else mainListener.listen(config.listeningPort); } /** * Create a WebSocketServer to allow WebSocket connections from browsers. */ async function runWebSocketServer() { io = require('socket.io')(mainListener, { path: `${config.pathPrefix}/signaling`, cookie: false }); io.use( sharedSession(session, sharedCookieParser, { autoSave: true }) ); // Handle connections from clients. io.on('connection', (socket) => { logger.info("websocket connection") const { roomId, peerId } = socket.handshake.query; if (!roomId || !peerId) { logger.warn('connection request without roomId and/or peerId'); socket.disconnect(true); return; } logger.info( 'connection request [roomId:"%s", peerId:"%s"]', roomId, peerId); queue.push(async () => { - const { token } = socket.handshake.session; + // const { token } = socket.handshake.session; const room = await getOrCreateRoom({ roomId }); let peer = peers.get(peerId); if (!peer) { logger.warn("Peer does not exist %s", peerId); socket.disconnect(true); return; } let returning = false; peer.socket = socket; //FIXME figure out to which extent we need to handle returning users // Returning user, remove if old peer exists // TODO maintain metadata? // if (peer) { // peer.close(); // returning = true; // } room.handlePeer({ peer, returning }); statusLog(); }) .catch((error) => { logger.error('room creation or room joining failed [error:"%o"]', error); if (socket) socket.disconnect(true); return; }); }); } /** * Launch as many mediasoup Workers as given in the configuration file. */ async function runMediasoupWorkers() { const { numWorkers } = config.mediasoup; logger.info('running %d mediasoup Workers...', numWorkers); for (let i = 0; i < numWorkers; ++i) { const worker = await mediasoup.createWorker( { logLevel : config.mediasoup.worker.logLevel, logTags : config.mediasoup.worker.logTags, rtcMinPort : config.mediasoup.worker.rtcMinPort, rtcMaxPort : config.mediasoup.worker.rtcMaxPort }); worker.on('died', () => { logger.error( 'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid); setTimeout(() => process.exit(1), 2000); }); mediasoupWorkers.push(worker); } } /** * Get a Room instance (or create one if it does not exist). */ async function getOrCreateRoom({ roomId }) { let room = rooms.get(roomId); // If the Room does not exist create a new one. if (!room) { logger.info('creating a new Room [roomId:"%s"]', roomId); room = await Room.create({ mediasoupWorkers, roomId, peers }); rooms.set(roomId, room); statusLog(); room.on('close', () => { rooms.delete(roomId); statusLog(); }); } return room; } run(); diff --git a/meet/server/test.js b/meet/server/test.js index 719ed38c..56543a16 100755 --- a/meet/server/test.js +++ b/meet/server/test.js @@ -1,82 +1,82 @@ #!/usr/bin/env node const io = require("socket.io-client"); const axios = require('axios') const roomId = "room1"; axios .post('http://127.0.0.1:12443/api/sessions/${roomId}/connection', { // todo: 'Buy the milk' }) .then(res => { console.log(`statusCode: ${res.status}`) // console.log(res) // let data = res.data; console.log(data) const peerId = data['id']; const _signalingUrl = `ws://127.0.0.1:12443/?peerId=${peerId}&roomId=${roomId}`; console.warn(`${_signalingUrl}`); - _signalingSocket = io(_signalingUrl, { transports: ["websocket"], rejectUnauthorized: false }); + let _signalingSocket = io(_signalingUrl, { transports: ["websocket"], rejectUnauthorized: false }); _signalingSocket.on('connect', () => { console.debug('signaling Peer "connect" event'); _signalingSocket.emit("hello", { a: "b", c: [] }); axios .post('http://127.0.0.1:12443/api/signal', { session: roomId, type: "sometype", data: { }, //optional // 'to' => [$connections] }) .then(res => { console.log(`statusCode: ${res.status}`) }); }); _signalingSocket.on('disconnect', (reason) => { console.warn('signaling Peer "disconnect" event [reason:"%s"]', reason); }); _signalingSocket.on('signal', (reason) => { console.warn('Received signal "%s"', reason); }); _signalingSocket.on("error", (error) => { console.warn('error %s', error); }); _signalingSocket.on("reconnect_failed", () => { console.warn('reconnect failed'); }); //_signalingSocket.connect(); console.warn('done'); }) .catch(error => { console.error(error) }) //const delay = ms => new Promise(resolve => setTimeout(resolve, ms)) //await delay(1000) /// waiting 1 second. //console.warn('done done');