diff --git a/meet/server/lib/Room.js b/meet/server/lib/Room.js index 183bd51e..a8bdfd55 100644 --- a/meet/server/lib/Room.js +++ b/meet/server/lib/Room.js @@ -1,1200 +1,1250 @@ const EventEmitter = require('events').EventEmitter; const AwaitQueue = require('awaitqueue'); const crypto = require('crypto'); const Logger = require('./Logger'); const { SocketTimeoutError } = require('./errors'); const Roles = require('./userRoles'); const config = require('../config/config'); const logger = new Logger('Room'); const ROUTER_SCALE_SIZE = config.routerScaleSize || 40; class Room extends EventEmitter { static calculateLoads(mediasoupWorkers, peers, mediasoupRouters) { const routerLoads = new Map(); const workerLoads = new Map(); const pipedRoutersIds = new Set(); // Calculate router loads by adding up peers per router, and collected piped routers Object.values(peers).forEach(peer => { const routerId = peer.routerId; if (routerId) { if (mediasoupRouters.has(routerId)) { pipedRoutersIds.add(routerId); } if (routerLoads.has(routerId)) { routerLoads.set(routerId, routerLoads.get(routerId) + 1); } else { routerLoads.set(routerId, 1); } } }); // Calculate worker loads by adding up router loads per worker for (const worker of mediasoupWorkers) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (workerLoads.has(worker._pid)) { workerLoads.set(worker._pid, workerLoads.get(worker._pid) + (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } else { workerLoads.set(worker._pid, (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } } } return {routerLoads, workerLoads, pipedRoutersIds}; } /* * Find a router that is on a worker that is least loaded. * * A worker with a router that we are already piping to is preferred. */ static getLeastLoadedRouter(mediasoupWorkers, peers, mediasoupRouters) { const {workerLoads, pipedRoutersIds} = Room.calculateLoads(mediasoupWorkers, peers, mediasoupRouters); const sortedWorkerLoads = new Map([ ...workerLoads.entries() ].sort( (a, b) => a[1] - b[1])); // we don't care about if router is piped, just choose the least loaded worker if (pipedRoutersIds.size === 0 || pipedRoutersIds.size === mediasoupRouters.size) { const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } else { // find if there is a piped router that is on a worker that is below limit for (const [ workerId, workerLoad ] of sortedWorkerLoads.entries()) { for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; // on purpose we check if the worker load is below the limit, // as in reality the worker load is imortant, // not the router load if (mediasoupRouters.has(routerId) && pipedRoutersIds.has(routerId) && workerLoad < ROUTER_SCALE_SIZE) { return routerId; } } } } } // no piped router found, we need to return router from least loaded worker const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } } /** * Factory function that creates and returns Room instance. * * @async * * @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new * mediasoup Router must be created. * @param {String} roomId - Id of the Room instance. */ - static async create({ mediasoupWorkers, roomId, peers }) { + static async create({ mediasoupWorkers, roomId, peers, webhook }) { logger.info('create() [roomId:"%s"]', roomId); // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediasoupRouters = new Map(); for (const worker of mediasoupWorkers) { const router = await worker.createRouter({ mediaCodecs }); mediasoupRouters.set(router.id, router); } const firstRouter = mediasoupRouters.get(Room.getLeastLoadedRouter( mediasoupWorkers, peers, mediasoupRouters)); // Create a mediasoup AudioLevelObserver on first router const audioLevelObserver = await firstRouter.createAudioLevelObserver( { maxEntries : 1, threshold : -80, interval : 800 }); return new Room({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, - peers + peers, + webhook }); } constructor({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, - peers + peers, + webhook }) { logger.info('constructor() [roomId:"%s"]', roomId); super(); this.setMaxListeners(Infinity); // this._uuid = uuidv4(); this._mediasoupWorkers = mediasoupWorkers; // Room ID. this._roomId = roomId; // Closed flag. this._closed = false; // Joining queue this._queue = new AwaitQueue(); this._peers = peers; this._selfDestructTimeout = null; // Array of mediasoup Router instances. this._mediasoupRouters = mediasoupRouters; this._audioLevelObserver = audioLevelObserver; + + this._webhook = webhook; } dumpStats() { const peers = this.getPeers(); const {routerLoads, workerLoads, pipedRoutersIds} = Room.calculateLoads(this._mediasoupWorkers, peers, this._mediasoupRouters); let stats = { numberOfWorkers: this._mediasoupWorkers.length, numberOfRouters: this._mediasoupRouters.size, numberOfPeers: peers.length, routerLoads: routerLoads, workerLoads: workerLoads, pipedRoutersIds: pipedRoutersIds, }; console.log(stats); } close() { logger.debug('close()'); this._closed = true; this._queue.close(); this._queue = null; if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = null; // Close the peers. Object.values(this._peers).forEach(peer => { if (!peer.closed) peer.close(); }); this._peers = {}; // Close the mediasoup Routers. for (const router of this._mediasoupRouters.values()) { router.close(); } this._mediasoupRouters.clear(); this._mediasoupWorkers = null; this._audioLevelObserver = null; // Emit 'close' event. this.emit('close'); } handlePeer({ peer }) { logger.info('handlePeer() [peer:"%s", role:%s]', peer.id, peer.role); // Should not happen if (this._peers[peer.id]) { logger.warn( 'handleConnection() | there is already a peer with same peerId [peer:"%s"]', peer.id); } this._peerJoining(peer); } logStatus() { logger.info( 'logStatus() [room id:"%s", peers:"%s"]', this._roomId, Object.keys(this._peers).length ); } dump() { return { roomId : this._roomId, peers : Object.keys(this._peers).length }; } get id() { return this._roomId; } selfDestructCountdown() { logger.debug('selfDestructCountdown() started'); clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = setTimeout(() => { if (this._closed) return; if (this.checkEmpty()) { logger.info( 'Room deserted for some time, closing the room [roomId:"%s"]', this._roomId); this.close(); } else logger.debug('selfDestructCountdown() aborted; room is not empty!'); }, 10000); } checkEmpty() { return Object.keys(this._peers).length === 0; } _getTURNCredentials(name, secret) { const unixTimeStamp = parseInt(Date.now()/1000) + 24*3600; // this credential would be valid for the next 24 hours // If there is no name, the timestamp alone can also be used. const username = name ? `${unixTimeStamp}:${name}` : `${unixTimeStamp}`; const hmac = crypto.createHmac('sha1', secret); hmac.setEncoding('base64'); hmac.write(username); hmac.end(); const password = hmac.read(); return { username, password }; } _peerJoining(peer) { this._queue.push(async () => { peer.socket.join(this._roomId); this._peers[peer.id] = peer; // Assign routerId peer.routerId = await this._getRouterId(); this._handlePeer(peer); let iceServers; if (config.turn) { // Generate time-limited credentials. The name is only relevant for the logs. const {username, password} = this._getTURNCredentials(peer.id, config.turn.staticSecret); iceServers = [ { urls : config.turn.urls, username : username, credential : password } ]; } this._notification(peer.socket, 'roomReady', { iceServers }); }) .catch((error) => { logger.error('_peerJoining() [error:"%o"]', error); }); } _handlePeer(peer) { logger.debug('_handlePeer() [peer:"%s"]', peer.id); peer.on('close', () => { this._handlePeerClose(peer); }); peer.on('nicknameChanged', () => { // Spread to others const data = { peerId: peer.id, nickname: peer.nickname }; this._notification(peer.socket, 'changeNickname', data, true); }); peer.on('languageChanged', () => { // Spread to others (and self) const data = { peerId: peer.id, language: peer.language }; this._notification(peer.socket, 'changeLanguage', data, true, true); }); peer.on('roleChanged', () => { // Spread to others (and self) const data = { peerId: peer.id, role: peer.role }; this._notification(peer.socket, 'changeRole', data, true, true); }); peer.socket.on('request', (request, cb) => { logger.debug( 'Peer "request" event [method:"%s", peerId:"%s"]', request.method, peer.id); this._handleSocketRequest(peer, request, cb) .catch((error) => { logger.error('"request" failed [error:"%o"]', error); cb(error); }); }); // Peer left before we were done joining if (peer.closed) this._handlePeerClose(peer); } _handlePeerClose(peer) { logger.debug('_handlePeerClose() [peer:"%s"]', peer.id); if (this._closed) return; this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); delete this._peers[peer.id]; // If this is the last Peer in the room close the room after a while. if (this.checkEmpty()) this.selfDestructCountdown(); } async _handleSocketRequest(peer, request, cb) { const router = this._mediasoupRouters.get(peer.routerId); switch (request.method) { case 'getRouterRtpCapabilities': { cb(null, router.rtpCapabilities); break; } case 'dumpStats': { this.dumpStats() cb(null); break; } case 'join': { const { nickname, rtpCapabilities } = request.data; // Store client data into the Peer data object. peer.nickname = nickname; peer.rtpCapabilities = rtpCapabilities; // Tell the new Peer about already joined Peers. const otherPeers = this.getPeers(peer); const peerInfos = otherPeers.map(otherPeer => otherPeer.peerInfo); cb(null, { id: peer.id, role: peer.role, peers: peerInfos, }); // Create Consumers for existing Producers. for (const otherPeer of otherPeers) { for (const producer of otherPeer.producers.values()) { this._createConsumer({ consumerPeer: peer, producerPeer: otherPeer, producer }); } } // Notify the new Peer to all other Peers. this._notification(peer.socket, 'newPeer', peer.peerInfo, true); logger.debug( 'peer joined [peer: "%s", nickname: "%s"]', peer.id, nickname); break; } case 'createPlainTransport': { const { producing, consuming } = request.data; const transport = await router.createPlainTransport( { //When consuming we manually connect using connectPlainTransport, //otherwise we let the port autodetection work. comedia: producing, // FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP) rtcpMux: false, listenIp: { ip: "127.0.0.1", announcedIp: null }, appData : { producing, consuming } } ); // await transport.enableTraceEvent([ "probation", "bwe" ]); // transport.on("trace", (trace) => { // console.log(trace); // }); peer.addTransport(transport.id, transport); cb( null, { id : transport.id, ip : transport.tuple.localIp, port : transport.tuple.localPort, rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined }); break; } case 'connectPlainTransport': { const { transportId, ip, port, rtcpPort } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ ip: ip, port: port, rtcpPort: rtcpPort, }); cb(); break; } case 'createWebRtcTransport': { // NOTE: Don't require that the Peer is joined here, so the client can // initiate mediasoup Transports and be ready when he later joins. const { forceTcp, producing, consuming } = request.data; const webRtcTransportOptions = { ...config.mediasoup.webRtcTransport, appData : { producing, consuming } }; webRtcTransportOptions.enableTcp = true; if (forceTcp) webRtcTransportOptions.enableUdp = false; else { webRtcTransportOptions.enableUdp = true; webRtcTransportOptions.preferUdp = true; } const transport = await router.createWebRtcTransport( webRtcTransportOptions ); transport.on('dtlsstatechange', (dtlsState) => { if (dtlsState === 'failed' || dtlsState === 'closed') { logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); } }); // Store the WebRtcTransport into the Peer data Object. peer.addTransport(transport.id, transport); cb( null, { id : transport.id, iceParameters : transport.iceParameters, iceCandidates : transport.iceCandidates, dtlsParameters : transport.dtlsParameters }); const { maxIncomingBitrate } = config.mediasoup.webRtcTransport; // If set, apply max incoming bitrate limit. if (maxIncomingBitrate) { try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); } catch (error) { logger.info("Setting the incoming bitrate failed") } } break; } case 'connectWebRtcTransport': { const { transportId, dtlsParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ dtlsParameters }); cb(); break; } /* case 'restartIce': { const { transportId } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); const iceParameters = await transport.restartIce(); cb(null, iceParameters); break; } */ case 'produce': { let { appData } = request.data; if (!appData.source || ![ 'mic', 'webcam', 'screen' ].includes(appData.source)) throw new Error('invalid producer source'); if (appData.source === 'mic' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'webcam' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'screen' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); const { transportId, kind, rtpParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); // Add peerId into appData to later get the associated Peer during // the 'loudest' event of the audioLevelObserver. appData = { ...appData, peerId: peer.id }; const producer = await transport.produce({ kind, rtpParameters, appData }); const pipeRouters = this._getRoutersToPipeTo(peer.routerId); for (const [ routerId, destinationRouter ] of this._mediasoupRouters) { if (pipeRouters.includes(routerId)) { await router.pipeToRouter({ producerId : producer.id, router : destinationRouter }); } } // Store the Producer into the Peer data Object. peer.addProducer(producer.id, producer); producer.on('videoorientationchange', (videoOrientation) => { logger.debug( 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]', producer.id, videoOrientation); }); // Trace individual packets for debugging // await producer.enableTraceEvent([ "rtp", "pli", "keyframe", "nack" ]); // producer.on("trace", (trace) => { // console.log(`Trace on ${producer.id}`, trace); // }); cb(null, { id: producer.id }); // Optimization: Create a server-side Consumer for each Peer. for (const otherPeer of this.getPeers(peer)) { this._createConsumer({ consumerPeer: otherPeer, producerPeer: peer, producer }); } // Add into the audioLevelObserver. if (kind === 'audio') { this._audioLevelObserver.addProducer({ producerId: producer.id }) .catch(() => {}); } break; } case 'closeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); producer.close(); // Remove from its map. peer.removeProducer(producer.id); cb(); break; } case 'pauseProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.pause(); cb(); break; } case 'resumeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.resume(); cb(); break; } case 'pauseConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.pause(); cb(); break; } case 'resumeConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.resume(); cb(); break; } case 'changeNickname': { const { nickname } = request.data; peer.nickname = nickname; // This will be spread through events from the peer object // Return no error cb(); break; } case 'chatMessage': { const { message } = request.data; // Spread to others this._notification(peer.socket, 'chatMessage', { peerId: peer.id, nickname: peer.nickname, message: message }, true, true); // Return no error cb(); break; } case 'moderator:addRole': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const rolePeer = this._peers[peerId]; if (!rolePeer) throw new Error(`peer with id "${peerId}" not found`); if (!rolePeer.isValidRole(role)) throw new Error('invalid role'); if (!rolePeer.hasRole(role)) { // The 'owner' role is not assignable if (role & Roles.OWNER) throw new Error('the OWNER role is not assignable'); // Promotion to publisher? Put the user hand down if (role & Roles.PUBLISHER && !(rolePeer.role & Roles.PUBLISHER)) rolePeer.raisedHand = false; // This will propagate the event automatically rolePeer.setRole(rolePeer.role | role); } // Return no error cb(); break; } case 'moderator:removeRole': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const rolePeer = this._peers[peerId]; if (!rolePeer) throw new Error(`peer with id "${peerId}" not found`); if (!rolePeer.isValidRole(role)) throw new Error('invalid role'); if (rolePeer.hasRole(role)) { if (role & Roles.OWNER) throw new Error('the OWNER role is not removable'); if (role & Roles.MODERATOR && rolePeer.role & Roles.OWNER) throw new Error('the MODERATOR role cannot be removed from the OWNER'); // Non-publisher cannot be a language interpreter if (role & Roles.PUBLISHER) rolePeer.language = null; // This will propagate the event automatically rolePeer.setRole(rolePeer.role ^ role); } // Return no error cb(); break; } case 'moderator:changeLanguage': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { language } = request.data; if (language && !/^[a-z]{2}$/.test(language)) throw new Error('invalid language code'); peer.language = language; // This will be spread through events from the peer object // Return no error cb(); break; } + case 'moderator:joinRequestAccept': + { + if (!peer.hasRole(Roles.MODERATOR)) + throw new Error('peer not authorized'); + + const { requestId } = request.data; + + // Return no error + cb(); + + if (this._webhook) { + this._webhook.post('', { requestId, roomId: this._roomId, event: 'joinRequestAccepted' }) + .then(function (response) { + logger.info(`Accepted join request ${requestId}. Webhook succeeded.`); + }) + .catch(function (error) { + logger.error(error); + }); + } + + break; + } + + case 'moderator:joinRequestDeny': + { + if (!peer.hasRole(Roles.MODERATOR)) + throw new Error('peer not authorized'); + + const { requestId } = request.data; + + // Return no error + cb(); + + if (this._webhook) { + this._webhook.post('', { requestId, roomId: this._roomId, event: 'joinRequestDenied' }) + .then(function (response) { + logger.info(`Denied join request ${requestId}. Webhook succeeded.`); + }) + .catch(function (error) { + logger.error(error); + }); + } + + break; + } + case 'moderator:closeRoom': { if (!peer.hasRole(Roles.OWNER)) throw new Error('peer not authorized'); this._notification(peer.socket, 'moderator:closeRoom', null, true); cb(); // Close the room this.close(); // TODO: remove the room? break; } case 'moderator:kickPeer': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId } = request.data; const kickPeer = this._peers[peerId]; if (!kickPeer) throw new Error(`peer with id "${peerId}" not found`); this._notification(kickPeer.socket, 'moderator:kickPeer'); kickPeer.close(); cb(); break; } case 'raisedHand': { const { raisedHand } = request.data; peer.raisedHand = raisedHand; // Spread to others this._notification(peer.socket, 'raisedHand', { peerId: peer.id, raisedHand: raisedHand, }, true); // Return no error cb(); break; } default: { logger.error('unknown request.method "%s"', request.method); cb(500, `unknown request.method "${request.method}"`); } } } /** * Creates a mediasoup Consumer for the given mediasoup Producer. * * @async */ async _createConsumer({ consumerPeer, producerPeer, producer }) { logger.debug( '_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]', consumerPeer.id, producerPeer.id, producer.id ); const router = this._mediasoupRouters.get(producerPeer.routerId); // Optimization: // - Create the server-side Consumer. If video, do it paused. // - Tell its Peer about it and wait for its response. // - Upon receipt of the response, resume the server-side Consumer. // - If video, this will mean a single key frame requested by the // server-side Consumer (when resuming it). // NOTE: Don't create the Consumer if the remote Peer cannot consume it. if ( !consumerPeer.rtpCapabilities || !router.canConsume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities }) ) { return; } // Must take the Transport the remote Peer is using for consuming. const transport = consumerPeer.getConsumerTransport(); // This should not happen. if (!transport) { logger.warn('_createConsumer() | Transport for consuming not found'); return; } // Create the Consumer in paused mode. let consumer; try { consumer = await transport.consume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities, paused : producer.kind === 'video' }); if (producer.kind === 'audio') await consumer.setPriority(255); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); return; } // Trace individual packets for debugging // await consumer.enableTraceEvent([ "rtp", "pli", "fir" ]); // consumer.on("trace", (trace) => { // console.log(`Trace on ${consumer.id}`, trace); // }); // Store the Consumer into the consumerPeer data Object. consumerPeer.addConsumer(consumer.id, consumer); // Set Consumer events. consumer.on('transportclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); }); consumer.on('producerclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id }); }); // TODO: We don't have to send websocket signals on producerpause/producerresume // The same can be achieved on the client-side using consumer.observer.on('pause') // and consumer.observer.on('resume') consumer.on('producerpause', () => { this._notification(consumerPeer.socket, 'consumerPaused', { consumerId: consumer.id }); }); consumer.on('producerresume', () => { this._notification(consumerPeer.socket, 'consumerResumed', { consumerId: consumer.id }); }); // Send a request to the remote Peer with Consumer parameters. try { await this._request( consumerPeer.socket, 'newConsumer', { peerId : producerPeer.id, kind : consumer.kind, producerId : producer.id, id : consumer.id, rtpParameters : consumer.rtpParameters, type : consumer.type, appData : producer.appData, producerPaused : consumer.producerPaused } ); // Now that we got the positive response from the remote Peer and, if // video, resume the Consumer to ask for an efficient key frame. await consumer.resume(); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); } } /** * Get the list of peers. */ getPeers(excludePeer = undefined) { return Object.values(this._peers) .filter((peer) => peer !== excludePeer); } _timeoutCallback(callback) { let called = false; const interval = setTimeout( () => { if (called) return; called = true; callback(new SocketTimeoutError('Request timed out')); }, config.requestTimeout || 20000 ); return (...args) => { if (called) return; called = true; clearTimeout(interval); callback(...args); }; } _sendRequest(socket, method, data = {}) { return new Promise((resolve, reject) => { socket.emit( 'request', { method, data }, this._timeoutCallback((err, response) => { if (err) { reject(err); } else { resolve(response); } }) ); }); } async _request(socket, method, data) { logger.debug('_request() [method:"%s", data:"%o"]', method, data); const { requestRetries = 3 } = config; for (let tries = 0; tries < requestRetries; tries++) { try { return await this._sendRequest(socket, method, data); } catch (error) { if ( error instanceof SocketTimeoutError && tries < requestRetries ) logger.warn('_request() | timeout, retrying [attempt:"%s"]', tries); else throw error; } } } _notification(socket, method, data = {}, broadcast = false, includeSender = false) { if (broadcast) { socket.broadcast.to(this._roomId).emit( 'notification', { method, data } ); if (includeSender) socket.emit('notification', { method, data }); } else { socket.emit('notification', { method, data }); } } /* * Pipe producers of peers that are running under another router to this router. */ async _pipeProducersToRouter(routerId) { const router = this._mediasoupRouters.get(routerId); // All peers that have a different router const peersToPipe = Object.values(this._peers) .filter((peer) => peer.routerId !== routerId && peer.routerId !== null); for (const peer of peersToPipe) { const srcRouter = this._mediasoupRouters.get(peer.routerId); for (const producerId of peer.producers.keys()) { if (router._producers.has(producerId)) { continue; } await srcRouter.pipeToRouter({ producerId : producerId, router : router }); } } } async _getRouterId() { const routerId = Room.getLeastLoadedRouter( this._mediasoupWorkers, this._peers, this._mediasoupRouters); await this._pipeProducersToRouter(routerId); return routerId; } // Returns an array of router ids we need to pipe to: // The combined set of routers of all peers, exluding the router of the peer itself. _getRoutersToPipeTo(originRouterId) { return Object.values(this._peers) .map((peer) => peer.routerId) .filter((routerId, index, self) => routerId !== originRouterId && self.indexOf(routerId) === index ); } } module.exports = Room; diff --git a/meet/server/server.js b/meet/server/server.js index adfef1c9..62c0a408 100755 --- a/meet/server/server.js +++ b/meet/server/server.js @@ -1,417 +1,417 @@ #!/usr/bin/env node process.title = 'kolabmeet-server'; const config = require('./config/config'); const fs = require('fs'); const http = require('http'); const spdy = require('spdy'); const express = require('express'); const bodyParser = require('body-parser'); const cookieParser = require('cookie-parser'); const compression = require('compression'); const mediasoup = require('mediasoup'); const AwaitQueue = require('awaitqueue'); const Logger = require('./lib/Logger'); const Room = require('./lib/Room'); const Peer = require('./lib/Peer'); const helmet = require('helmet'); const axios = require('axios'); // auth const redis = require('redis'); const expressSession = require('express-session'); const RedisStore = require('connect-redis')(expressSession); const sharedSession = require('express-socket.io-session'); const interactiveServer = require('./lib/interactiveServer'); const promExporter = require('./lib/promExporter'); const { v4: uuidv4 } = require('uuid'); /* eslint-disable no-console */ console.log('- process.env.DEBUG:', process.env.DEBUG); console.log('- config.mediasoup.worker.logLevel:', config.mediasoup.worker.logLevel); console.log('- config.mediasoup.worker.logTags:', config.mediasoup.worker.logTags); /* eslint-enable no-console */ const logger = new Logger(); const queue = new AwaitQueue(); let statusLogger = null; if ('StatusLogger' in config) statusLogger = new config.StatusLogger(); // mediasoup Workers. // @type {Array} const mediasoupWorkers = []; // Map of Room instances indexed by roomId. const rooms = new Map(); // Map of Peer instances indexed by peerId. const peers = new Map(); // TLS server configuration. const tls = { cert : fs.readFileSync(config.tls.cert), key : fs.readFileSync(config.tls.key), secureOptions : 'tlsv12', ciphers : [ 'ECDHE-ECDSA-AES128-GCM-SHA256', 'ECDHE-RSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES256-GCM-SHA384', 'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-CHACHA20-POLY1305', 'ECDHE-RSA-CHACHA20-POLY1305', 'DHE-RSA-AES128-GCM-SHA256', 'DHE-RSA-AES256-GCM-SHA384' ].join(':'), honorCipherOrder : true }; // HTTP client instance for webhook "pushes" let webhook = null; if (config.webhookURL) { webhook = axios.create({ baseURL: config.webhookURL, timeout: 5000 }); } const app = express(); app.use(helmet.hsts()); const sharedCookieParser=cookieParser(); app.use(sharedCookieParser); app.use(bodyParser.json({ limit: '5mb' })); app.use(bodyParser.urlencoded({ limit: '5mb', extended: true })); const session = expressSession({ secret : config.cookieSecret, name : config.cookieName, resave : true, saveUninitialized : true, store : config.redisOptions.host != 'none' ? new RedisStore({ client: redis.createClient(config.redisOptions) }) : null, cookie : { secure : true, httpOnly : true, maxAge : 60 * 60 * 1000 // Expire after 1 hour since last request from user } }); if (config.trustProxy) { app.set('trust proxy', config.trustProxy); } app.use(session); let mainListener; let io; async function run() { try { // Open the interactive server. await interactiveServer(rooms, peers); // start Prometheus exporter if (config.prometheus) { await promExporter(rooms, peers, config.prometheus); } // Run a mediasoup Worker. await runMediasoupWorkers(); // Run HTTPS server. await runHttpsServer(); // Run WebSocketServer. await runWebSocketServer(); const errorHandler = (err, req, res /*, next */) => { const trackingId = uuidv4(); res.status(500).send( `

Internal Server Error

If you report this error, please also report this tracking ID which makes it possible to locate your session in the logs which are available to the system administrator: ${trackingId}

` ); logger.error( 'Express error handler dump with tracking ID: %s, error dump: %o', trackingId, err); }; app.use(errorHandler); } catch (error) { logger.error('run() [error:"%o"]', error); } app.emit('ready'); } function statusLog() { if (statusLogger) { statusLogger.log({ rooms : rooms, peers : peers }); } } async function runHttpsServer() { app.use(compression()); app.get(`${config.pathPrefix}/api/ping`, function (req, res /*, next*/) { res.send('PONG') }) app.get(`${config.pathPrefix}/api/sessions`, function (req, res /*, next*/) { //TODO json.stringify res.json({ id : "testId" }) }) // Check if the room exists app.get(`${config.pathPrefix}/api/sessions/:session_id`, function (req, res /*, next*/) { console.log("Checking for room") let room = rooms.get(req.params.session_id); if (!room) { console.log("doesn't exist") res.status(404).send() } else { console.log("exist") res.status(200).send() } }) // Create room and return id app.post(`${config.pathPrefix}/api/sessions`, async function (req, res /*, next*/) { console.log("Creating new room"); //FIXME we're truncating because of kolab4 database layout (should be fixed instead) const roomId = uuidv4().substring(0, 16) await getOrCreateRoom({ roomId }); res.json({ id : roomId }) }) app.post(`${config.pathPrefix}/api/signal`, async function (req, res /*, next*/) { let data = req.body; const roomId = data.roomId; const emit = (socket) => { socket.emit('notification', { method: `signal:${data.type}`, data: data.data }) }; if ('role' in data) { peers.forEach(peer => { if (peer.socket && peer.roomId == roomId && peer.hasRole(data.role)) { emit(peer.socket); } }) } else { emit(io.to(roomId)); } res.json({}) }); // Create connection in room (just wait for websocket instead? // $post = [ // 'json' => [ // 'role' => self::OV_ROLE_PUBLISHER, // 'data' => json_encode(['role' => $role]) // ] // ]; app.post(`${config.pathPrefix}/api/sessions/:session_id/connection`, function (req, res /*, next*/) { logger.info('Creating peer connection [roomId:"%s"]', req.params.session_id); let roomId = req.params.session_id let data = req.body; //FIXME we're truncating because of kolab4 database layout (should be fixed instnead) const peerId = uuidv4().substring(0, 16) //TODO create room already? let peer = new Peer({ id: peerId, roomId }); peers.set(peerId, peer); peer.on('close', () => { peers.delete(peerId); statusLog(); }); if ('role' in data) peer.setRole(data.role); const proto = config.publicDomain.includes('localhost') || config.publicDomain.includes('127.0.0.1') ? 'ws' : 'wss'; res.json({ id: peerId, // Note: socket.io client will end up using (hardcoded) /meetmedia/signaling path token: `${proto}://${config.publicDomain}?peerId=${peerId}&roomId=${roomId}` }); }) if (config.httpOnly === true) { // http mainListener = http.createServer(app); } else { // https mainListener = spdy.createServer(tls, app); // http const redirectListener = http.createServer(app); if (config.listeningHost) redirectListener.listen(config.listeningRedirectPort, config.listeningHost); else redirectListener.listen(config.listeningRedirectPort); } console.info(`Listening on ${config.listeningPort} ${config.listeningHost}`) // https or http if (config.listeningHost) mainListener.listen(config.listeningPort, config.listeningHost); else mainListener.listen(config.listeningPort); } /** * Create a WebSocketServer to allow WebSocket connections from browsers. */ async function runWebSocketServer() { io = require('socket.io')(mainListener, { path: `${config.pathPrefix}/signaling`, cookie: false }); io.use( sharedSession(session, sharedCookieParser, { autoSave: true }) ); // Handle connections from clients. io.on('connection', (socket) => { logger.info("websocket connection") const { roomId, peerId } = socket.handshake.query; if (!roomId || !peerId) { logger.warn('connection request without roomId and/or peerId'); socket.disconnect(true); return; } logger.info('connection request [roomId:"%s", peerId:"%s"]', roomId, peerId); queue.push(async () => { const room = await getOrCreateRoom({ roomId }); let peer = peers.get(peerId); if (!peer) { logger.warn("Peer does not exist %s", peerId); socket.disconnect(true); return; } peer.socket = socket; room.handlePeer({ peer }); statusLog(); }) .catch((error) => { logger.error('room creation or room joining failed [error:"%o"]', error); if (socket) socket.disconnect(true); }); }); } /** * Launch as many mediasoup Workers as given in the configuration file. */ async function runMediasoupWorkers() { const { numWorkers } = config.mediasoup; logger.info('running %d mediasoup Workers...', numWorkers); for (let i = 0; i < numWorkers; ++i) { const worker = await mediasoup.createWorker( { logLevel : config.mediasoup.worker.logLevel, logTags : config.mediasoup.worker.logTags, rtcMinPort : config.mediasoup.worker.rtcMinPort, rtcMaxPort : config.mediasoup.worker.rtcMaxPort }); worker.on('died', () => { logger.error( 'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid); setTimeout(() => process.exit(1), 2000); }); mediasoupWorkers.push(worker); } } /** * Get a Room instance (or create one if it does not exist). */ async function getOrCreateRoom({ roomId }) { let room = rooms.get(roomId); // If the Room does not exist create a new one. if (!room) { logger.info('creating a new Room [roomId:"%s"]', roomId); // Get existing peers in the room const roomPeers = {} peers.forEach(peer => { if (!peer.closed && peer.roomId == roomId) roomPeers[peer.id] = peer; }); // Create the room - room = await Room.create({ mediasoupWorkers, roomId, peers: roomPeers }); + room = await Room.create({ mediasoupWorkers, roomId, peers: roomPeers, webhook }); rooms.set(roomId, room); statusLog(); room.on('close', () => { logger.info('closing a Room [roomId:"%s"]', roomId); rooms.delete(roomId); statusLog(); if (webhook) { webhook.post('', { roomId, event: 'roomClosed' }) .then(function (response) { logger.info(`Room ${roomId} closed. Webhook succeeded.`); }) .catch(function (error) { logger.error(error); }); } }); } return room; } run(); module.exports = app; // export for testing diff --git a/src/app/Http/Controllers/API/V4/OpenViduController.php b/src/app/Http/Controllers/API/V4/OpenViduController.php index b741ac26..fb33c121 100644 --- a/src/app/Http/Controllers/API/V4/OpenViduController.php +++ b/src/app/Http/Controllers/API/V4/OpenViduController.php @@ -1,404 +1,288 @@ first(); - - // This isn't a room, bye bye - if (!$room) { - return $this->errorResponse(404, \trans('meet.room-not-found')); - } - - // Only the moderator can do it - if (!$this->isModerator($room)) { - return $this->errorResponse(403); - } - - if (!$room->requestAccept($reqid)) { - return $this->errorResponse(500, \trans('meet.session-request-accept-error')); - } - - return response()->json(['status' => 'success']); - } - - /** - * Deny the room join request. - * - * @param string $id Room identifier (name) - * @param string $reqid Request identifier - * - * @return \Illuminate\Http\JsonResponse - */ - public function denyJoinRequest($id, $reqid) - { - $room = Room::where('name', $id)->first(); - - // This isn't a room, bye bye - if (!$room) { - return $this->errorResponse(404, \trans('meet.room-not-found')); - } - - // Only the moderator can do it - if (!$this->isModerator($room)) { - return $this->errorResponse(403); - } - - if (!$room->requestDeny($reqid)) { - return $this->errorResponse(500, \trans('meet.session-request-deny-error')); - } - - return response()->json(['status' => 'success']); - } - /** * Listing of rooms that belong to the authenticated user. * * @return \Illuminate\Http\JsonResponse */ public function index() { $user = Auth::guard()->user(); $rooms = Room::where('user_id', $user->id)->orderBy('name')->get(); if (count($rooms) == 0) { // Create a room for the user (with a random and unique name) while (true) { $name = strtolower(\App\Utils::randStr(3, 3, '-')); if (!Room::where('name', $name)->count()) { break; } } $room = Room::create([ 'name' => $name, 'user_id' => $user->id ]); $rooms = collect([$room]); } $result = [ 'list' => $rooms, 'count' => count($rooms), ]; return response()->json($result); } /** * Join the room session. Each room has one owner, and the room isn't open until the owner * joins (and effectively creates the session). * * @param string $id Room identifier (name) * * @return \Illuminate\Http\JsonResponse */ public function joinRoom($id) { $room = Room::where('name', $id)->first(); // Room does not exist, or the owner is deleted if (!$room || !$room->owner) { return $this->errorResponse(404, \trans('meet.room-not-found')); } // Check if there's still a valid meet entitlement for the room owner if (!$room->owner->hasSku('meet')) { return $this->errorResponse(404, \trans('meet.room-not-found')); } $user = Auth::guard()->user(); $isOwner = $user && $user->id == $room->user_id; $init = !empty(request()->input('init')); // There's no existing session if (!$room->hasSession()) { // Participants can't join the room until the session is created by the owner if (!$isOwner) { return $this->errorResponse(422, \trans('meet.session-not-found'), ['code' => 323]); } // The room owner can create the session on request if (!$init) { return $this->errorResponse(422, \trans('meet.session-not-found'), ['code' => 324]); } $session = $room->createSession(); if (empty($session)) { return $this->errorResponse(500, \trans('meet.session-create-error')); } } $settings = $room->getSettings(['locked', 'nomedia', 'password']); $password = (string) $settings['password']; $config = [ 'locked' => $settings['locked'] === 'true', 'nomedia' => $settings['nomedia'] === 'true', 'password' => $isOwner ? $password : '', 'requires_password' => !$isOwner && strlen($password), ]; $response = ['config' => $config]; // Validate room password if (!$isOwner && strlen($password)) { $request_password = request()->input('password'); if ($request_password !== $password) { return $this->errorResponse(422, \trans('meet.session-password-error'), $response + ['code' => 325]); } } // Handle locked room if (!$isOwner && $config['locked']) { $nickname = request()->input('nickname'); $picture = request()->input('picture'); $requestId = request()->input('requestId'); $request = $requestId ? $room->requestGet($requestId) : null; $error = \trans('meet.session-room-locked-error'); // Request already has been processed (not accepted yet, but it could be denied) if (empty($request['status']) || $request['status'] != Room::REQUEST_ACCEPTED) { if (!$request) { if (empty($nickname) || empty($requestId) || !preg_match('/^[a-z0-9]{8,32}$/i', $requestId)) { return $this->errorResponse(422, $error, $response + ['code' => 326]); } if (empty($picture)) { $svg = file_get_contents(resource_path('images/user.svg')); $picture = 'data:image/svg+xml;base64,' . base64_encode($svg); } elseif (!preg_match('|^data:image/png;base64,[a-zA-Z0-9=+/]+$|', $picture)) { return $this->errorResponse(422, $error, $response + ['code' => 326]); } // TODO: Resize when big/make safe the user picture? $request = ['nickname' => $nickname, 'requestId' => $requestId, 'picture' => $picture]; if (!$room->requestSave($requestId, $request)) { // FIXME: should we use error code 500? return $this->errorResponse(422, $error, $response + ['code' => 326]); } // Send the request (signal) to all moderators $result = $room->signal('joinRequest', $request, Room::ROLE_MODERATOR); } return $this->errorResponse(422, $error, $response + ['code' => 327]); } } // Initialize connection tokens if ($init) { // Choose the connection role $canPublish = !empty(request()->input('canPublish')) && (empty($config['nomedia']) || $isOwner); $role = $canPublish ? Room::ROLE_PUBLISHER : Room::ROLE_SUBSCRIBER; if ($isOwner) { $role |= Room::ROLE_MODERATOR; $role |= Room::ROLE_OWNER; } // Create session token for the current user/connection $response = $room->getSessionToken($role); if (empty($response)) { return $this->errorResponse(500, \trans('meet.session-join-error')); } $response_code = 200; $response['role'] = $role; $response['config'] = $config; } else { $response_code = 422; $response['code'] = 322; } return response()->json($response, $response_code); } /** * Set the domain configuration. * * @param string $id Room identifier (name) * * @return \Illuminate\Http\JsonResponse|void */ public function setRoomConfig($id) { $room = Room::where('name', $id)->first(); // Room does not exist, or the owner is deleted if (!$room || !$room->owner) { return $this->errorResponse(404); } $user = Auth::guard()->user(); // Only room owner can configure the room if ($user->id != $room->user_id) { return $this->errorResponse(403); } $input = request()->input(); $errors = []; foreach ($input as $key => $value) { switch ($key) { case 'password': if ($value === null || $value === '') { $input[$key] = null; } else { // TODO: Do we have to validate the password in any way? } break; case 'locked': $input[$key] = $value ? 'true' : null; break; case 'nomedia': $input[$key] = $value ? 'true' : null; break; default: $errors[$key] = \trans('meet.room-unsupported-option-error'); } } if (!empty($errors)) { return response()->json(['status' => 'error', 'errors' => $errors], 422); } if (!empty($input)) { $room->setSettings($input); } return response()->json([ 'status' => 'success', 'message' => \trans('meet.room-setconfig-success'), ]); } /** * Webhook as triggered from OpenVidu server * * @param \Illuminate\Http\Request $request The API request. * * @return \Illuminate\Http\Response The response */ public function webhook(Request $request) { \Log::debug($request->getContent()); - switch ((string) $request->input('event')) { + $sessionId = $request->input('roomId'); + $event = (string) $request->input('event'); + + switch ($event) { case 'roomClosed': // When all participants left the room the server will dispatch roomClosed // event. We'll remove the session reference from the database. - $sessionId = $request->input('roomId'); $room = Room::where('session_id', $sessionId)->first(); if ($room) { $room->session_id = null; $room->save(); } break; - case 'requestAccepted': - case 'requestDenied': - // TODO - break; - } - - return response('Success', 200); - } - - /** - * Check if current user is a moderator for the specified room. - * - * @param \App\OpenVidu\Room $room The room - * - * @return bool True if the current user is the room moderator - */ - protected function isModerator(Room $room): bool - { - $user = Auth::guard()->user(); - - // The room owner is a moderator - if ($user && $user->id == $room->user_id) { - return true; - } - - // Moderator's authentication via the extra request header - if ( - ($connection = $this->getConnectionFromRequest()) - && $connection->session_id === $room->session_id - && $connection->role & Room::ROLE_MODERATOR - ) { - return true; - } + case 'joinRequestAccepted': + case 'joinRequestDenied': + $room = Room::where('session_id', $sessionId)->first(); - return false; - } + if ($room) { + $method = $event == 'joinRequestAccepted' ? 'requestAccept' : 'requestDeny'; - /** - * Check if current user "owns" the specified connection. - * - * @param \App\OpenVidu\Connection $connection The connection - * - * @return bool - */ - protected function isSelfConnection(Connection $connection): bool - { - return ($conn = $this->getConnectionFromRequest()) - && $conn->id === $connection->id; - } + $room->{$method}($request->input('requestId')); + } - /** - * Get the connection object for the token in current request headers. - * It will also validate the token. - * - * @return \App\OpenVidu\Connection|null Connection (if exists and the token is valid) - */ - protected function getConnectionFromRequest() - { - // Authenticate the user via the extra request header - if ($token = request()->header(self::AUTH_HEADER)) { - list($connId, ) = explode(':', base64_decode($token), 2); - - if ( - ($connection = Connection::find($connId)) - && $connection->metadata['authToken'] === $token - ) { - return $connection; - } + break; } - return null; + return response('Success', 200); } } diff --git a/src/app/OpenVidu/Room.php b/src/app/OpenVidu/Room.php index 3dcaafd5..8e0b4d93 100644 --- a/src/app/OpenVidu/Room.php +++ b/src/app/OpenVidu/Room.php @@ -1,371 +1,281 @@ false, // No exceptions from Guzzle 'base_uri' => \config('openvidu.api_url'), 'verify' => \config('openvidu.api_verify_tls'), 'auth' => [ \config('openvidu.api_username'), \config('openvidu.api_password') ], 'on_stats' => function (\GuzzleHttp\TransferStats $stats) { $threshold = \config('logging.slow_log'); if ($threshold && ($sec = $stats->getTransferTime()) > $threshold) { $url = $stats->getEffectiveUri(); $method = $stats->getRequest()->getMethod(); \Log::warning(sprintf("[STATS] %s %s: %.4f sec.", $method, $url, $sec)); } }, ] ); } return self::$client; } - /** - * Destroy a OpenVidu connection - * - * @param string $conn Connection identifier - * - * @return bool True on success, False otherwise - * @throws \Exception if session does not exist - */ - public function closeOVConnection($conn): bool - { - if (!$this->session_id) { - throw new \Exception("The room session does not exist"); - } - - $url = 'sessions/' . $this->session_id . '/connection/' . urlencode($conn); - - $response = $this->client()->request('DELETE', $url); - - return $response->getStatusCode() == 204; - } - - /** - * Fetch a OpenVidu connection information. - * - * @param string $conn Connection identifier - * - * @return ?array Connection data on success, Null otherwise - * @throws \Exception if session does not exist - */ - public function getOVConnection($conn): ?array - { - // Note: getOVConnection() not getConnection() because Eloquent\Model::getConnection() exists - // TODO: Maybe use some other name? getParticipant? - if (!$this->session_id) { - throw new \Exception("The room session does not exist"); - } - - $url = 'sessions/' . $this->session_id . '/connection/' . urlencode($conn); - - $response = $this->client()->request('GET', $url); - - if ($response->getStatusCode() == 200) { - return json_decode($response->getBody(), true); - } - - return null; - } - /** * Create a OpenVidu session * * @return array|null Session data on success, NULL otherwise */ public function createSession(): ?array { $params = [ 'json' => [ /* request params here */ ] ]; $response = $this->client()->request('POST', "sessions", $params); if ($response->getStatusCode() !== 200) { $this->session_id = null; $this->save(); return null; // TODO: Log an error/warning } $session = json_decode($response->getBody(), true); $this->session_id = $session['id']; $this->save(); return $session; } - /** - * Returns metadata for every connection in a session. - * - * @return array Connections metadata, indexed by connection identifier - * @throws \Exception if session does not exist - */ - public function getSessionConnections(): array - { - if (!$this->session_id) { - throw new \Exception("The room session does not exist"); - } - - return Connection::where('session_id', $this->session_id) - // Ignore screen sharing connection for now - ->whereRaw("(role & " . self::ROLE_SCREEN . ") = 0") - ->get() - ->keyBy('id') - ->map(function ($item) { - // Warning: Make sure to not return all metadata here as it might contain sensitive data. - return [ - 'role' => $item->role, - 'hand' => $item->metadata['hand'] ?? 0, - 'language' => $item->metadata['language'] ?? null, - ]; - }) - // Sort by order in the queue, so UI can re-build the existing queue in order - ->sort(function ($a, $b) { - return $a['hand'] <=> $b['hand']; - }) - ->all(); - } - /** * Create a OpenVidu session (connection) token * * @param int $role User role (see self::ROLE_* constants) * * @return array|null Token data on success, NULL otherwise * @throws \Exception if session does not exist */ public function getSessionToken($role = self::ROLE_SUBSCRIBER): ?array { if (!$this->session_id) { throw new \Exception("The room session does not exist"); } $url = 'sessions/' . $this->session_id . '/connection'; $post = [ 'json' => [ 'role' => $role, ] ]; $response = $this->client()->request('POST', $url, $post); if ($response->getStatusCode() == 200) { $json = json_decode($response->getBody(), true); - $authToken = base64_encode($json['id'] . ':' . \random_bytes(16)); + // TODO: make use of the authentication token - $connectionToken = $json['token']; - $connectionId = $json['id']; - - // Create the connection reference in our database - $conn = new Connection(); - $conn->id = $connectionId; - $conn->session_id = $this->session_id; - $conn->room_id = $this->id; - $conn->role = $role; - $conn->metadata = ['token' => $connectionToken, 'authToken' => $authToken]; - $conn->save(); + $authToken = base64_encode($json['id'] . ':' . \random_bytes(16)); return [ 'session' => $this->session_id, - 'token' => $connectionToken, + 'token' => $json['token'], 'authToken' => $authToken, 'role' => $role, ]; } // TODO: Log an error/warning on non-200 response return null; } /** * Check if the room has an active session * * @return bool True when the session exists, False otherwise */ public function hasSession(): bool { if (!$this->session_id) { return false; } $response = $this->client()->request('GET', "sessions/{$this->session_id}"); return $response->getStatusCode() == 200; } /** * The room owner. * * @return \Illuminate\Database\Eloquent\Relations\BelongsTo */ public function owner() { return $this->belongsTo('\App\User', 'user_id', 'id'); } /** * Accept the join request. * * @param string $id Request identifier * * @return bool True on success, False on failure */ public function requestAccept(string $id): bool { $request = Cache::get($this->session_id . '-' . $id); if ($request) { $request['status'] = self::REQUEST_ACCEPTED; return Cache::put($this->session_id . '-' . $id, $request, now()->addHours(1)); } return false; } /** * Deny the join request. * * @param string $id Request identifier * * @return bool True on success, False on failure */ public function requestDeny(string $id): bool { $request = Cache::get($this->session_id . '-' . $id); if ($request) { $request['status'] = self::REQUEST_DENIED; return Cache::put($this->session_id . '-' . $id, $request, now()->addHours(1)); } return false; } /** * Get the join request data. * * @param string $id Request identifier * * @return array|null Request data (e.g. nickname, status, picture?) */ public function requestGet(string $id): ?array { return Cache::get($this->session_id . '-' . $id); } /** * Save the join request. * * @param string $id Request identifier * @param array $request Request data * * @return bool True on success, False on failure */ public function requestSave(string $id, array $request): bool { // We don't really need the picture in the cache // As we use this cache for the request status only unset($request['picture']); return Cache::put($this->session_id . '-' . $id, $request, now()->addHours(1)); } /** * Any (additional) properties of this room. * * @return \Illuminate\Database\Eloquent\Relations\HasMany */ public function settings() { return $this->hasMany('App\OpenVidu\RoomSetting', 'room_id'); } /** * Send a OpenVidu signal to the session participants (connections) * * @param string $name Signal name (type) * @param array $data Signal data array * @param int $target Limit targets by their participant role * * @return bool True on success, False on failure * @throws \Exception if session does not exist */ public function signal(string $name, array $data = [], $target = null): bool { if (!$this->session_id) { throw new \Exception("The room session does not exist"); } $post = [ 'roomId' => $this->session_id, 'type' => $name, 'role' => $target, 'data' => $data, ]; $response = $this->client()->request('POST', 'signal', ['json' => $post]); return $response->getStatusCode() == 200; } } diff --git a/src/resources/js/meet/client.js b/src/resources/js/meet/client.js index ec2ae0fa..76fece7f 100644 --- a/src/resources/js/meet/client.js +++ b/src/resources/js/meet/client.js @@ -1,1002 +1,1019 @@ 'use strict' import { Device, parseScalabilityMode } from 'mediasoup-client' import Config from './config.js' import { Media } from './media.js' import { Roles } from './constants.js' import { Socket } from './socket.js' function Client() { let eventHandlers = {} let camProducer let micProducer let screenProducer let consumers = {} let socket let sendTransportInfo let sendTransport let recvTransport let iceServers = [] let nickname = '' let channel = null let channels = [] let peers = {} let joinProps = {} let videoSource let audioSource const VIDEO_CONSTRAINTS = { 'low': { width: { ideal: 320 } }, 'medium': { width: { ideal: 640 } }, 'high': { width: { ideal: 1280 } }, 'veryhigh': { width: { ideal: 1920 } }, 'ultra': { width: { ideal: 3840 } } } // Create a device (use browser auto-detection) const device = new Device() // A helper for basic browser media operations const media = new Media() this.media = media navigator.mediaDevices.addEventListener('devicechange', () => { trigger('deviceChange') }) /** * Start a session (join a room) */ this.joinSession = (token, props) => { // Store the join properties for later joinProps = props // Initialize the socket, 'roomReady' request handler will do the rest of the job socket = initSocket(token) } /** * Close the session (disconnect) */ this.closeSession = async (reason) => { // If room owner, send the request to close the room if (reason === true && peers.self && peers.self.role & Roles.OWNER) { await socket.sendRequest('moderator:closeRoom') } trigger('closeSession', { reason: reason || 'disconnected' }) if (socket) { socket.close() } media.setupStop() // Close mediasoup Transports. if (sendTransport) { sendTransport.close() sendTransport = null } if (recvTransport) { recvTransport.close() recvTransport = null } // Remove peers' video elements Object.values(peers).forEach(peer => { if (peer.videoElement) { $(peer.videoElement).remove() } }) // Reset state eventHandlers = {} camProducer = null micProducer = null screenProducer = null consumers = {} peers = {} channels = [] } + /** + * Returns True if user already joined the room session + */ this.isJoined = () => { return 'self' in peers } + /** + * Accept the join request + */ + this.joinRequestAccept = (requestId) => { + socket.sendRequest('moderator:joinRequestAccept', { requestId }) + } + + /** + * Deny the join request + */ + this.joinRequestDeny = (requestId) => { + socket.sendRequest('moderator:joinRequestDeny', { requestId }) + } + /** * Disable the current user camera */ this.camMute = async () => { if (camProducer) { camProducer.pause() await socket.sendRequest('pauseProducer', { producerId: camProducer.id }) trigger('updatePeer', updatePeerState(peers.self)) } return this.camStatus() } /** * Enable the current user camera */ this.camUnmute = async () => { if (camProducer) { camProducer.resume() await socket.sendRequest('resumeProducer', { producerId: camProducer.id }) trigger('updatePeer', updatePeerState(peers.self)) } return this.camStatus() } /** * Get the current user camera status */ this.camStatus = () => { return !!(camProducer && !camProducer.paused && !camProducer.closed) } /** * Mute the current user microphone */ this.micMute = async () => { if (micProducer) { micProducer.pause() await socket.sendRequest('pauseProducer', { producerId: micProducer.id }) trigger('updatePeer', updatePeerState(peers.self)) } return this.micStatus() } /** * Unmute the current user microphone */ this.micUnmute = async () => { if (micProducer) { micProducer.resume() await socket.sendRequest('resumeProducer', { producerId: micProducer.id }) trigger('updatePeer', updatePeerState(peers.self)) } return this.micStatus() } /** * Get the current user microphone status */ this.micStatus = () => { return !!(micProducer && !micProducer.paused && !micProducer.closed) } /** * Kick a user out of the room */ this.kickPeer = (peerId) => { socket.sendRequest('moderator:kickPeer', { peerId }) } /** * Send a chat message to the server */ this.chatMessage = (message) => { socket.sendRequest('chatMessage', { message }) } /** * Mute microphone of another user */ this.peerMicMute = (peerId) => { Object.values(consumers).forEach(consumer => { if (consumer.peerId == peerId && consumer.kind == 'audio') { consumer.consumerPaused = true if (!consumer.paused) { consumer.pause() socket.sendRequest('pauseConsumer', { consumerId: consumer.id }) } } }) } /** * Unmute microphone of another user */ this.peerMicUnmute = (peerId) => { Object.values(consumers).forEach(consumer => { if (consumer.peerId == peerId && consumer.kind == 'audio') { consumer.consumerPaused = false if (consumer.paused && !consumer.producerPaused && !consumer.channelPaused) { consumer.resume() socket.sendRequest('resumeConsumer', { consumerId: consumer.id }) } } }) } /** * Set 'raisedHand' state of the current user */ this.raiseHand = async (status) => { if (peers.self.raisedHand != status) { peers.self.raisedHand = status await socket.sendRequest('raisedHand', { raisedHand: status }) trigger('updatePeer', peers.self, ['raisedHand']) } return status } /** * Set nickname of the current user */ this.setNickname = (nickname) => { if (peers.self.nickname != nickname) { peers.self.nickname = nickname socket.sendRequest('changeNickname', { nickname }) trigger('updatePeer', peers.self, ['nickname']) } } /** * Set language channel for the current user */ this.setLanguageChannel = (language) => { channel = language updateChannels(true) } /** * Set language for the current user (make him an interpreter) */ this.setLanguage = (peerId, language) => { socket.sendRequest('moderator:changeLanguage', { peerId, language }) } /** * Add a role to a user */ this.addRole = (peerId, role) => { socket.sendRequest('moderator:addRole', { peerId, role }) } /** * Remove a role from a user */ this.removeRole = (peerId, role) => { socket.sendRequest('moderator:removeRole', { peerId, role }) } /** * Register event handlers */ this.on = (eventName, callback) => { eventHandlers[eventName] = callback } /** * Execute an event handler */ const trigger = (...args) => { const eventName = args.shift() console.log(eventName, args) if (eventName in eventHandlers) { eventHandlers[eventName].apply(null, args) } } /** * Initialize websocket connection, register event handlers */ const initSocket = (token) => { // Connect to websocket socket = new Socket(token) socket.on('disconnect', reason => { // this.closeSession() }) socket.on('reconnectFailed', () => { // this.closeSession() }) socket.on('request', async (request, cb) => { switch (request.method) { case 'newConsumer': const { peerId, producerId, id, kind, rtpParameters, type, appData, producerPaused } = request.data const consumer = await recvTransport.consume({ id, producerId, kind, rtpParameters }) consumer.peerId = peerId consumer.on('transportclose', () => { // TODO: What actually else needs to be done here? delete consumers[consumer.id] }) consumers[consumer.id] = consumer // We are ready. Answer the request so the server will // resume this Consumer (which was paused for now). cb(null) if (producerPaused) { consumer.producerPaused = true consumer.pause() } let peer = peers[peerId] if (!peer) { return } addPeerTrack(peer, consumer.track) trigger('updatePeer', peer) updateChannels() break default: console.error('Unknow request method: ' + request.method) } }) socket.on('notification', (notification) => { switch (notification.method) { case 'roomReady': iceServers = notification.data.iceServers joinRoom() return case 'newPeer': peers[notification.data.id] = notification.data trigger('addPeer', notification.data) updateChannels() return case 'peerClosed': const { peerId } = notification.data delete peers[peerId] trigger('removePeer', peerId) updateChannels() return case 'consumerClosed': { const { consumerId } = notification.data const consumer = consumers[consumerId] if (!consumer) { return } consumer.close() delete consumers[consumerId] let peer = peers[consumer.peerId] if (peer) { // TODO: Update peer state, remove track trigger('updatePeer', peer) } return } case 'consumerPaused': { const { consumerId } = notification.data const consumer = consumers[consumerId] if (!consumer) { return } consumer.producerPaused = true if (!consumer.paused) { consumer.pause() socket.sendRequest('pauseConsumer', { consumerId: consumer.id }) } let peer = peers[consumer.peerId] if (peer) { trigger('updatePeer', updatePeerState(peer)) } return } case 'consumerResumed': { const { consumerId } = notification.data const consumer = consumers[consumerId] if (!consumer) { return } consumer.producerPaused = false if (consumer.paused && !consumer.consumerPaused && !consumer.channelPaused) { consumer.resume() socket.sendRequest('resumeConsumer', { consumerId: consumer.id }) } let peer = peers[consumer.peerId] if (peer) { trigger('updatePeer', updatePeerState(peer)) } return } case 'changeLanguage': updatePeerProperty(notification.data, 'language') updateChannels() return case 'changeNickname': updatePeerProperty(notification.data, 'nickname') return case 'changeRole': { const { peerId, role } = notification.data const peer = peers.self.id === peerId ? peers.self : peers[peerId] if (!peer) { return } let changes = ['role'] const rolePublisher = role & Roles.PUBLISHER const roleModerator = role & Roles.MODERATOR const isPublisher = peer.role & Roles.PUBLISHER const isModerator = peer.role & Roles.MODERATOR if (isPublisher && !rolePublisher) { // demoted to a subscriber changes.push('publisherRole') if (peer.isSelf) { // stop publishing any streams this.setMic('', true) this.setCamera('', true) } else { // remove the video element peer.videoElement = null // TODO: Do we need to remove/stop consumers? } } else if (!isPublisher && rolePublisher) { // promoted to a publisher changes.push('publisherRole') // create a video element with no tracks setPeerTracks(peer, []) } if ((!isModerator && roleModerator) || (isModerator && !roleModerator)) { changes.push('moderatorRole') } peer.role = role trigger('updatePeer', peer, changes) return } case 'chatMessage': notification.data.isSelf = notification.data.peerId == peers.self.id trigger('chatMessage', notification.data) return case 'moderator:closeRoom': this.closeSession('session-closed') return case 'moderator:kickPeer': this.closeSession('session-closed') return case 'raisedHand': updatePeerProperty(notification.data, 'raisedHand') return case 'signal:joinRequest': trigger('joinRequest', notification.data) return default: console.error('Unknow notification method: ' + notification.method) } }) return socket } /** * Join the session (room) */ const joinRoom = async () => { const routerRtpCapabilities = await socket.getRtpCapabilities() routerRtpCapabilities.headerExtensions = routerRtpCapabilities.headerExtensions .filter(ext => ext.uri !== 'urn:3gpp:video-orientation') await device.load({ routerRtpCapabilities }) // Setup the consuming transport (for handling streams of other participants) await setRecvTransport() // Send the "join" request, get room data, participants, etc. const { peers: existing, role, id: peerId } = await socket.sendRequest('join', { nickname: joinProps.nickname, rtpCapabilities: device.rtpCapabilities }) trigger('joinSuccess') let peer = { id: peerId, role, nickname: joinProps.nickname, audioActive: false, videoActive: false, isSelf: true } // Add self to the list peers.self = peer // Start publishing webcam and mic (and setup the producing transport) await this.setCamera(joinProps.videoSource, true) await this.setMic(joinProps.audioSource, true) updatePeerState(peer) trigger('addPeer', peer) // Trigger addPeer event for all peers already in the room, maintain peers list existing.forEach(peer => { let tracks = [] // We receive newConsumer requests before we add the peer to peers list, // therefore we look here for any consumers that belong to this peer and update // the peer. If we do not do this we have to wait about 20 seconds for repeated // newConsumer requests Object.keys(consumers).forEach(cid => { if (consumers[cid].peerId === peer.id) { tracks.push(consumers[cid].track) } }) if (tracks.length) { setPeerTracks(peer, tracks) } peers[peer.id] = peer trigger('addPeer', peer) }) updateChannels() } /** * Set the camera device for the current user */ this.setCamera = async (deviceId, noUpdate) => { // Actually selected device, do nothing if (deviceId == videoSource) { return } // Remove current device, stop producer if (camProducer && !camProducer.closed) { camProducer.close() await socket.sendRequest('closeProducer', { producerId: camProducer.id }) setPeerTracks(peers.self, []) } peers.self.videoSource = videoSource = deviceId if (!deviceId) { if (!noUpdate) { trigger('updatePeer', updatePeerState(peers.self), ['videoSource']) } return } if (!device.canProduce('video')) { throw new Error('cannot produce video') } const { aspectRatio, frameRate, resolution } = Config.videoOptions const track = await media.getTrack({ video: { deviceId: { ideal: deviceId }, ...VIDEO_CONSTRAINTS[resolution], frameRate } }) await setSendTransport() // TODO: Simulcast support? camProducer = await sendTransport.produce({ track, appData: { source : 'webcam' } }) /* camProducer.on('transportclose', () => { camProducer = null }) camProducer.on('trackended', () => { // disableWebcam() }) */ // Create/Update the video element addPeerTrack(peers.self, track) if (!noUpdate) { trigger('updatePeer', peers.self, ['videoSource']) } } /** * Set the microphone device for the current user */ this.setMic = async (deviceId, noUpdate) => { // Actually selected device, do nothing if (deviceId == audioSource) { return } // Remove current device, stop producer if (micProducer && !micProducer.closed) { micProducer.close() await socket.sendRequest('closeProducer', { producerId: micProducer.id }) } peers.self.audioSource = audioSource = deviceId if (!deviceId) { if (!noUpdate) { trigger('updatePeer', updatePeerState(peers.self), ['audioSource']) } return } if (!device.canProduce('audio')) { throw new Error('cannot produce audio') } const { autoGainControl, echoCancellation, noiseSuppression, sampleRate, channelCount, volume, sampleSize, opusStereo, opusDtx, opusFec, opusPtime, opusMaxPlaybackRate } = Config.audioOptions const track = await media.getTrack({ audio: { sampleRate, channelCount, volume, autoGainControl, echoCancellation, noiseSuppression, sampleSize, deviceId: { ideal: deviceId } } }) await setSendTransport() micProducer = await sendTransport.produce({ track, codecOptions: { opusStereo, opusDtx, opusFec, opusPtime, opusMaxPlaybackRate }, appData: { source : 'mic' } }) /* micProducer.on('transportclose', () => { micProducer = null }) micProducer.on('trackended', () => { // disableMic() }) */ // Note: We're not adding this track to the video element if (!noUpdate) { trigger('updatePeer', updatePeerState(peers.self), ['audioSource']) } } /** * Set the media stream tracks for a video element of a peer */ const setPeerTracks = (peer, tracks) => { if (!peer.videoElement) { peer.videoElement = media.createVideoElement(tracks, { mirror: peer.isSelf }) } else { const stream = new MediaStream() tracks.forEach(track => stream.addTrack(track)) peer.videoElement.srcObject = stream } updatePeerState(peer) } /** * Add a media stream track to a video element of a peer */ const addPeerTrack = (peer, track) => { if (!peer.videoElement) { setPeerTracks(peer, [ track ]) return } const stream = peer.videoElement.srcObject if (track.kind == 'video') { media.removeTracksFromStream(stream, 'Video') } else { media.removeTracksFromStream(stream, 'Audio') } stream.addTrack(track) updatePeerState(peer) } /** * Update peer state */ const updatePeerState = (peer) => { if (peer.isSelf) { peer.videoActive = this.camStatus() peer.audioActive = this.micStatus() } else { peer.videoActive = false peer.audioActive = false Object.keys(consumers).forEach(cid => { const consumer = consumers[cid] if (consumer.peerId == peer.id) { peer[consumer.kind + 'Active'] = !consumer.closed && !consumer.producerPaused && !consumer.channelPaused } }) } return peer } /** * Configure transport for producer (publisher) streams */ const setSendTransport = async () => { if (sendTransport && !sendTransport.closed) { return } if (!sendTransportInfo) { sendTransportInfo = await socket.sendRequest('createWebRtcTransport', { forceTcp: false, producing: true, consuming: false }) } const { id, iceParameters, iceCandidates, dtlsParameters } = sendTransportInfo const iceTransportPolicy = (device.handlerName.toLowerCase().includes('firefox') && iceServers) ? 'relay' : undefined sendTransport = device.createSendTransport({ id, iceParameters, iceCandidates, dtlsParameters, iceServers, iceTransportPolicy, proprietaryConstraints: { optional: [{ googDscp: true }] } }) sendTransport.on('connect', ({ dtlsParameters }, callback, errback) => { socket.sendRequest('connectWebRtcTransport', { transportId: sendTransport.id, dtlsParameters }) .then(callback) .catch(errback) }) sendTransport.on('produce', async ({ kind, rtpParameters, appData }, callback, errback) => { try { const { id } = await socket.sendRequest('produce', { transportId: sendTransport.id, kind, rtpParameters, appData }) callback({ id }) } catch (error) { errback(error) } }) } /** * Configure transport for consumer streams */ const setRecvTransport = async () => { const transportInfo = await socket.sendRequest('createWebRtcTransport', { forceTcp: false, producing: false, consuming: true }) const { id, iceParameters, iceCandidates, dtlsParameters } = transportInfo const iceTransportPolicy = (device.handlerName.toLowerCase().includes('firefox') && iceServers) ? 'relay' : undefined recvTransport = device.createRecvTransport({ id, iceParameters, iceCandidates, dtlsParameters, iceServers, iceTransportPolicy }) recvTransport.on('connect', ({ dtlsParameters }, callback, errback) => { socket.sendRequest('connectWebRtcTransport', { transportId: recvTransport.id, dtlsParameters }) .then(callback) .catch(errback) }) } /** * A helper for a peer property update (received via websocket) */ const updatePeerProperty = (data, prop) => { const peerId = data.peerId const peer = peers.self.id === peerId ? peers.self : peers[peerId] if (!peer) { return } peer[prop] = data[prop] trigger('updatePeer', peer, [ prop ]) } /** * Update list of existing language interpretation channels and update * audio state of all participants according to the selected channel. */ const updateChannels = (update) => { let list = [] Object.values(peers).forEach(peer => { if (peer.language && !list.includes(peer.language)) { list.push(peer.language) } }) update = update || channels.join() != list.join() channels = list // The channel user was using has been removed (or the participant stopped being an interpreter) if (channel && !channels.includes(channel)) { channel = null update = true } // Mute/unmute all peers depending on the selected channel Object.values(consumers).forEach(consumer => { if (consumer.kind == 'audio' && !consumer.closed) { let peer = peers[consumer.peerId] // It can happen because consumers are being removed after the peer if (!peer) { return } // When a channel is selected we mute everyone except the interpreter of the language. // When a channel is not selected we mute language interpreters only consumer.channelPaused = channel && peer.language != channel if (consumer.channelPaused && !consumer.paused) { consumer.pause() socket.sendRequest('pauseConsumer', { consumerId: consumer.id }) } else if (!consumer.channelPaused && consumer.paused && !consumer.consumerPaused && !consumer.producerPaused ) { consumer.resume() socket.sendRequest('resumeConsumer', { consumerId: consumer.id }) } const state = !consumer.producerPaused && !consumer.channelPaused if (peer.audioActive != state) { peer.audioActive = state trigger('updatePeer', peer) } } }) if (update) { trigger('updateSession', sessionData()) } } /** * Returns all relevant information about the current session/user state */ const sessionData = () => { return { channel, channels, audioActive: peers.self.audioActive, videoActive: peers.self.videoActive, audioSource: peers.self.audioSource, videoSource: peers.self.videoSource, screenActive: peers.self.screenActive, raisedHand: peers.self.raisedHand } } } export { Client } diff --git a/src/resources/js/meet/room.js b/src/resources/js/meet/room.js index bd1a0d90..36131167 100644 --- a/src/resources/js/meet/room.js +++ b/src/resources/js/meet/room.js @@ -1,1019 +1,1066 @@ 'use strict' import anchorme from 'anchorme' import { Client } from './client.js' import { Roles } from './constants.js' import { Dropdown } from 'bootstrap' import { library } from '@fortawesome/fontawesome-svg-core' function Room(container) { let sessionData // Room session metadata let peers = {} // Participants in the session (including self) let publishersContainer // Container element for publishers let subscribersContainer // Container element for subscribers let selfId // peer Id of the current user let chatCount = 0 let scrollStop let $t + let $toast const client = new Client() // Disconnect participant when browser's window close window.addEventListener('beforeunload', () => { leaveRoom() }) window.addEventListener('resize', resize) // Public methods this.isScreenSharingSupported = isScreenSharingSupported this.joinRoom = joinRoom this.leaveRoom = leaveRoom this.raiseHand = raiseHand this.setupStart = setupStart this.setupStop = setupStop this.setupSetAudioDevice = setupSetAudioDevice this.setupSetVideoDevice = setupSetVideoDevice this.switchAudio = switchAudio this.switchChannel = switchChannel this.switchScreen = switchScreen this.switchVideo = switchVideo /** * Join the room session * * @param data Session metadata and event handlers: * token - A token for the main connection, * nickname - Participant name, * languages - Supported languages (code-to-label map) * chatElement - DOM element for the chat widget, * counterElement - DOM element for the participants counter, * menuElement - DOM element of the room toolbar, * queueElement - DOM element for the Q&A queue (users with a raised hand) * onSuccess - Callback for session connection (join) success * onError - Callback for session connection (join) error * onDestroy - Callback for session disconnection event, - * onJoinRequest - Callback for join request, * onMediaSetup - Called when user clicks the Media setup button * onUpdate - Callback for current user/session update, + * toast - Toast widget * translate - Translation function */ function joinRoom(data) { // Create a container for subscribers and publishers publishersContainer = $('
').appendTo(container).get(0) subscribersContainer = $('
').appendTo(container).get(0) resize() $t = data.translate + $toast = data.toast // Make sure all supported callbacks exist, so we don't have to check // their existence everywhere anymore - let events = ['Success', 'Error', 'Destroy', 'JoinRequest', 'Update', 'MediaSetup'] + let events = ['Success', 'Error', 'Destroy', 'Update', 'MediaSetup'] events.map(event => 'on' + event).forEach(event => { if (!data[event]) { data[event] = () => {} } }) sessionData = data // Handle new participants (including self) client.on('addPeer', (event) => { event.element = participantCreate(event) peers[event.id] = event if (event.isSelf) { selfId = event.id } }) // Handle removed participants client.on('removePeer', (peerId) => { let peer = peers[peerId] if (peer) { // Remove elements related to the participant peerHandDown(peer) $(peer.element).remove() delete peers[peerId] } resize() }) // Participant properties changed e.g. audio/video muted/unmuted client.on('updatePeer', (event, changed) => { let peer = peers[event.id] if (!peer) { return } event.element = peer.element if (event.videoElement && event.videoElement.parentNode != event.element) { $(event.element).prepend(event.videoElement) } else if (!event.videoElement) { $(event.element).find('video').remove() } if (changed && changed.length) { if (changed && changed.includes('nickname')) { nicknameUpdate(event.nickname, event.id) } if (changed.includes('raisedHand')) { if (event.raisedHand) { peerHandUp(event) } else { peerHandDown(event) } } } event.element = participantUpdate(event.element, event) // It's me, got publisher role if (peer.isSelf && (event.role & Roles.PUBLISHER) && changed && changed.includes('publisherRole')) { // Open the media setup dialog sessionData.onMediaSetup() } peers[event.id] = event if (changed && changed.includes('moderatorRole')) { updateParticipantAll() } }) // Handle successful connection to the room client.on('joinSuccess', () => { data.onSuccess() client.media.setupStop() }) // Handle join requests from other users (knocking to the room) client.on('joinRequest', event => { - data.onJoinRequest(event) + joinRequest(event) }) // Handle session disconnection events client.on('closeSession', event => { // Notify the UI data.onDestroy(event) // Remove all participant elements Object.keys(peers).forEach(peerId => { $(peers[peerId].element).remove() }) peers = {} // refresh the matrix resize() }) // Handle session update events (e.g. channel, channels list changes) client.on('updateSession', event => { // Inform the vue component, so it can update some UI controls sessionData.onUpdate(event) }) const { audioSource, videoSource } = client.media.setupData() // Start the session client.joinSession(data.token, { videoSource, audioSource, nickname: data.nickname }) // Prepare the chat initChat() } /** * Leave the room (disconnect) */ function leaveRoom(forced) { client.closeSession(forced) peers = {} } + /** + * Handler for an event received by the moderator when a participant + * is asking for a permission to join the room + */ + function joinRequest(data) { + const id = data.requestId + + // The toast for this user request already exists, ignore + // It's not really needed as we do this on server-side already + if ($('#i' + id).length) { + return + } + + const body = $( + `
` + + `
` + + `
` + + `

` + + `
` + + `` + + `` + ) + + $toast.message({ + className: 'join-request', + icon: 'user', + timeout: 0, + title: $t('meet.join-request'), + // titleClassName: '', + body: body.html(), + onShow: element => { + $(element).find('p').text($t('meet.join-requested', { user: data.nickname || '' })) + + // add id attribute, so we can identify it + $(element).attr('id', 'i' + id) + // add action to the buttons + .find('button.accept,button.deny').on('click', e => { + const action = $(e.target).is('.accept') ? 'Accept' : 'Deny' + client['joinRequest' + action](id) + $('#i' + id).remove() + }) + } + }) + } + /** * Raise or lower the hand * * @param status Hand raised or not */ async function raiseHand(status) { return await client.raiseHand(status) } /** * Sets the audio and video devices for the session. * This will ask user for permission to access media devices. * * @param props Setup properties (videoElement, volumeElement, onSuccess, onError) */ function setupStart(props) { client.media.setupStart(props) // When setting up devices while the session is ongoing we have to // disable currently selected devices (temporarily) otherwise e.g. // changing a mic or camera to another device will not be possible. if (client.isJoined()) { client.setMic('') client.setCamera('') } } /** * Stop the setup "process", cleanup after it. */ async function setupStop() { client.media.setupStop() // Apply device changes to the client const { audioSource, videoSource } = client.media.setupData() await client.setMic(audioSource) await client.setCamera(videoSource) } /** * Change the publisher audio device * * @param deviceId Device identifier string */ async function setupSetAudioDevice(deviceId) { return await client.media.setupSetAudio(deviceId) } /** * Change the publisher video device * * @param deviceId Device identifier string */ async function setupSetVideoDevice(deviceId) { return await client.media.setupSetVideo(deviceId) } /** * Setup the chat UI */ function initChat() { // Handle arriving chat messages client.on('chatMessage', pushChatMessage) // The UI elements are created in the vue template // Here we add a logic for how they work const chat = $(sessionData.chatElement).find('.chat').get(0) const textarea = $(sessionData.chatElement).find('textarea') const button = $(sessionData.menuElement).find('.link-chat') textarea.on('keydown', e => { if (e.keyCode == 13 && !e.shiftKey) { if (textarea.val().length) { client.chatMessage(textarea.val()) textarea.val('') } return false } }) // Add an element for the count of unread messages on the chat button button.append('') .on('click', () => { button.find('.badge').text('') chatCount = 0 // When opening the chat scroll it to the bottom, or we shouldn't? scrollStop = false chat.scrollTop = chat.scrollHeight }) $(chat).on('scroll', event => { // Detect manual scrollbar moves, disable auto-scrolling until // the scrollbar is positioned on the element bottom again scrollStop = chat.scrollTop + chat.offsetHeight < chat.scrollHeight }) } /** * Add a message to the chat * * @param data Object with a message, nickname, id (of the connection, empty for self) */ function pushChatMessage(data) { let message = $('').text(data.message).text() // make the message secure // Format the message, convert emails and urls to links message = anchorme({ input: message, options: { attributes: { target: "_blank" }, // any link above 20 characters will be truncated // to 20 characters and ellipses at the end truncate: 20, // characters will be taken out of the middle middleTruncation: true } // TODO: anchorme is extensible, we could support // github/phabricator's markup e.g. backticks for code samples }) message = message.replace(/\r?\n/, '
') // Display the message let chat = $(sessionData.chatElement).find('.chat') let box = chat.find('.message').last() message = $('
').html(message) message.find('a').attr('rel', 'noreferrer') if (box.length && box.data('id') == data.peerId) { // A message from the same user as the last message, no new box needed message.appendTo(box) } else { box = $('
').data('id', data.peerId) .append($('
').text(data.nickname || '')) .append(message) .appendTo(chat) if (data.isSelf) { box.addClass('self') } } // Count unread messages if (!$(sessionData.chatElement).is('.open')) { if (!data.isSelf) { chatCount++ } } else { chatCount = 0 } $(sessionData.menuElement).find('.link-chat .badge').text(chatCount ? chatCount : '') // Scroll the chat element to the end if (!scrollStop) { chat.get(0).scrollTop = chat.get(0).scrollHeight } } /** * Switch interpreted language channel * * @param channel Two-letter language code */ function switchChannel(channel) { client.setLanguageChannel(channel) } /** * Mute/Unmute audio for current session publisher */ async function switchAudio() { const isActive = client.micStatus() if (isActive) { return await client.micMute() } else { return await client.micUnmute() } } /** * Mute/Unmute video for current session publisher */ async function switchVideo() { const isActive = client.camStatus() if (isActive) { return await client.camMute() } else { return await client.camUnmute() } } /** * Switch on/off screen sharing */ function switchScreen(callback) { // TODO } /** * Detect if screen sharing is supported by the browser */ function isScreenSharingSupported() { return false // TODO !!(navigator.mediaDevices && navigator.mediaDevices.getDisplayMedia) } /** * Handler for Hand-Up "signal" */ function peerHandUp(peer) { let element = $(nicknameWidget(peer)) participantUpdate(element, peer) element.attr('id', 'qa' + peer.id) .appendTo($(sessionData.queueElement).show()) setTimeout(() => element.addClass('widdle'), 50) } /** * Handler for Hand-Down "signal" */ function peerHandDown(peer) { let list = $(sessionData.queueElement) list.find('#qa' + peer.id).remove() if (!list.find('.meet-nickname').length) { list.hide() } } /** * Update participant nickname in the UI * * @param nickname Nickname * @param peerId Connection identifier of the user */ function nicknameUpdate(nickname, peerId) { if (peerId) { $(sessionData.chatElement).find('.chat').find('.message').each(function() { let elem = $(this) if (elem.data('id') == peerId) { elem.find('.nickname').text(nickname || '') } }) $(sessionData.queueElement).find('#qa' + peerId + ' .content').text(nickname || '') } } /** * Create a participant element in the matrix. Depending on the peer role * parameter it will be a video element wrapper inside the matrix or a simple * tag-like element on the subscribers list. * * @param params Peer metadata/params * @param content Optional content to prepend to the element * * @return The element */ function participantCreate(params, content) { let element if ((!params.language && params.role & Roles.PUBLISHER) || params.role & Roles.SCREEN) { // publishers and shared screens element = publisherCreate(params, content) if (params.videoElement) { $(element).prepend(params.videoElement) } } else { // subscribers and language interpreters element = subscriberCreate(params, content) } setTimeout(resize, 50) return element } /** * Create a