diff --git a/meet/server/lib/Peer.js b/meet/server/lib/Peer.js index 7eb7ae6c..e35e2fa1 100644 --- a/meet/server/lib/Peer.js +++ b/meet/server/lib/Peer.js @@ -1,241 +1,246 @@ const EventEmitter = require('events').EventEmitter; const Logger = require('./Logger'); +const crypto = require('crypto'); const Roles = require('./userRoles'); +const { v4: uuidv4 } = require('uuid'); const logger = new Logger('Peer'); class Peer extends EventEmitter { - constructor({ id, roomId }) { - logger.info('constructor() [id:"%s"]', id); + constructor({ roomId }) { + logger.info('Peer constructor()'); + super(); - this._id = id; + this._id = uuidv4(); this._roomId = roomId; this._socket = null; this._closed = false; this._role = 0; this._nickname = false; this._language = null; this._routerId = null; this._rtpCapabilities = null; this._raisedHand = false; this._transports = new Map(); this._producers = new Map(); this._consumers = new Map(); + + this._authToken = crypto.randomBytes(16).toString('hex'); } close() { logger.info('close()'); this._closed = true; // Iterate and close all mediasoup Transport associated to this Peer, so all // its Producers and Consumers will also be closed. for (const transport of this.transports.values()) { transport.close(); } if (this.socket) this.socket.disconnect(true); this.emit('close'); } - get id() { - return this._id; + get authToken() { + return this._authToken; } - set id(id) { - this._id = id; + get id() { + return this._id; } get roomId() { return this._roomId; } set roomId(roomId) { this._roomId = roomId; } get socket() { return this._socket; } set socket(socket) { this._socket = socket; if (this.socket) { this.socket.on('disconnect', () => { if (this.closed) return; logger.debug('"disconnect" event [id:%s]', this.id); this.close(); }); } } get closed() { return this._closed; } get role() { return this._role; } get nickname() { return this._nickname; } get language() { return this._language; } set nickname(nickname) { if (nickname !== this._nickname) { this._nickname = nickname; this.emit('nicknameChanged'); } } set language(language) { if (language != this._language) { this._language = language; this.emit('languageChanged'); } } get routerId() { return this._routerId; } set routerId(routerId) { this._routerId = routerId; } get rtpCapabilities() { return this._rtpCapabilities; } set rtpCapabilities(rtpCapabilities) { this._rtpCapabilities = rtpCapabilities; } get raisedHand() { return this._raisedHand; } set raisedHand(raisedHand) { if (this._raisedHand != raisedHand) { this._raisedHand = raisedHand; this.emit('raisedHandChanged'); } } get transports() { return this._transports; } get producers() { return this._producers; } get consumers() { return this._consumers; } setRole(newRole) { if (this._role != newRole) { this._role = newRole; this.emit('roleChanged'); } } isValidRole(newRole) { Object.keys(Roles).forEach(roleId => { const role = Roles[roleId] if (newRole & role) { newRole = newRole ^ role; } }) return newRole == 0; } hasRole(role) { return !!(this._role & role); } addTransport(id, transport) { this.transports.set(id, transport); } getTransport(id) { return this.transports.get(id); } getConsumerTransport() { return Array.from(this.transports.values()) .find((t) => t.appData.consuming); } removeTransport(id) { this.transports.delete(id); } addProducer(id, producer) { this.producers.set(id, producer); } getProducer(id) { return this.producers.get(id); } removeProducer(id) { this.producers.delete(id); } addConsumer(id, consumer) { this.consumers.set(id, consumer); } getConsumer(id) { return this.consumers.get(id); } removeConsumer(id) { this.consumers.delete(id); } get peerInfo() { const peerInfo = { id: this.id, language: this.language, nickname: this.nickname, role: this.role, raisedHand: this.raisedHand }; return peerInfo; } } module.exports = Peer; diff --git a/meet/server/lib/Room.js b/meet/server/lib/Room.js index 65e9e18e..83f290a6 100644 --- a/meet/server/lib/Room.js +++ b/meet/server/lib/Room.js @@ -1,1253 +1,1256 @@ const EventEmitter = require('events').EventEmitter; const AwaitQueue = require('awaitqueue'); const crypto = require('crypto'); const Logger = require('./Logger'); const { SocketTimeoutError } = require('./errors'); const Roles = require('./userRoles'); +const { v4: uuidv4 } = require('uuid'); const config = require('../config/config'); const logger = new Logger('Room'); const ROUTER_SCALE_SIZE = config.routerScaleSize || 40; class Room extends EventEmitter { static calculateLoads(mediasoupWorkers, peers, mediasoupRouters) { const routerLoads = new Map(); const workerLoads = new Map(); const pipedRoutersIds = new Set(); // Calculate router loads by adding up peers per router, and collected piped routers Object.values(peers).forEach(peer => { const routerId = peer.routerId; if (routerId) { if (mediasoupRouters.has(routerId)) { pipedRoutersIds.add(routerId); } if (routerLoads.has(routerId)) { routerLoads.set(routerId, routerLoads.get(routerId) + 1); } else { routerLoads.set(routerId, 1); } } }); // Calculate worker loads by adding up router loads per worker for (const worker of mediasoupWorkers) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (workerLoads.has(worker._pid)) { workerLoads.set(worker._pid, workerLoads.get(worker._pid) + (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } else { workerLoads.set(worker._pid, (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } } } return {routerLoads, workerLoads, pipedRoutersIds}; } /* * Find a router that is on a worker that is least loaded. * * A worker with a router that we are already piping to is preferred. */ static getLeastLoadedRouter(mediasoupWorkers, peers, mediasoupRouters) { const {workerLoads, pipedRoutersIds} = Room.calculateLoads(mediasoupWorkers, peers, mediasoupRouters); const sortedWorkerLoads = new Map([ ...workerLoads.entries() ].sort( (a, b) => a[1] - b[1])); // we don't care about if router is piped, just choose the least loaded worker if (pipedRoutersIds.size === 0 || pipedRoutersIds.size === mediasoupRouters.size) { const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } else { // find if there is a piped router that is on a worker that is below limit for (const [ workerId, workerLoad ] of sortedWorkerLoads.entries()) { for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; // on purpose we check if the worker load is below the limit, // as in reality the worker load is imortant, // not the router load if (mediasoupRouters.has(routerId) && pipedRoutersIds.has(routerId) && workerLoad < ROUTER_SCALE_SIZE) { return routerId; } } } } } // no piped router found, we need to return router from least loaded worker const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } } /** * Factory function that creates and returns Room instance. * * @async * * @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new * mediasoup Router must be created. - * @param {String} roomId - Id of the Room instance. + * @param {axios} webhook - An axios instance for webhook (http) requests */ - static async create({ mediasoupWorkers, roomId, peers, webhook }) { + static async create({ mediasoupWorkers, peers, webhook }) { + const roomId = uuidv4().substring(0, 16); // TODO: Use full uuid + logger.info('create() [roomId:"%s"]', roomId); // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediasoupRouters = new Map(); for (const worker of mediasoupWorkers) { const router = await worker.createRouter({ mediaCodecs }); mediasoupRouters.set(router.id, router); } const firstRouter = mediasoupRouters.get(Room.getLeastLoadedRouter( mediasoupWorkers, peers, mediasoupRouters)); // Create a mediasoup AudioLevelObserver on first router const audioLevelObserver = await firstRouter.createAudioLevelObserver( { maxEntries : 1, threshold : -80, interval : 800 }); return new Room({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers, webhook }); } constructor({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers, webhook }) { logger.info('constructor() [roomId:"%s"]', roomId); super(); this.setMaxListeners(Infinity); // this._uuid = uuidv4(); this._mediasoupWorkers = mediasoupWorkers; // Room ID. this._roomId = roomId; // Closed flag. this._closed = false; // Joining queue this._queue = new AwaitQueue(); this._peers = peers; this._selfDestructTimeout = null; // Array of mediasoup Router instances. this._mediasoupRouters = mediasoupRouters; this._audioLevelObserver = audioLevelObserver; this._webhook = webhook; } dumpStats() { const peers = this.getPeers(); const {routerLoads, workerLoads, pipedRoutersIds} = Room.calculateLoads(this._mediasoupWorkers, peers, this._mediasoupRouters); let stats = { numberOfWorkers: this._mediasoupWorkers.length, numberOfRouters: this._mediasoupRouters.size, numberOfPeers: peers.length, routerLoads: routerLoads, workerLoads: workerLoads, pipedRoutersIds: pipedRoutersIds, }; console.log(stats); } close() { logger.debug('close()'); this._closed = true; this._queue.close(); this._queue = null; if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = null; // Close the peers. Object.values(this._peers).forEach(peer => { if (!peer.closed) peer.close(); }); this._peers = {}; // Close the mediasoup Routers. for (const router of this._mediasoupRouters.values()) { router.close(); } this._mediasoupRouters.clear(); this._mediasoupWorkers = null; this._audioLevelObserver = null; // Emit 'close' event. this.emit('close'); } handlePeer({ peer }) { logger.info('handlePeer() [peer:"%s", role:%s]', peer.id, peer.role); // Should not happen if (this._peers[peer.id]) { logger.warn( 'handleConnection() | there is already a peer with same peerId [peer:"%s"]', peer.id); } this._peerJoining(peer); } logStatus() { logger.info( 'logStatus() [room id:"%s", peers:"%s"]', this._roomId, Object.keys(this._peers).length ); } dump() { return { roomId : this._roomId, peers : Object.keys(this._peers).length }; } get id() { return this._roomId; } selfDestructCountdown() { logger.debug('selfDestructCountdown() started'); clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = setTimeout(() => { if (this._closed) return; if (this.checkEmpty()) { logger.info( 'Room deserted for some time, closing the room [roomId:"%s"]', this._roomId); this.close(); } else logger.debug('selfDestructCountdown() aborted; room is not empty!'); }, 10000); } checkEmpty() { return Object.keys(this._peers).length === 0; } _getTURNCredentials(name, secret) { const unixTimeStamp = parseInt(Date.now()/1000) + 24*3600; // this credential would be valid for the next 24 hours // If there is no name, the timestamp alone can also be used. const username = name ? `${unixTimeStamp}:${name}` : `${unixTimeStamp}`; const hmac = crypto.createHmac('sha1', secret); hmac.setEncoding('base64'); hmac.write(username); hmac.end(); const password = hmac.read(); return { username, password }; } _peerJoining(peer) { this._queue.push(async () => { peer.socket.join(this._roomId); this._peers[peer.id] = peer; // Assign routerId peer.routerId = await this._getRouterId(); this._handlePeer(peer); let iceServers; if (config.turn) { // Generate time-limited credentials. The name is only relevant for the logs. const {username, password} = this._getTURNCredentials(peer.id, config.turn.staticSecret); iceServers = [ { urls : config.turn.urls, username : username, credential : password } ]; } this._notification(peer.socket, 'roomReady', { iceServers }); }) .catch((error) => { logger.error('_peerJoining() [error:"%o"]', error); }); } _handlePeer(peer) { logger.debug('_handlePeer() [peer:"%s"]', peer.id); peer.on('close', () => { this._handlePeerClose(peer); }); peer.on('nicknameChanged', () => { // Spread to others (and self) const data = { peerId: peer.id, nickname: peer.nickname }; this._notification(peer.socket, 'changeNickname', data, true, true); }); peer.on('languageChanged', () => { // Spread to others (and self) const data = { peerId: peer.id, language: peer.language }; this._notification(peer.socket, 'changeLanguage', data, true, true); }); peer.on('roleChanged', () => { // Spread to others (and self) const data = { peerId: peer.id, role: peer.role }; this._notification(peer.socket, 'changeRole', data, true, true); }); peer.on('raisedHandChanged', () => { // Spread to others (and self) const data = { peerId: peer.id, raisedHand: peer.raisedHand }; this._notification(peer.socket, 'changeRaisedHand', data, true, true); }); peer.socket.on('request', (request, cb) => { logger.debug( 'Peer "request" event [method:"%s", peerId:"%s"]', request.method, peer.id); this._handleSocketRequest(peer, request, cb) .catch((error) => { logger.error('"request" failed [error:"%o"]', error); cb(error); }); }); // Peer left before we were done joining if (peer.closed) this._handlePeerClose(peer); } _handlePeerClose(peer) { logger.debug('_handlePeerClose() [peer:"%s"]', peer.id); if (this._closed) return; this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); delete this._peers[peer.id]; // If this is the last Peer in the room close the room after a while. if (this.checkEmpty()) this.selfDestructCountdown(); } async _handleSocketRequest(peer, request, cb) { const router = this._mediasoupRouters.get(peer.routerId); switch (request.method) { case 'getRouterRtpCapabilities': { cb(null, router.rtpCapabilities); break; } case 'dumpStats': { this.dumpStats() cb(null); break; } case 'join': { const { nickname, rtpCapabilities } = request.data; // Store client data into the Peer data object. peer.nickname = nickname; peer.rtpCapabilities = rtpCapabilities; // Tell the new Peer about already joined Peers. const otherPeers = this.getPeers(peer); const peerInfos = otherPeers.map(otherPeer => otherPeer.peerInfo); cb(null, { peers: peerInfos, ...peer.peerInfo }); // Create Consumers for existing Producers. for (const otherPeer of otherPeers) { for (const producer of otherPeer.producers.values()) { this._createConsumer({ consumerPeer: peer, producerPeer: otherPeer, producer }); } } // Notify the new Peer to all other Peers. this._notification(peer.socket, 'newPeer', peer.peerInfo, true); logger.debug( 'peer joined [peer: "%s", nickname: "%s"]', peer.id, nickname); break; } case 'createPlainTransport': { const { producing, consuming } = request.data; const transport = await router.createPlainTransport( { //When consuming we manually connect using connectPlainTransport, //otherwise we let the port autodetection work. comedia: producing, // FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP) rtcpMux: false, listenIp: { ip: "127.0.0.1", announcedIp: null }, appData : { producing, consuming } } ); // await transport.enableTraceEvent([ "probation", "bwe" ]); // transport.on("trace", (trace) => { // console.log(trace); // }); peer.addTransport(transport.id, transport); cb( null, { id : transport.id, ip : transport.tuple.localIp, port : transport.tuple.localPort, rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined }); break; } case 'connectPlainTransport': { const { transportId, ip, port, rtcpPort } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ ip: ip, port: port, rtcpPort: rtcpPort, }); cb(); break; } case 'createWebRtcTransport': { // NOTE: Don't require that the Peer is joined here, so the client can // initiate mediasoup Transports and be ready when he later joins. const { forceTcp, producing, consuming } = request.data; const webRtcTransportOptions = { ...config.mediasoup.webRtcTransport, appData : { producing, consuming } }; webRtcTransportOptions.enableTcp = true; if (forceTcp) webRtcTransportOptions.enableUdp = false; else { webRtcTransportOptions.enableUdp = true; webRtcTransportOptions.preferUdp = true; } const transport = await router.createWebRtcTransport( webRtcTransportOptions ); transport.on('dtlsstatechange', (dtlsState) => { if (dtlsState === 'failed' || dtlsState === 'closed') { logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); } }); // Store the WebRtcTransport into the Peer data Object. peer.addTransport(transport.id, transport); cb( null, { id : transport.id, iceParameters : transport.iceParameters, iceCandidates : transport.iceCandidates, dtlsParameters : transport.dtlsParameters }); const { maxIncomingBitrate } = config.mediasoup.webRtcTransport; // If set, apply max incoming bitrate limit. if (maxIncomingBitrate) { try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); } catch (error) { logger.info("Setting the incoming bitrate failed") } } break; } case 'connectWebRtcTransport': { const { transportId, dtlsParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ dtlsParameters }); cb(); break; } /* case 'restartIce': { const { transportId } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); const iceParameters = await transport.restartIce(); cb(null, iceParameters); break; } */ case 'produce': { let { appData } = request.data; if (!appData.source || ![ 'mic', 'webcam', 'screen' ].includes(appData.source)) throw new Error('invalid producer source'); if (appData.source === 'mic' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'webcam' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'screen' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); const { transportId, kind, rtpParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); // Add peerId into appData to later get the associated Peer during // the 'loudest' event of the audioLevelObserver. appData = { ...appData, peerId: peer.id }; const producer = await transport.produce({ kind, rtpParameters, appData }); const pipeRouters = this._getRoutersToPipeTo(peer.routerId); for (const [ routerId, destinationRouter ] of this._mediasoupRouters) { if (pipeRouters.includes(routerId)) { await router.pipeToRouter({ producerId : producer.id, router : destinationRouter }); } } // Store the Producer into the Peer data Object. peer.addProducer(producer.id, producer); producer.on('videoorientationchange', (videoOrientation) => { logger.debug( 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]', producer.id, videoOrientation); }); // Trace individual packets for debugging // await producer.enableTraceEvent([ "rtp", "pli", "keyframe", "nack" ]); // producer.on("trace", (trace) => { // console.log(`Trace on ${producer.id}`, trace); // }); cb(null, { id: producer.id }); // Optimization: Create a server-side Consumer for each Peer. for (const otherPeer of this.getPeers(peer)) { this._createConsumer({ consumerPeer: otherPeer, producerPeer: peer, producer }); } // Add into the audioLevelObserver. if (kind === 'audio') { this._audioLevelObserver.addProducer({ producerId: producer.id }) .catch(() => {}); } break; } case 'closeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); producer.close(); // Remove from its map. peer.removeProducer(producer.id); cb(); break; } case 'pauseProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.pause(); cb(); break; } case 'resumeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.resume(); cb(); break; } case 'pauseConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.pause(); cb(); break; } case 'resumeConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.resume(); cb(); break; } case 'changeNickname': { const { nickname } = request.data; peer.nickname = nickname; // This will be spread through events from the peer object // Return no error cb(); break; } case 'chatMessage': { const { message } = request.data; // Spread to others this._notification(peer.socket, 'chatMessage', { peerId: peer.id, nickname: peer.nickname, message: message }, true, true); // Return no error cb(); break; } case 'moderator:addRole': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const rolePeer = this._peers[peerId]; if (!rolePeer) throw new Error(`peer with id "${peerId}" not found`); if (!rolePeer.isValidRole(role)) throw new Error('invalid role'); if (!rolePeer.hasRole(role)) { // The 'owner' role is not assignable if (role & Roles.OWNER) throw new Error('the OWNER role is not assignable'); // Promotion to publisher? Put the user hand down if (role & Roles.PUBLISHER && rolePeer.raisedHand) rolePeer.raisedHand = false; // This will propagate the event automatically rolePeer.setRole(rolePeer.role | role); } // Return no error cb(); break; } case 'moderator:removeRole': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const rolePeer = this._peers[peerId]; if (!rolePeer) throw new Error(`peer with id "${peerId}" not found`); if (!rolePeer.isValidRole(role)) throw new Error('invalid role'); if (rolePeer.hasRole(role)) { if (role & Roles.OWNER) throw new Error('the OWNER role is not removable'); if (role & Roles.MODERATOR && rolePeer.role & Roles.OWNER) throw new Error('the MODERATOR role cannot be removed from the OWNER'); // Non-publisher cannot be a language interpreter if (role & Roles.PUBLISHER) rolePeer.language = null; // This will propagate the event automatically rolePeer.setRole(rolePeer.role ^ role); } // Return no error cb(); break; } case 'moderator:changeLanguage': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, language } = request.data; if (language && !/^[a-z]{2}$/.test(language)) throw new Error('invalid language code'); const langPeer = this._peers[peerId]; if (!langPeer) throw new Error(`peer with id "${peerId}" not found`); langPeer.language = language; // This will be spread through events from the peer object // Return no error cb(); break; } case 'moderator:joinRequestAccept': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { requestId } = request.data; // Return no error cb(); if (this._webhook) { this._webhook.post('', { requestId, roomId: this._roomId, event: 'joinRequestAccepted' }) .then(function (response) { logger.info(`Accepted join request ${requestId}. Webhook succeeded.`); }) .catch(function (error) { logger.error(error); }); } break; } case 'moderator:joinRequestDeny': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { requestId } = request.data; // Return no error cb(); if (this._webhook) { this._webhook.post('', { requestId, roomId: this._roomId, event: 'joinRequestDenied' }) .then(function (response) { logger.info(`Denied join request ${requestId}. Webhook succeeded.`); }) .catch(function (error) { logger.error(error); }); } break; } case 'moderator:closeRoom': { if (!peer.hasRole(Roles.OWNER)) throw new Error('peer not authorized'); this._notification(peer.socket, 'moderator:closeRoom', null, true); cb(); // Close the room this.close(); // TODO: remove the room? break; } case 'moderator:kickPeer': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId } = request.data; const kickPeer = this._peers[peerId]; if (!kickPeer) throw new Error(`peer with id "${peerId}" not found`); this._notification(kickPeer.socket, 'moderator:kickPeer'); kickPeer.close(); cb(); break; } case 'raisedHand': { const { raisedHand } = request.data; peer.raisedHand = raisedHand; // This will be spread through events from the peer object // Return no error cb(); break; } default: { logger.error('unknown request.method "%s"', request.method); cb(500, `unknown request.method "${request.method}"`); } } } /** * Creates a mediasoup Consumer for the given mediasoup Producer. * * @async */ async _createConsumer({ consumerPeer, producerPeer, producer }) { logger.debug( '_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]', consumerPeer.id, producerPeer.id, producer.id ); const router = this._mediasoupRouters.get(producerPeer.routerId); // Optimization: // - Create the server-side Consumer. If video, do it paused. // - Tell its Peer about it and wait for its response. // - Upon receipt of the response, resume the server-side Consumer. // - If video, this will mean a single key frame requested by the // server-side Consumer (when resuming it). // NOTE: Don't create the Consumer if the remote Peer cannot consume it. if ( !consumerPeer.rtpCapabilities || !router.canConsume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities }) ) { return; } // Must take the Transport the remote Peer is using for consuming. const transport = consumerPeer.getConsumerTransport(); // This should not happen. if (!transport) { logger.warn('_createConsumer() | Transport for consuming not found'); return; } // Create the Consumer in paused mode. let consumer; try { consumer = await transport.consume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities, paused : producer.kind === 'video' }); if (producer.kind === 'audio') await consumer.setPriority(255); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); return; } // Trace individual packets for debugging // await consumer.enableTraceEvent([ "rtp", "pli", "fir" ]); // consumer.on("trace", (trace) => { // console.log(`Trace on ${consumer.id}`, trace); // }); // Store the Consumer into the consumerPeer data Object. consumerPeer.addConsumer(consumer.id, consumer); // Set Consumer events. consumer.on('transportclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); }); consumer.on('producerclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id }); }); // TODO: We don't have to send websocket signals on producerpause/producerresume // The same can be achieved on the client-side using consumer.observer.on('pause') // and consumer.observer.on('resume') consumer.on('producerpause', () => { this._notification(consumerPeer.socket, 'consumerPaused', { consumerId: consumer.id }); }); consumer.on('producerresume', () => { this._notification(consumerPeer.socket, 'consumerResumed', { consumerId: consumer.id }); }); // Send a request to the remote Peer with Consumer parameters. try { await this._request( consumerPeer.socket, 'newConsumer', { peerId : producerPeer.id, kind : consumer.kind, producerId : producer.id, id : consumer.id, rtpParameters : consumer.rtpParameters, type : consumer.type, appData : producer.appData, producerPaused : consumer.producerPaused } ); // Now that we got the positive response from the remote Peer and, if // video, resume the Consumer to ask for an efficient key frame. await consumer.resume(); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); } } /** * Get the list of peers. */ getPeers(excludePeer = undefined) { return Object.values(this._peers) .filter((peer) => peer !== excludePeer); } _timeoutCallback(callback) { let called = false; const interval = setTimeout( () => { if (called) return; called = true; callback(new SocketTimeoutError('Request timed out')); }, config.requestTimeout || 20000 ); return (...args) => { if (called) return; called = true; clearTimeout(interval); callback(...args); }; } _sendRequest(socket, method, data = {}) { return new Promise((resolve, reject) => { socket.emit( 'request', { method, data }, this._timeoutCallback((err, response) => { if (err) { reject(err); } else { resolve(response); } }) ); }); } async _request(socket, method, data) { logger.debug('_request() [method:"%s", data:"%o"]', method, data); const { requestRetries = 3 } = config; for (let tries = 0; tries < requestRetries; tries++) { try { return await this._sendRequest(socket, method, data); } catch (error) { if ( error instanceof SocketTimeoutError && tries < requestRetries ) logger.warn('_request() | timeout, retrying [attempt:"%s"]', tries); else throw error; } } } _notification(socket, method, data = {}, broadcast = false, includeSender = false) { if (broadcast) { socket.broadcast.to(this._roomId).emit( 'notification', { method, data } ); if (includeSender) socket.emit('notification', { method, data }); } else { socket.emit('notification', { method, data }); } } /* * Pipe producers of peers that are running under another router to this router. */ async _pipeProducersToRouter(routerId) { const router = this._mediasoupRouters.get(routerId); // All peers that have a different router const peersToPipe = Object.values(this._peers) .filter((peer) => peer.routerId !== routerId && peer.routerId !== null); for (const peer of peersToPipe) { const srcRouter = this._mediasoupRouters.get(peer.routerId); for (const producerId of peer.producers.keys()) { if (router._producers.has(producerId)) { continue; } await srcRouter.pipeToRouter({ producerId : producerId, router : router }); } } } async _getRouterId() { const routerId = Room.getLeastLoadedRouter( this._mediasoupWorkers, this._peers, this._mediasoupRouters); await this._pipeProducersToRouter(routerId); return routerId; } // Returns an array of router ids we need to pipe to: // The combined set of routers of all peers, exluding the router of the peer itself. _getRoutersToPipeTo(originRouterId) { return Object.values(this._peers) .map((peer) => peer.routerId) .filter((routerId, index, self) => routerId !== originRouterId && self.indexOf(routerId) === index ); } } module.exports = Room; diff --git a/meet/server/server.js b/meet/server/server.js index 62c0a408..ae2493e6 100755 --- a/meet/server/server.js +++ b/meet/server/server.js @@ -1,417 +1,411 @@ #!/usr/bin/env node process.title = 'kolabmeet-server'; const config = require('./config/config'); const fs = require('fs'); const http = require('http'); const spdy = require('spdy'); const express = require('express'); const bodyParser = require('body-parser'); const cookieParser = require('cookie-parser'); const compression = require('compression'); const mediasoup = require('mediasoup'); const AwaitQueue = require('awaitqueue'); const Logger = require('./lib/Logger'); const Room = require('./lib/Room'); const Peer = require('./lib/Peer'); const helmet = require('helmet'); const axios = require('axios'); // auth const redis = require('redis'); const expressSession = require('express-session'); const RedisStore = require('connect-redis')(expressSession); const sharedSession = require('express-socket.io-session'); const interactiveServer = require('./lib/interactiveServer'); const promExporter = require('./lib/promExporter'); const { v4: uuidv4 } = require('uuid'); /* eslint-disable no-console */ console.log('- process.env.DEBUG:', process.env.DEBUG); console.log('- config.mediasoup.worker.logLevel:', config.mediasoup.worker.logLevel); console.log('- config.mediasoup.worker.logTags:', config.mediasoup.worker.logTags); /* eslint-enable no-console */ const logger = new Logger(); const queue = new AwaitQueue(); let statusLogger = null; if ('StatusLogger' in config) statusLogger = new config.StatusLogger(); // mediasoup Workers. // @type {Array} const mediasoupWorkers = []; // Map of Room instances indexed by roomId. const rooms = new Map(); // Map of Peer instances indexed by peerId. const peers = new Map(); // TLS server configuration. -const tls = -{ - cert : fs.readFileSync(config.tls.cert), - key : fs.readFileSync(config.tls.key), - secureOptions : 'tlsv12', - ciphers : - [ - 'ECDHE-ECDSA-AES128-GCM-SHA256', - 'ECDHE-RSA-AES128-GCM-SHA256', - 'ECDHE-ECDSA-AES256-GCM-SHA384', - 'ECDHE-RSA-AES256-GCM-SHA384', - 'ECDHE-ECDSA-CHACHA20-POLY1305', - 'ECDHE-RSA-CHACHA20-POLY1305', - 'DHE-RSA-AES128-GCM-SHA256', - 'DHE-RSA-AES256-GCM-SHA384' - ].join(':'), - honorCipherOrder : true +const tls = { + cert: fs.readFileSync(config.tls.cert), + key: fs.readFileSync(config.tls.key), + secureOptions: 'tlsv12', + ciphers: [ + 'ECDHE-ECDSA-AES128-GCM-SHA256', + 'ECDHE-RSA-AES128-GCM-SHA256', + 'ECDHE-ECDSA-AES256-GCM-SHA384', + 'ECDHE-RSA-AES256-GCM-SHA384', + 'ECDHE-ECDSA-CHACHA20-POLY1305', + 'ECDHE-RSA-CHACHA20-POLY1305', + 'DHE-RSA-AES128-GCM-SHA256', + 'DHE-RSA-AES256-GCM-SHA384' + ].join(':'), + honorCipherOrder: true }; // HTTP client instance for webhook "pushes" let webhook = null; if (config.webhookURL) { webhook = axios.create({ baseURL: config.webhookURL, timeout: 5000 }); } const app = express(); app.use(helmet.hsts()); -const sharedCookieParser=cookieParser(); +const sharedCookieParser = cookieParser(); app.use(sharedCookieParser); app.use(bodyParser.json({ limit: '5mb' })); app.use(bodyParser.urlencoded({ limit: '5mb', extended: true })); const session = expressSession({ - secret : config.cookieSecret, - name : config.cookieName, - resave : true, - saveUninitialized : true, - store : config.redisOptions.host != 'none' ? new RedisStore({ client: redis.createClient(config.redisOptions) }) : null, - cookie : { - secure : true, - httpOnly : true, - maxAge : 60 * 60 * 1000 // Expire after 1 hour since last request from user + secret: config.cookieSecret, + name: config.cookieName, + resave: true, + saveUninitialized: true, + store: config.redisOptions.host != 'none' ? new RedisStore({ client: redis.createClient(config.redisOptions) }) : null, + cookie: { + secure: true, + httpOnly: true, + maxAge: 60 * 60 * 1000 // Expire after 1 hour since last request from user } }); if (config.trustProxy) { app.set('trust proxy', config.trustProxy); } app.use(session); let mainListener; let io; async function run() { try { // Open the interactive server. await interactiveServer(rooms, peers); // start Prometheus exporter if (config.prometheus) { await promExporter(rooms, peers, config.prometheus); } // Run a mediasoup Worker. await runMediasoupWorkers(); // Run HTTPS server. await runHttpsServer(); // Run WebSocketServer. await runWebSocketServer(); const errorHandler = (err, req, res /*, next */) => { const trackingId = uuidv4(); res.status(500).send( `

Internal Server Error

If you report this error, please also report this tracking ID which makes it possible to locate your session in the logs which are available to the system administrator: ${trackingId}

` ); logger.error( 'Express error handler dump with tracking ID: %s, error dump: %o', trackingId, err); }; app.use(errorHandler); } catch (error) { logger.error('run() [error:"%o"]', error); } app.emit('ready'); } function statusLog() { if (statusLogger) { - statusLogger.log({ - rooms : rooms, - peers : peers - }); + statusLogger.log({ rooms, peers }); } } async function runHttpsServer() { app.use(compression()); app.get(`${config.pathPrefix}/api/ping`, function (req, res /*, next*/) { - res.send('PONG') + res.send('PONG'); }) app.get(`${config.pathPrefix}/api/sessions`, function (req, res /*, next*/) { //TODO json.stringify res.json({ id : "testId" }) }) // Check if the room exists app.get(`${config.pathPrefix}/api/sessions/:session_id`, function (req, res /*, next*/) { - console.log("Checking for room") - let room = rooms.get(req.params.session_id); + console.log("Checking for room"); + + const room = rooms.get(req.params.session_id); if (!room) { - console.log("doesn't exist") - res.status(404).send() + console.log("doesn't exist"); + res.status(404).send(); } else { - console.log("exist") - res.status(200).send() + console.log("exist"); + res.status(200).send(); } }) // Create room and return id app.post(`${config.pathPrefix}/api/sessions`, async function (req, res /*, next*/) { console.log("Creating new room"); - //FIXME we're truncating because of kolab4 database layout (should be fixed instead) - const roomId = uuidv4().substring(0, 16) - await getOrCreateRoom({ roomId }); + + const room = await createRoom(); res.json({ - id : roomId + id : room.id }) }) + // Seend websocket notification signals to room participants app.post(`${config.pathPrefix}/api/signal`, async function (req, res /*, next*/) { - let data = req.body; + const data = req.body; const roomId = data.roomId; const emit = (socket) => { socket.emit('notification', { method: `signal:${data.type}`, data: data.data }) }; if ('role' in data) { peers.forEach(peer => { if (peer.socket && peer.roomId == roomId && peer.hasRole(data.role)) { emit(peer.socket); } }) } else { emit(io.to(roomId)); } - res.json({}) + res.json({}); }); // Create connection in room (just wait for websocket instead? // $post = [ // 'json' => [ // 'role' => self::OV_ROLE_PUBLISHER, // 'data' => json_encode(['role' => $role]) // ] // ]; app.post(`${config.pathPrefix}/api/sessions/:session_id/connection`, function (req, res /*, next*/) { logger.info('Creating peer connection [roomId:"%s"]', req.params.session_id); - let roomId = req.params.session_id - let data = req.body; + const roomId = req.params.session_id; + const room = rooms.get(roomId); + + if (!room) { + res.status(404).send(); + return; + } - //FIXME we're truncating because of kolab4 database layout (should be fixed instnead) - const peerId = uuidv4().substring(0, 16) - //TODO create room already? + const peer = new Peer({ roomId }); - let peer = new Peer({ id: peerId, roomId }); - peers.set(peerId, peer); + peers.set(peer.id, peer); peer.on('close', () => { - peers.delete(peerId); + peers.delete(peer.id); statusLog(); }); + const data = req.body; + if ('role' in data) peer.setRole(data.role); const proto = config.publicDomain.includes('localhost') || config.publicDomain.includes('127.0.0.1') ? 'ws' : 'wss'; res.json({ - id: peerId, + id: peer.id, // Note: socket.io client will end up using (hardcoded) /meetmedia/signaling path - token: `${proto}://${config.publicDomain}?peerId=${peerId}&roomId=${roomId}` + token: `${proto}://${config.publicDomain}?peerId=${peer.id}&roomId=${roomId}&authToken=${peer.authToken}` }); }) if (config.httpOnly === true) { // http mainListener = http.createServer(app); } else { // https mainListener = spdy.createServer(tls, app); // http const redirectListener = http.createServer(app); if (config.listeningHost) redirectListener.listen(config.listeningRedirectPort, config.listeningHost); else redirectListener.listen(config.listeningRedirectPort); } console.info(`Listening on ${config.listeningPort} ${config.listeningHost}`) // https or http if (config.listeningHost) mainListener.listen(config.listeningPort, config.listeningHost); else mainListener.listen(config.listeningPort); } /** * Create a WebSocketServer to allow WebSocket connections from browsers. */ async function runWebSocketServer() { io = require('socket.io')(mainListener, { path: `${config.pathPrefix}/signaling`, cookie: false }); io.use( sharedSession(session, sharedCookieParser, { autoSave: true }) ); // Handle connections from clients. io.on('connection', (socket) => { logger.info("websocket connection") - const { roomId, peerId } = socket.handshake.query; + const { roomId, peerId, authToken } = socket.handshake.query; - if (!roomId || !peerId) { + if (!roomId || !peerId || !authToken) { logger.warn('connection request without roomId and/or peerId'); socket.disconnect(true); return; } logger.info('connection request [roomId:"%s", peerId:"%s"]', roomId, peerId); queue.push(async () => { - const room = await getOrCreateRoom({ roomId }); + const room = rooms.get(roomId); - let peer = peers.get(peerId); + if (!room) { + logger.warn("Room does not exist %s", roomId); + socket.disconnect(true); + return; + } + + const peer = peers.get(peerId); - if (!peer) { + if (!peer || peer.roomId != roomId || peer.authToken != authToken) { logger.warn("Peer does not exist %s", peerId); socket.disconnect(true); return; } peer.socket = socket; room.handlePeer({ peer }); statusLog(); }) .catch((error) => { logger.error('room creation or room joining failed [error:"%o"]', error); if (socket) socket.disconnect(true); }); }); } /** * Launch as many mediasoup Workers as given in the configuration file. */ async function runMediasoupWorkers() { const { numWorkers } = config.mediasoup; logger.info('running %d mediasoup Workers...', numWorkers); for (let i = 0; i < numWorkers; ++i) { const worker = await mediasoup.createWorker( { logLevel : config.mediasoup.worker.logLevel, logTags : config.mediasoup.worker.logTags, rtcMinPort : config.mediasoup.worker.rtcMinPort, rtcMaxPort : config.mediasoup.worker.rtcMaxPort }); worker.on('died', () => { logger.error( 'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid); setTimeout(() => process.exit(1), 2000); }); mediasoupWorkers.push(worker); } } /** * Get a Room instance (or create one if it does not exist). */ -async function getOrCreateRoom({ roomId }) { - let room = rooms.get(roomId); - - // If the Room does not exist create a new one. - if (!room) { - logger.info('creating a new Room [roomId:"%s"]', roomId); - - // Get existing peers in the room - const roomPeers = {} - peers.forEach(peer => { - if (!peer.closed && peer.roomId == roomId) - roomPeers[peer.id] = peer; - }); +async function createRoom() { + logger.info('creating a new Room'); - // Create the room - room = await Room.create({ mediasoupWorkers, roomId, peers: roomPeers, webhook }); + // Create the room + const room = await Room.create({ mediasoupWorkers, peers: {}, webhook }); - rooms.set(roomId, room); + room.on('close', () => { + logger.info('closing a Room [roomId:"%s"]', room.id); + rooms.delete(room.id); statusLog(); - room.on('close', () => { - logger.info('closing a Room [roomId:"%s"]', roomId); + if (webhook) { + webhook.post('', { roomId: room.id, event: 'roomClosed' }) + .then(function (response) { + logger.info(`Room ${room.id} closed. Webhook succeeded.`); + }) + .catch(function (error) { + logger.error(error); + }); + } + }); - rooms.delete(roomId); - statusLog(); + rooms.set(room.id, room); - if (webhook) { - webhook.post('', { roomId, event: 'roomClosed' }) - .then(function (response) { - logger.info(`Room ${roomId} closed. Webhook succeeded.`); - }) - .catch(function (error) { - logger.error(error); - }); - } - }); - } + statusLog(); return room; } run(); module.exports = app; // export for testing diff --git a/src/app/Http/Controllers/API/V4/OpenViduController.php b/src/app/Http/Controllers/API/V4/OpenViduController.php index fb33c121..5d8225d5 100644 --- a/src/app/Http/Controllers/API/V4/OpenViduController.php +++ b/src/app/Http/Controllers/API/V4/OpenViduController.php @@ -1,288 +1,288 @@ user(); $rooms = Room::where('user_id', $user->id)->orderBy('name')->get(); if (count($rooms) == 0) { // Create a room for the user (with a random and unique name) while (true) { $name = strtolower(\App\Utils::randStr(3, 3, '-')); if (!Room::where('name', $name)->count()) { break; } } $room = Room::create([ 'name' => $name, 'user_id' => $user->id ]); $rooms = collect([$room]); } $result = [ 'list' => $rooms, 'count' => count($rooms), ]; return response()->json($result); } /** * Join the room session. Each room has one owner, and the room isn't open until the owner * joins (and effectively creates the session). * * @param string $id Room identifier (name) * * @return \Illuminate\Http\JsonResponse */ public function joinRoom($id) { $room = Room::where('name', $id)->first(); // Room does not exist, or the owner is deleted if (!$room || !$room->owner) { return $this->errorResponse(404, \trans('meet.room-not-found')); } // Check if there's still a valid meet entitlement for the room owner if (!$room->owner->hasSku('meet')) { return $this->errorResponse(404, \trans('meet.room-not-found')); } $user = Auth::guard()->user(); $isOwner = $user && $user->id == $room->user_id; $init = !empty(request()->input('init')); // There's no existing session if (!$room->hasSession()) { // Participants can't join the room until the session is created by the owner if (!$isOwner) { return $this->errorResponse(422, \trans('meet.session-not-found'), ['code' => 323]); } // The room owner can create the session on request if (!$init) { return $this->errorResponse(422, \trans('meet.session-not-found'), ['code' => 324]); } $session = $room->createSession(); if (empty($session)) { return $this->errorResponse(500, \trans('meet.session-create-error')); } } $settings = $room->getSettings(['locked', 'nomedia', 'password']); $password = (string) $settings['password']; $config = [ 'locked' => $settings['locked'] === 'true', 'nomedia' => $settings['nomedia'] === 'true', 'password' => $isOwner ? $password : '', 'requires_password' => !$isOwner && strlen($password), ]; $response = ['config' => $config]; // Validate room password if (!$isOwner && strlen($password)) { $request_password = request()->input('password'); if ($request_password !== $password) { return $this->errorResponse(422, \trans('meet.session-password-error'), $response + ['code' => 325]); } } // Handle locked room if (!$isOwner && $config['locked']) { $nickname = request()->input('nickname'); $picture = request()->input('picture'); $requestId = request()->input('requestId'); $request = $requestId ? $room->requestGet($requestId) : null; $error = \trans('meet.session-room-locked-error'); // Request already has been processed (not accepted yet, but it could be denied) if (empty($request['status']) || $request['status'] != Room::REQUEST_ACCEPTED) { if (!$request) { if (empty($nickname) || empty($requestId) || !preg_match('/^[a-z0-9]{8,32}$/i', $requestId)) { return $this->errorResponse(422, $error, $response + ['code' => 326]); } if (empty($picture)) { $svg = file_get_contents(resource_path('images/user.svg')); $picture = 'data:image/svg+xml;base64,' . base64_encode($svg); } elseif (!preg_match('|^data:image/png;base64,[a-zA-Z0-9=+/]+$|', $picture)) { return $this->errorResponse(422, $error, $response + ['code' => 326]); } // TODO: Resize when big/make safe the user picture? $request = ['nickname' => $nickname, 'requestId' => $requestId, 'picture' => $picture]; if (!$room->requestSave($requestId, $request)) { // FIXME: should we use error code 500? return $this->errorResponse(422, $error, $response + ['code' => 326]); } // Send the request (signal) to all moderators $result = $room->signal('joinRequest', $request, Room::ROLE_MODERATOR); } return $this->errorResponse(422, $error, $response + ['code' => 327]); } } // Initialize connection tokens if ($init) { // Choose the connection role $canPublish = !empty(request()->input('canPublish')) && (empty($config['nomedia']) || $isOwner); $role = $canPublish ? Room::ROLE_PUBLISHER : Room::ROLE_SUBSCRIBER; if ($isOwner) { $role |= Room::ROLE_MODERATOR; $role |= Room::ROLE_OWNER; } // Create session token for the current user/connection $response = $room->getSessionToken($role); if (empty($response)) { return $this->errorResponse(500, \trans('meet.session-join-error')); } $response_code = 200; $response['role'] = $role; $response['config'] = $config; } else { $response_code = 422; $response['code'] = 322; } return response()->json($response, $response_code); } /** * Set the domain configuration. * * @param string $id Room identifier (name) * * @return \Illuminate\Http\JsonResponse|void */ public function setRoomConfig($id) { $room = Room::where('name', $id)->first(); // Room does not exist, or the owner is deleted if (!$room || !$room->owner) { return $this->errorResponse(404); } $user = Auth::guard()->user(); // Only room owner can configure the room if ($user->id != $room->user_id) { return $this->errorResponse(403); } $input = request()->input(); $errors = []; foreach ($input as $key => $value) { switch ($key) { case 'password': if ($value === null || $value === '') { $input[$key] = null; } else { // TODO: Do we have to validate the password in any way? } break; case 'locked': $input[$key] = $value ? 'true' : null; break; case 'nomedia': $input[$key] = $value ? 'true' : null; break; default: $errors[$key] = \trans('meet.room-unsupported-option-error'); } } if (!empty($errors)) { return response()->json(['status' => 'error', 'errors' => $errors], 422); } if (!empty($input)) { $room->setSettings($input); } return response()->json([ 'status' => 'success', 'message' => \trans('meet.room-setconfig-success'), ]); } /** * Webhook as triggered from OpenVidu server * * @param \Illuminate\Http\Request $request The API request. * * @return \Illuminate\Http\Response The response */ public function webhook(Request $request) { \Log::debug($request->getContent()); - $sessionId = $request->input('roomId'); + $sessionId = (string) $request->input('roomId'); $event = (string) $request->input('event'); switch ($event) { case 'roomClosed': // When all participants left the room the server will dispatch roomClosed // event. We'll remove the session reference from the database. $room = Room::where('session_id', $sessionId)->first(); if ($room) { $room->session_id = null; $room->save(); } break; case 'joinRequestAccepted': case 'joinRequestDenied': $room = Room::where('session_id', $sessionId)->first(); if ($room) { $method = $event == 'joinRequestAccepted' ? 'requestAccept' : 'requestDeny'; $room->{$method}($request->input('requestId')); } break; } return response('Success', 200); } } diff --git a/src/app/OpenVidu/Room.php b/src/app/OpenVidu/Room.php index 8e0b4d93..eff25def 100644 --- a/src/app/OpenVidu/Room.php +++ b/src/app/OpenVidu/Room.php @@ -1,281 +1,275 @@ false, // No exceptions from Guzzle 'base_uri' => \config('openvidu.api_url'), 'verify' => \config('openvidu.api_verify_tls'), 'auth' => [ \config('openvidu.api_username'), \config('openvidu.api_password') ], 'on_stats' => function (\GuzzleHttp\TransferStats $stats) { $threshold = \config('logging.slow_log'); if ($threshold && ($sec = $stats->getTransferTime()) > $threshold) { $url = $stats->getEffectiveUri(); $method = $stats->getRequest()->getMethod(); \Log::warning(sprintf("[STATS] %s %s: %.4f sec.", $method, $url, $sec)); } }, ] ); } return self::$client; } /** * Create a OpenVidu session * * @return array|null Session data on success, NULL otherwise */ public function createSession(): ?array { $params = [ 'json' => [ /* request params here */ ] ]; $response = $this->client()->request('POST', "sessions", $params); if ($response->getStatusCode() !== 200) { $this->session_id = null; $this->save(); return null; // TODO: Log an error/warning } $session = json_decode($response->getBody(), true); $this->session_id = $session['id']; $this->save(); return $session; } /** * Create a OpenVidu session (connection) token * * @param int $role User role (see self::ROLE_* constants) * * @return array|null Token data on success, NULL otherwise * @throws \Exception if session does not exist */ public function getSessionToken($role = self::ROLE_SUBSCRIBER): ?array { if (!$this->session_id) { throw new \Exception("The room session does not exist"); } $url = 'sessions/' . $this->session_id . '/connection'; $post = [ 'json' => [ 'role' => $role, ] ]; $response = $this->client()->request('POST', $url, $post); if ($response->getStatusCode() == 200) { $json = json_decode($response->getBody(), true); - // TODO: make use of the authentication token - - $authToken = base64_encode($json['id'] . ':' . \random_bytes(16)); - return [ - 'session' => $this->session_id, 'token' => $json['token'], - 'authToken' => $authToken, 'role' => $role, ]; } // TODO: Log an error/warning on non-200 response return null; } /** * Check if the room has an active session * * @return bool True when the session exists, False otherwise */ public function hasSession(): bool { if (!$this->session_id) { return false; } $response = $this->client()->request('GET', "sessions/{$this->session_id}"); return $response->getStatusCode() == 200; } /** * The room owner. * * @return \Illuminate\Database\Eloquent\Relations\BelongsTo */ public function owner() { return $this->belongsTo('\App\User', 'user_id', 'id'); } /** * Accept the join request. * * @param string $id Request identifier * * @return bool True on success, False on failure */ public function requestAccept(string $id): bool { $request = Cache::get($this->session_id . '-' . $id); if ($request) { $request['status'] = self::REQUEST_ACCEPTED; return Cache::put($this->session_id . '-' . $id, $request, now()->addHours(1)); } return false; } /** * Deny the join request. * * @param string $id Request identifier * * @return bool True on success, False on failure */ public function requestDeny(string $id): bool { $request = Cache::get($this->session_id . '-' . $id); if ($request) { $request['status'] = self::REQUEST_DENIED; return Cache::put($this->session_id . '-' . $id, $request, now()->addHours(1)); } return false; } /** * Get the join request data. * * @param string $id Request identifier * * @return array|null Request data (e.g. nickname, status, picture?) */ public function requestGet(string $id): ?array { return Cache::get($this->session_id . '-' . $id); } /** * Save the join request. * * @param string $id Request identifier * @param array $request Request data * * @return bool True on success, False on failure */ public function requestSave(string $id, array $request): bool { // We don't really need the picture in the cache // As we use this cache for the request status only unset($request['picture']); return Cache::put($this->session_id . '-' . $id, $request, now()->addHours(1)); } /** * Any (additional) properties of this room. * * @return \Illuminate\Database\Eloquent\Relations\HasMany */ public function settings() { return $this->hasMany('App\OpenVidu\RoomSetting', 'room_id'); } /** * Send a OpenVidu signal to the session participants (connections) * * @param string $name Signal name (type) * @param array $data Signal data array * @param int $target Limit targets by their participant role * * @return bool True on success, False on failure * @throws \Exception if session does not exist */ public function signal(string $name, array $data = [], $target = null): bool { if (!$this->session_id) { throw new \Exception("The room session does not exist"); } $post = [ 'roomId' => $this->session_id, 'type' => $name, 'role' => $target, 'data' => $data, ]; $response = $this->client()->request('POST', 'signal', ['json' => $post]); return $response->getStatusCode() == 200; } } diff --git a/src/resources/vue/Meet/Room.vue b/src/resources/vue/Meet/Room.vue index 5dc9cb7b..e5cc7fa4 100644 --- a/src/resources/vue/Meet/Room.vue +++ b/src/resources/vue/Meet/Room.vue @@ -1,639 +1,635 @@ diff --git a/src/tests/Feature/Controller/OpenViduTest.php b/src/tests/Feature/Controller/OpenViduTest.php index a52d2c56..1059f5e8 100644 --- a/src/tests/Feature/Controller/OpenViduTest.php +++ b/src/tests/Feature/Controller/OpenViduTest.php @@ -1,452 +1,446 @@ clearMeetEntitlements(); $room = Room::where('name', 'john')->first(); $room->setSettings(['password' => null, 'locked' => null, 'nomedia' => null]); } public function tearDown(): void { $this->clearMeetEntitlements(); $room = Room::where('name', 'john')->first(); $room->setSettings(['password' => null, 'locked' => null, 'nomedia' => null]); parent::tearDown(); } /** * Test listing user rooms * * @group openvidu */ public function testIndex(): void { $john = $this->getTestUser('john@kolab.org'); $jack = $this->getTestUser('jack@kolab.org'); Room::where('user_id', $jack->id)->delete(); // Unauth access not allowed $response = $this->get("api/v4/openvidu/rooms"); $response->assertStatus(401); // John has one room $response = $this->actingAs($john)->get("api/v4/openvidu/rooms"); $response->assertStatus(200); $json = $response->json(); $this->assertSame(1, $json['count']); $this->assertCount(1, $json['list']); $this->assertSame('john', $json['list'][0]['name']); // Jack has no room, but it will be auto-created $response = $this->actingAs($jack)->get("api/v4/openvidu/rooms"); $response->assertStatus(200); $json = $response->json(); $this->assertSame(1, $json['count']); $this->assertCount(1, $json['list']); $this->assertMatchesRegularExpression('/^[0-9a-z-]{11}$/', $json['list'][0]['name']); } /** * Test joining the room * * @group openvidu */ public function testJoinRoom(): void { $john = $this->getTestUser('john@kolab.org'); $jack = $this->getTestUser('jack@kolab.org'); $room = Room::where('name', 'john')->first(); $room->session_id = null; $room->save(); $this->assignMeetEntitlement($john); // Unauth access, no session yet $response = $this->post("api/v4/openvidu/rooms/{$room->name}"); $response->assertStatus(422); $json = $response->json(); $this->assertSame(323, $json['code']); // Non-existing room name $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/non-existing"); $response->assertStatus(404); // TODO: Test accessing an existing room of deleted owner // Non-owner, no session yet $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}"); $response->assertStatus(422); $json = $response->json(); $this->assertSame(323, $json['code']); // Room owner, no session yet $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}"); $response->assertStatus(422); $json = $response->json(); $this->assertSame(324, $json['code']); $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}", ['init' => 1]); $response->assertStatus(200); $json = $response->json(); $session_id = $room->fresh()->session_id; $this->assertSame(Room::ROLE_SUBSCRIBER | Room::ROLE_MODERATOR | Room::ROLE_OWNER, $json['role']); - $this->assertSame($session_id, $json['session']); - $this->assertTrue(is_string($session_id) && !empty($session_id)); $this->assertMatchesRegularExpression('|^wss?://|', $json['token']); + $this->assertMatchesRegularExpression('|&roomId=' . $session_id . '|', $json['token']); $john_token = $json['token']; // Non-owner, now the session exists, no 'init' argument $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}"); $response->assertStatus(422); $json = $response->json(); $this->assertSame(322, $json['code']); $this->assertTrue(empty($json['token'])); // Non-owner, now the session exists, with 'init', but no 'canPublish' argument $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}", ['init' => 1]); $response->assertStatus(200); $json = $response->json(); $this->assertSame(Room::ROLE_SUBSCRIBER, $json['role']); - $this->assertSame($session_id, $json['session']); $this->assertMatchesRegularExpression('|^wss?://|', $json['token']); + $this->assertMatchesRegularExpression('|&roomId=' . $session_id . '|', $json['token']); $this->assertTrue($json['token'] != $john_token); // Non-owner, now the session exists, with 'init', and with 'role=PUBLISHER' $post = ['canPublish' => true, 'init' => 1]; $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(200); $json = $response->json(); $this->assertSame(Room::ROLE_PUBLISHER, $json['role']); - $this->assertSame($session_id, $json['session']); $this->assertMatchesRegularExpression('|^wss?://|', $json['token']); + $this->assertMatchesRegularExpression('|&roomId=' . $session_id . '|', $json['token']); $this->assertTrue($json['token'] != $john_token); $this->assertEmpty($json['config']['password']); $this->assertEmpty($json['config']['requires_password']); // Non-owner, password protected room, password not provided $room->setSettings(['password' => 'pass']); $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}"); $response->assertStatus(422); $json = $response->json(); $this->assertCount(4, $json); $this->assertSame(325, $json['code']); $this->assertSame('error', $json['status']); $this->assertSame('Failed to join the session. Invalid password.', $json['message']); $this->assertEmpty($json['config']['password']); $this->assertTrue($json['config']['requires_password']); // Non-owner, password protected room, invalid provided $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}", ['password' => 'aa']); $response->assertStatus(422); $json = $response->json(); $this->assertSame(325, $json['code']); // Non-owner, password protected room, valid password provided // TODO: Test without init=1 $post = ['password' => 'pass', 'init' => 'init']; $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(200); - $json = $response->json(); - - $this->assertSame($session_id, $json['session']); - // Make sure the room owner can access the password protected room w/o password // TODO: Test without init=1 $post = ['init' => 'init']; $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(200); // Test 'nomedia' room option $room->setSettings(['nomedia' => 'true', 'password' => null]); $post = ['init' => 'init', 'canPublish' => true]; $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(200); $json = $response->json(); $this->assertSame(Room::ROLE_PUBLISHER & $json['role'], Room::ROLE_PUBLISHER); $post = ['init' => 'init', 'canPublish' => true]; $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(200); $json = $response->json(); $this->assertSame(Room::ROLE_PUBLISHER & $json['role'], 0); } /** * Test locked room and join requests * * @group openvidu */ public function testJoinRequests(): void { $john = $this->getTestUser('john@kolab.org'); $jack = $this->getTestUser('jack@kolab.org'); $room = Room::where('name', 'john')->first(); $room->session_id = null; $room->save(); $room->setSettings(['password' => null, 'locked' => 'true']); $this->assignMeetEntitlement($john); // Create the session (also makes sure the owner can access a locked room) $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}", ['init' => 1]); $response->assertStatus(200); // Non-owner, locked room, invalid/missing input $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}"); $response->assertStatus(422); $json = $response->json(); $this->assertCount(4, $json); $this->assertSame(326, $json['code']); $this->assertSame('error', $json['status']); $this->assertSame('Failed to join the session. Room locked.', $json['message']); $this->assertTrue($json['config']['locked']); // Non-owner, locked room, invalid requestId $post = ['nickname' => 'name', 'requestId' => '-----', 'init' => 1]; $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(422); $json = $response->json(); $this->assertSame(326, $json['code']); // Non-owner, locked room, invalid requestId $post = ['nickname' => 'name', 'init' => 1]; $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(422); $json = $response->json(); $this->assertSame(326, $json['code']); // Non-owner, locked room, valid input $reqId = '12345678'; $post = ['nickname' => 'name', 'requestId' => $reqId, 'picture' => '']; $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(422); $json = $response->json(); $this->assertCount(4, $json); $this->assertSame(327, $json['code']); $this->assertSame('error', $json['status']); $this->assertSame('Failed to join the session. Room locked.', $json['message']); $this->assertTrue($json['config']['locked']); $room->refresh(); $request = $room->requestGet($reqId); $this->assertSame($post['nickname'], $request['nickname']); $this->assertSame($post['requestId'], $request['requestId']); $room->requestAccept($reqId); // Non-owner, locked room, join request accepted $post['init'] = 1; $post['canPublish'] = true; $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(200); $json = $response->json(); $this->assertSame(Room::ROLE_PUBLISHER, $json['role']); $this->assertMatchesRegularExpression('|^wss?://|', $json['token']); // TODO: Test a scenario where both password and lock are enabled // TODO: Test accepting/denying as a non-owner moderator // TODO: Test somehow websocket communication $this->markTestIncomplete(); } /** * Test joining the room * * @group openvidu * @depends testJoinRoom */ public function testJoinRoomGuest(): void { $this->assignMeetEntitlement('john@kolab.org'); // There's no easy way to logout the user in the same test after // using actingAs(). That's why this is moved to a separate test $room = Room::where('name', 'john')->first(); // Guest, request with screenShare token $post = ['canPublish' => true, 'screenShare' => 1, 'init' => 1]; $response = $this->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(200); $json = $response->json(); $this->assertSame(Room::ROLE_PUBLISHER, $json['role']); - $this->assertSame($room->session_id, $json['session']); $this->assertMatchesRegularExpression('|^wss?://|', $json['token']); } /** * Test configuring the room (session) * * @group openvidu */ public function testSetRoomConfig(): void { $john = $this->getTestUser('john@kolab.org'); $jack = $this->getTestUser('jack@kolab.org'); $room = Room::where('name', 'john')->first(); // Unauth access not allowed $response = $this->post("api/v4/openvidu/rooms/{$room->name}/config", []); $response->assertStatus(401); // Non-existing room name $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/non-existing/config", []); $response->assertStatus(404); // TODO: Test a room with a deleted owner // Non-owner $response = $this->actingAs($jack)->post("api/v4/openvidu/rooms/{$room->name}/config", []); $response->assertStatus(403); // Room owner $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}/config", []); $response->assertStatus(200); $json = $response->json(); $this->assertCount(2, $json); $this->assertSame('success', $json['status']); $this->assertSame("Room configuration updated successfully.", $json['message']); // Set password and room lock $post = ['password' => 'aaa', 'locked' => 1]; $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}/config", $post); $response->assertStatus(200); $json = $response->json(); $this->assertCount(2, $json); $this->assertSame('success', $json['status']); $this->assertSame("Room configuration updated successfully.", $json['message']); $room->refresh(); $this->assertSame('aaa', $room->getSetting('password')); $this->assertSame('true', $room->getSetting('locked')); // Unset password and room lock $post = ['password' => '', 'locked' => 0]; $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}/config", $post); $response->assertStatus(200); $json = $response->json(); $this->assertCount(2, $json); $this->assertSame('success', $json['status']); $this->assertSame("Room configuration updated successfully.", $json['message']); $room->refresh(); $this->assertSame(null, $room->getSetting('password')); $this->assertSame(null, $room->getSetting('locked')); // Test invalid option error $post = ['password' => 'eee', 'unknown' => 0]; $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}/config", $post); $response->assertStatus(422); $json = $response->json(); $this->assertCount(2, $json); $this->assertSame('error', $json['status']); $this->assertSame("Invalid room configuration option.", $json['errors']['unknown']); $room->refresh(); $this->assertSame(null, $room->getSetting('password')); } /** * Test the webhook * * @group openvidu */ public function testWebhook(): void { $this->assignMeetEntitlement('john@kolab.org'); $john = $this->getTestUser('john@kolab.org'); $room = Room::where('name', 'john')->first(); // First, create the session $post = ['init' => 1]; $response = $this->actingAs($john)->post("api/v4/openvidu/rooms/{$room->name}", $post); $response->assertStatus(200); $sessionId = $room->fresh()->session_id; // Test accepting a join request $room->requestSave('1234', ['nickname' => 'test']); $post = ['roomId' => $sessionId, 'requestId' => '1234', 'event' => 'joinRequestAccepted']; $response = $this->post("api/webhooks/meet", $post); $response->assertStatus(200); $request = $room->requestGet('1234'); $this->assertSame(Room::REQUEST_ACCEPTED, $request['status']); // Test denying a join request $room->requestSave('1234', ['nickname' => 'test']); $post = ['roomId' => $sessionId, 'requestId' => '1234', 'event' => 'joinRequestDenied']; $response = $this->post("api/webhooks/meet", $post); $response->assertStatus(200); $request = $room->requestGet('1234'); $this->assertSame(Room::REQUEST_DENIED, $request['status']); // Test closing the session $post = ['roomId' => $sessionId, 'event' => 'roomClosed']; $response = $this->post("api/webhooks/meet", $post); $response->assertStatus(200); $this->assertNull($room->fresh()->session_id); } }