diff --git a/meet/server/lib/Peer.js b/meet/server/lib/Peer.js index f3e595fa..aa498767 100644 --- a/meet/server/lib/Peer.js +++ b/meet/server/lib/Peer.js @@ -1,232 +1,237 @@ const EventEmitter = require('events').EventEmitter; const Logger = require('./Logger'); const Roles = require('./userRoles'); const logger = new Logger('Peer'); class Peer extends EventEmitter { constructor({ id, roomId }) { logger.info('constructor() [id:"%s"]', id); super(); this._id = id; this._roomId = roomId; this._socket = null; this._closed = false; this._role = 0; this._nickname = false; + this._language = null; + this._routerId = null; this._rtpCapabilities = null; this._raisedHand = false; this._transports = new Map(); this._producers = new Map(); this._consumers = new Map(); } close() { logger.info('close()'); this._closed = true; // Iterate and close all mediasoup Transport associated to this Peer, so all // its Producers and Consumers will also be closed. for (const transport of this.transports.values()) { transport.close(); } if (this.socket) this.socket.disconnect(true); this.emit('close'); } get id() { return this._id; } set id(id) { this._id = id; } get roomId() { return this._roomId; } set roomId(roomId) { this._roomId = roomId; } get socket() { return this._socket; } set socket(socket) { this._socket = socket; if (this.socket) { this.socket.on('disconnect', () => { if (this.closed) return; logger.debug('"disconnect" event [id:%s]', this.id); this.close(); }); } } get closed() { return this._closed; } get role() { return this._role; } get nickname() { return this._nickname; } + get language() { + return this._language; + } + set nickname(nickname) { if (nickname !== this._nickname) { this._nickname = nickname; this.emit('nicknameChanged', {}); } } + set language(language) { + if (language != this._language) { + this._language = language; + + this.emit('languageChanged', {}); + } + } + get routerId() { return this._routerId; } set routerId(routerId) { this._routerId = routerId; } get rtpCapabilities() { return this._rtpCapabilities; } set rtpCapabilities(rtpCapabilities) { this._rtpCapabilities = rtpCapabilities; } get raisedHand() { return this._raisedHand; } set raisedHand(raisedHand) { this._raisedHand = raisedHand; } get transports() { return this._transports; } get producers() { return this._producers; } get consumers() { return this._consumers; } setRole(newRole) { if (this._role != newRole) { - // It is either screen sharing or publisher/subscriber - if (newRole & Roles.SCREEN) { - if (newRole & Roles.PUBLISHER) { - newRole ^= Roles.PUBLISHER; - } - if (newRole & Roles.SUBSCRIBER) { - newRole ^= Roles.SUBSCRIBER; - } - } - this._role = newRole; - this.emit('gotRole', { newRole }); + this.emit('roleChanged', {}); } } isValidRole(newRole) { Object.keys(Roles).forEach(roleId => { const role = Roles[roleId] if (newRole & role) { newRole = newRole ^ role; } }) return newRole == 0; } hasRole(role) { return !!(this._role & role); } addTransport(id, transport) { this.transports.set(id, transport); } getTransport(id) { return this.transports.get(id); } getConsumerTransport() { return Array.from(this.transports.values()) .find((t) => t.appData.consuming); } removeTransport(id) { this.transports.delete(id); } addProducer(id, producer) { this.producers.set(id, producer); } getProducer(id) { return this.producers.get(id); } removeProducer(id) { this.producers.delete(id); } addConsumer(id, consumer) { this.consumers.set(id, consumer); } getConsumer(id) { return this.consumers.get(id); } removeConsumer(id) { this.consumers.delete(id); } get peerInfo() { const peerInfo = { id: this.id, + language: this.language, nickname: this.nickname, role: this.role, raisedHand: this.raisedHand }; return peerInfo; } } module.exports = Peer; diff --git a/meet/server/lib/Room.js b/meet/server/lib/Room.js index d6a66946..7368b030 100644 --- a/meet/server/lib/Room.js +++ b/meet/server/lib/Room.js @@ -1,1157 +1,1179 @@ const EventEmitter = require('events').EventEmitter; const AwaitQueue = require('awaitqueue'); const crypto = require('crypto'); const Logger = require('./Logger'); const { SocketTimeoutError } = require('./errors'); const Roles = require('./userRoles'); const config = require('../config/config'); const logger = new Logger('Room'); const ROUTER_SCALE_SIZE = config.routerScaleSize || 40; class Room extends EventEmitter { static calculateLoads(mediasoupWorkers, peers, mediasoupRouters) { const routerLoads = new Map(); const workerLoads = new Map(); const pipedRoutersIds = new Set(); // Calculate router loads by adding up peers per router, and collected piped routers for (const peer of peers) { const routerId = peer.routerId; if (routerId) { if (mediasoupRouters.has(routerId)) { pipedRoutersIds.add(routerId); } if (routerLoads.has(routerId)) { routerLoads.set(routerId, routerLoads.get(routerId) + 1); } else { routerLoads.set(routerId, 1); } } } // Calculate worker loads by adding up router loads per worker for (const worker of mediasoupWorkers) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (workerLoads.has(worker._pid)) { workerLoads.set(worker._pid, workerLoads.get(worker._pid) + (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } else { workerLoads.set(worker._pid, (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } } } return {routerLoads, workerLoads, pipedRoutersIds}; } /* * Find a router that is on a worker that is least loaded. * * A worker with a router that we are already piping to is preferred. */ static getLeastLoadedRouter(mediasoupWorkers, peers, mediasoupRouters) { const {workerLoads, pipedRoutersIds} = Room.calculateLoads(mediasoupWorkers, peers.values(), mediasoupRouters); const sortedWorkerLoads = new Map([ ...workerLoads.entries() ].sort( (a, b) => a[1] - b[1])); // we don't care about if router is piped, just choose the least loaded worker if (pipedRoutersIds.size === 0 || pipedRoutersIds.size === mediasoupRouters.size) { const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } else { // find if there is a piped router that is on a worker that is below limit for (const [ workerId, workerLoad ] of sortedWorkerLoads.entries()) { for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; // on purpose we check if the worker load is below the limit, // as in reality the worker load is imortant, // not the router load if (mediasoupRouters.has(routerId) && pipedRoutersIds.has(routerId) && workerLoad < ROUTER_SCALE_SIZE) { return routerId; } } } } } // no piped router found, we need to return router from least loaded worker const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } } /** * Factory function that creates and returns Room instance. * * @async * * @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new * mediasoup Router must be created. * @param {String} roomId - Id of the Room instance. */ static async create({ mediasoupWorkers, roomId, peers }) { logger.info('create() [roomId:"%s"]', roomId); // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediasoupRouters = new Map(); for (const worker of mediasoupWorkers) { const router = await worker.createRouter({ mediaCodecs }); mediasoupRouters.set(router.id, router); } const firstRouter = mediasoupRouters.get(Room.getLeastLoadedRouter( mediasoupWorkers, peers, mediasoupRouters)); // Create a mediasoup AudioLevelObserver on first router const audioLevelObserver = await firstRouter.createAudioLevelObserver( { maxEntries : 1, threshold : -80, interval : 800 }); return new Room({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers }); } constructor({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers }) { logger.info('constructor() [roomId:"%s"]', roomId); super(); this.setMaxListeners(Infinity); // this._uuid = uuidv4(); this._mediasoupWorkers = mediasoupWorkers; // Room ID. this._roomId = roomId; // Closed flag. this._closed = false; // Joining queue this._queue = new AwaitQueue(); this._peers = peers; this._selfDestructTimeout = null; // Array of mediasoup Router instances. this._mediasoupRouters = mediasoupRouters; this._audioLevelObserver = audioLevelObserver; } dumpStats() { const peers = this.getPeers(); const {routerLoads, workerLoads, pipedRoutersIds} = Room.calculateLoads(this._mediasoupWorkers, peers, this._mediasoupRouters); let stats = { numberOfWorkers: this._mediasoupWorkers.length, numberOfRouters: this._mediasoupRouters.size, numberOfPeers: peers.length, routerLoads: routerLoads, workerLoads: workerLoads, pipedRoutersIds: pipedRoutersIds, }; console.log(stats); } close() { logger.debug('close()'); this._closed = true; this._queue.close(); this._queue = null; if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = null; // Close the peers. for (const peer of this._peers.values()) { if (!peer.closed) peer.close(); } this._peers = {}; // Close the mediasoup Routers. for (const router of this._mediasoupRouters.values()) { router.close(); } this._mediasoupWorkers = null; this._mediasoupRouters.clear(); this._audioLevelObserver = null; // Emit 'close' event. this.emit('close'); } handlePeer({ peer }) { logger.info('handlePeer() [peer:"%s", role:%s]', peer.id, peer.role); // Should not happen if (this._peers[peer.id]) { logger.warn( 'handleConnection() | there is already a peer with same peerId [peer:"%s"]', peer.id); } this._peerJoining(peer); } logStatus() { logger.info( 'logStatus() [room id:"%s", peers:"%s"]', this._roomId, Object.keys(this._peers).length ); } dump() { return { roomId : this._roomId, peers : Object.keys(this._peers).length }; } get id() { return this._roomId; } selfDestructCountdown() { logger.debug('selfDestructCountdown() started'); clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = setTimeout(() => { if (this._closed) return; if (this.checkEmpty()) { logger.info( 'Room deserted for some time, closing the room [roomId:"%s"]', this._roomId); this.close(); } else logger.debug('selfDestructCountdown() aborted; room is not empty!'); }, 10000); } checkEmpty() { return Object.keys(this._peers).length === 0; } _getTURNCredentials(name, secret) { const unixTimeStamp = parseInt(Date.now()/1000) + 24*3600; // this credential would be valid for the next 24 hours // If there is no name, the timestamp alone can also be used. const username = name ? `${unixTimeStamp}:${name}` : `${unixTimeStamp}`; const hmac = crypto.createHmac('sha1', secret); hmac.setEncoding('base64'); hmac.write(username); hmac.end(); const password = hmac.read(); return { username, password }; } _peerJoining(peer) { this._queue.push(async () => { peer.socket.join(this._roomId); this._peers[peer.id] = peer; // Assign routerId peer.routerId = await this._getRouterId(); this._handlePeer(peer); let iceServers = [] if ('turn' in config) { // Generate time-limited credentials. The name is only relevant for the logs. const {username, password} = this._getTURNCredentials(peer.id, config.turn.staticSecret); iceServers = [ { urls : config.turn.urls, username : username, credential : password } ]; } this._notification(peer.socket, 'roomReady', { iceServers }); }) .catch((error) => { logger.error('_peerJoining() [error:"%o"]', error); }); } _handlePeer(peer) { logger.debug('_handlePeer() [peer:"%s"]', peer.id); peer.on('close', () => { this._handlePeerClose(peer); }); peer.on('nicknameChanged', () => { // Spread to others - this._notification(peer.socket, 'changeNickname', { - peerId: peer.id, - nickname: peer.nickname - }, true); + const data = { peerId: peer.id, nickname: peer.nickname }; + this._notification(peer.socket, 'changeNickname', data, true); }); - peer.on('gotRole', ({ newRole }) => { - // Spread to others - this._notification(peer.socket, 'changeRole', { - peerId: peer.id, - role: newRole - }, true, true); + peer.on('languageChanged', () => { + // Spread to others (and self) + const data = { peerId: peer.id, language: peer.language }; + this._notification(peer.socket, 'changeLanguage', data, true, true); + }); + + peer.on('roleChanged', () => { + // Spread to others (and self) + const data = { peerId: peer.id, role: peer.role }; + this._notification(peer.socket, 'changeRole', data, true, true); }); peer.socket.on('request', (request, cb) => { logger.debug( 'Peer "request" event [method:"%s", peerId:"%s"]', request.method, peer.id); this._handleSocketRequest(peer, request, cb) .catch((error) => { logger.error('"request" failed [error:"%o"]', error); cb(error); }); }); // Peer left before we were done joining if (peer.closed) this._handlePeerClose(peer); } _handlePeerClose(peer) { logger.debug('_handlePeerClose() [peer:"%s"]', peer.id); if (this._closed) return; this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); delete this._peers[peer.id]; // If this is the last Peer in the room close the room after a while. if (this.checkEmpty()) this.selfDestructCountdown(); } async _handleSocketRequest(peer, request, cb) { const router = this._mediasoupRouters.get(peer.routerId); switch (request.method) { case 'getRouterRtpCapabilities': { cb(null, router.rtpCapabilities); break; } case 'dumpStats': { this.dumpStats() cb(null); break; } case 'join': { const { nickname, rtpCapabilities } = request.data; // Store client data into the Peer data object. peer.nickname = nickname; peer.rtpCapabilities = rtpCapabilities; // Tell the new Peer about already joined Peers. const otherPeers = this.getPeers(peer); const peerInfos = otherPeers.map(otherPeer => otherPeer.peerInfo); cb(null, { id: peer.id, role: peer.role, peers: peerInfos, }); // Create Consumers for existing Producers. for (const otherPeer of otherPeers) { for (const producer of otherPeer.producers.values()) { this._createConsumer({ consumerPeer: peer, producerPeer: otherPeer, producer }); } } // Notify the new Peer to all other Peers. this._notification(peer.socket, 'newPeer', peer.peerInfo, true); logger.debug( 'peer joined [peer: "%s", nickname: "%s"]', peer.id, nickname); break; } +/* case 'createPlainTransport': { const { producing, consuming } = request.data; const transport = await router.createPlainTransport( { //When consuming we manually connect using connectPlainTransport, //otherwise we let the port autodetection work. comedia: producing, // FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP) rtcpMux: false, listenIp: { ip: "127.0.0.1", announcedIp: null }, appData : { producing, consuming } } ); // await transport.enableTraceEvent([ "probation", "bwe" ]); // transport.on("trace", (trace) => { // console.log(trace); // }); peer.addTransport(transport.id, transport); cb( null, { id : transport.id, ip : transport.tuple.localIp, port : transport.tuple.localPort, rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined }); break; } case 'connectPlainTransport': { const { transportId, ip, port, rtcpPort } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ ip: ip, port: port, rtcpPort: rtcpPort, }); cb(); break; } - +*/ case 'createWebRtcTransport': { // NOTE: Don't require that the Peer is joined here, so the client can // initiate mediasoup Transports and be ready when he later joins. const { forceTcp, producing, consuming } = request.data; const webRtcTransportOptions = { ...config.mediasoup.webRtcTransport, appData : { producing, consuming } }; webRtcTransportOptions.enableTcp = true; if (forceTcp) webRtcTransportOptions.enableUdp = false; else { webRtcTransportOptions.enableUdp = true; webRtcTransportOptions.preferUdp = true; } const transport = await router.createWebRtcTransport( webRtcTransportOptions ); transport.on('dtlsstatechange', (dtlsState) => { if (dtlsState === 'failed' || dtlsState === 'closed') { logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); } }); // Store the WebRtcTransport into the Peer data Object. peer.addTransport(transport.id, transport); cb( null, { id : transport.id, iceParameters : transport.iceParameters, iceCandidates : transport.iceCandidates, dtlsParameters : transport.dtlsParameters }); const { maxIncomingBitrate } = config.mediasoup.webRtcTransport; // If set, apply max incoming bitrate limit. if (maxIncomingBitrate) { try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); } catch (error) { logger.info("Setting the incoming bitrate failed") } } break; } case 'connectWebRtcTransport': { const { transportId, dtlsParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ dtlsParameters }); cb(); break; } /* case 'restartIce': { const { transportId } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); const iceParameters = await transport.restartIce(); cb(null, iceParameters); break; } */ case 'produce': { let { appData } = request.data; if (!appData.source || ![ 'mic', 'webcam', 'screen' ].includes(appData.source)) throw new Error('invalid producer source'); if (appData.source === 'mic' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'webcam' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'screen' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); const { transportId, kind, rtpParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); // Add peerId into appData to later get the associated Peer during // the 'loudest' event of the audioLevelObserver. appData = { ...appData, peerId: peer.id }; const producer = await transport.produce({ kind, rtpParameters, appData }); const pipeRouters = this._getRoutersToPipeTo(peer.routerId); for (const [ routerId, destinationRouter ] of this._mediasoupRouters) { if (pipeRouters.includes(routerId)) { await router.pipeToRouter({ producerId : producer.id, router : destinationRouter }); } } // Store the Producer into the Peer data Object. peer.addProducer(producer.id, producer); producer.on('videoorientationchange', (videoOrientation) => { logger.debug( 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]', producer.id, videoOrientation); }); // Trace individual packets for debugging // await producer.enableTraceEvent([ "rtp", "pli", "keyframe", "nack" ]); // producer.on("trace", (trace) => { // console.log(`Trace on ${producer.id}`, trace); // }); cb(null, { id: producer.id }); // Optimization: Create a server-side Consumer for each Peer. for (const otherPeer of this.getPeers(peer)) { - this._createConsumer( - { - consumerPeer : otherPeer, - producerPeer : peer, + this._createConsumer({ + consumerPeer: otherPeer, + producerPeer: peer, producer - }); + }); } // Add into the audioLevelObserver. if (kind === 'audio') { this._audioLevelObserver.addProducer({ producerId: producer.id }) .catch(() => {}); } break; } case 'closeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); producer.close(); // Remove from its map. peer.removeProducer(producer.id); cb(); break; } case 'pauseProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.pause(); cb(); break; } case 'resumeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.resume(); cb(); break; } case 'pauseConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.pause(); cb(); break; } case 'resumeConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.resume(); cb(); break; } case 'changeNickname': { const { nickname } = request.data; peer.nickname = nickname; // This will be spread through events from the peer object // Return no error cb(); break; } case 'chatMessage': { const { message } = request.data; // Spread to others this._notification(peer.socket, 'chatMessage', { peerId: peer.id, nickname: peer.nickname, message: message }, true, true); // Return no error cb(); break; } case 'moderator:addRole': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const rolePeer = this._peers[peerId]; if (!rolePeer) throw new Error(`peer with id "${peerId}" not found`); if (!rolePeer.isValidRole(role)) throw new Error('invalid role'); if (!rolePeer.hasRole(role)) { // This will propagate the event automatically rolePeer.setRole(rolePeer.role | role); } // Return no error cb(); break; } case 'moderator:removeRole': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const rolePeer = this._peers[peerId]; if (!rolePeer) throw new Error(`peer with id "${peerId}" not found`); if (!rolePeer.isValidRole(role)) throw new Error('invalid role'); if (rolePeer.hasRole(role)) { // This will propagate the event automatically rolePeer.setRole(rolePeer.role ^ role); } // Return no error cb(); break; } + case 'moderator:changeLanguage': + { + if (!peer.hasRole(Roles.MODERATOR)) + throw new Error('peer not authorized'); + + const { language } = request.data; + + if (language && !/^[a-z]{2}$/.test(language)) + throw new Error('invalid language code'); + + peer.language = language; + + // This will be spread through events from the peer object + + // Return no error + cb(); + + break; + } + case 'moderator:closeRoom': { if (!peer.hasRole(Roles.OWNER)) throw new Error('peer not authorized'); this._notification(peer.socket, 'moderator:closeRoom', null, true); cb(); // Close the room this.close(); // TODO: remove the room? break; } case 'moderator:kickPeer': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId } = request.data; const kickPeer = this._peers[peerId]; if (!kickPeer) throw new Error(`peer with id "${peerId}" not found`); this._notification(kickPeer.socket, 'moderator:kickPeer'); kickPeer.close(); cb(); break; } case 'raisedHand': { const { raisedHand } = request.data; peer.raisedHand = raisedHand; // Spread to others this._notification(peer.socket, 'raisedHand', { peerId: peer.id, raisedHand: raisedHand, }, true); // Return no error cb(); break; } default: { logger.error('unknown request.method "%s"', request.method); cb(500, `unknown request.method "${request.method}"`); } } } /** * Creates a mediasoup Consumer for the given mediasoup Producer. * * @async */ async _createConsumer({ consumerPeer, producerPeer, producer }) { logger.debug( '_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]', consumerPeer.id, producerPeer.id, producer.id ); const router = this._mediasoupRouters.get(producerPeer.routerId); // Optimization: // - Create the server-side Consumer. If video, do it paused. // - Tell its Peer about it and wait for its response. // - Upon receipt of the response, resume the server-side Consumer. // - If video, this will mean a single key frame requested by the // server-side Consumer (when resuming it). // NOTE: Don't create the Consumer if the remote Peer cannot consume it. if ( !consumerPeer.rtpCapabilities || !router.canConsume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities }) ) { return; } // Must take the Transport the remote Peer is using for consuming. const transport = consumerPeer.getConsumerTransport(); // This should not happen. if (!transport) { logger.warn('_createConsumer() | Transport for consuming not found'); return; } // Create the Consumer in paused mode. let consumer; try { consumer = await transport.consume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities, paused : producer.kind === 'video' }); if (producer.kind === 'audio') await consumer.setPriority(255); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); return; } // Trace individual packets for debugging // await consumer.enableTraceEvent([ "rtp", "pli", "fir" ]); // consumer.on("trace", (trace) => { // console.log(`Trace on ${consumer.id}`, trace); // }); // Store the Consumer into the consumerPeer data Object. consumerPeer.addConsumer(consumer.id, consumer); // Set Consumer events. consumer.on('transportclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); }); consumer.on('producerclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id }); }); consumer.on('producerpause', () => { this._notification(consumerPeer.socket, 'consumerPaused', { consumerId: consumer.id }); }); consumer.on('producerresume', () => { this._notification(consumerPeer.socket, 'consumerResumed', { consumerId: consumer.id }); }); // Send a request to the remote Peer with Consumer parameters. try { await this._request( consumerPeer.socket, 'newConsumer', { peerId : producerPeer.id, kind : consumer.kind, producerId : producer.id, id : consumer.id, rtpParameters : consumer.rtpParameters, type : consumer.type, appData : producer.appData, producerPaused : consumer.producerPaused } ); // Now that we got the positive response from the remote Peer and, if // video, resume the Consumer to ask for an efficient key frame. await consumer.resume(); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); } } /** * Get the list of peers. */ getPeers(excludePeer = undefined) { return Object.values(this._peers) .filter((peer) => peer !== excludePeer); } _timeoutCallback(callback) { let called = false; const interval = setTimeout( () => { if (called) return; called = true; callback(new SocketTimeoutError('Request timed out')); }, config.requestTimeout || 20000 ); return (...args) => { if (called) return; called = true; clearTimeout(interval); callback(...args); }; } _sendRequest(socket, method, data = {}) { return new Promise((resolve, reject) => { socket.emit( 'request', { method, data }, this._timeoutCallback((err, response) => { if (err) { reject(err); } else { resolve(response); } }) ); }); } async _request(socket, method, data) { logger.debug('_request() [method:"%s", data:"%o"]', method, data); const { requestRetries = 3 } = config; for (let tries = 0; tries < requestRetries; tries++) { try { return await this._sendRequest(socket, method, data); } catch (error) { if ( error instanceof SocketTimeoutError && tries < requestRetries ) logger.warn('_request() | timeout, retrying [attempt:"%s"]', tries); else throw error; } } } _notification(socket, method, data = {}, broadcast = false, includeSender = false) { if (broadcast) { socket.broadcast.to(this._roomId).emit( 'notification', { method, data } ); if (includeSender) socket.emit('notification', { method, data }); } else { socket.emit('notification', { method, data }); } } /* * Pipe producers of peers that are running under another router to this router. */ async _pipeProducersToRouter(routerId) { const router = this._mediasoupRouters.get(routerId); // All peers that have a different router const peersToPipe = Object.values(this._peers) .filter((peer) => peer.routerId !== routerId && peer.routerId !== null); for (const peer of peersToPipe) { const srcRouter = this._mediasoupRouters.get(peer.routerId); for (const producerId of peer.producers.keys()) { if (router._producers.has(producerId)) { continue; } await srcRouter.pipeToRouter({ producerId : producerId, router : router }); } } } async _getRouterId() { const routerId = Room.getLeastLoadedRouter( this._mediasoupWorkers, this._peers, this._mediasoupRouters); await this._pipeProducersToRouter(routerId); return routerId; } // Returns an array of router ids we need to pipe to: // The combined set of routers of all peers, exluding the router of the peer itself. _getRoutersToPipeTo(originRouterId) { return Object.values(this._peers) .map((peer) => peer.routerId) .filter((routerId, index, self) => routerId !== originRouterId && self.indexOf(routerId) === index ); } } module.exports = Room; diff --git a/src/app/Http/Controllers/API/V4/OpenViduController.php b/src/app/Http/Controllers/API/V4/OpenViduController.php index 5a6e7df7..317c0d2c 100644 --- a/src/app/Http/Controllers/API/V4/OpenViduController.php +++ b/src/app/Http/Controllers/API/V4/OpenViduController.php @@ -1,518 +1,499 @@ first(); // This isn't a room, bye bye if (!$room) { return $this->errorResponse(404, \trans('meet.room-not-found')); } // Only the moderator can do it if (!$this->isModerator($room)) { return $this->errorResponse(403); } if (!$room->requestAccept($reqid)) { return $this->errorResponse(500, \trans('meet.session-request-accept-error')); } return response()->json(['status' => 'success']); } /** * Deny the room join request. * * @param string $id Room identifier (name) * @param string $reqid Request identifier * * @return \Illuminate\Http\JsonResponse */ public function denyJoinRequest($id, $reqid) { $room = Room::where('name', $id)->first(); // This isn't a room, bye bye if (!$room) { return $this->errorResponse(404, \trans('meet.room-not-found')); } // Only the moderator can do it if (!$this->isModerator($room)) { return $this->errorResponse(403); } if (!$room->requestDeny($reqid)) { return $this->errorResponse(500, \trans('meet.session-request-deny-error')); } return response()->json(['status' => 'success']); } /** * Create a connection for screen sharing. * * @param string $id Room identifier (name) * * @return \Illuminate\Http\JsonResponse */ public function createConnection($id) { $room = Room::where('name', $id)->first(); // This isn't a room, bye bye if (!$room) { return $this->errorResponse(404, \trans('meet.room-not-found')); } $connection = $this->getConnectionFromRequest(); if ( !$connection || $connection->session_id != $room->session_id || ($connection->role & Room::ROLE_PUBLISHER) == 0 ) { return $this->errorResponse(403); } $response = $room->getSessionToken(Room::ROLE_SCREEN); return response()->json(['status' => 'success', 'token' => $response['token']]); } /** * Listing of rooms that belong to the authenticated user. * * @return \Illuminate\Http\JsonResponse */ public function index() { $user = Auth::guard()->user(); $rooms = Room::where('user_id', $user->id)->orderBy('name')->get(); if (count($rooms) == 0) { // Create a room for the user (with a random and unique name) while (true) { $name = strtolower(\App\Utils::randStr(3, 3, '-')); if (!Room::where('name', $name)->count()) { break; } } $room = Room::create([ 'name' => $name, 'user_id' => $user->id ]); $rooms = collect([$room]); } $result = [ 'list' => $rooms, 'count' => count($rooms), ]; return response()->json($result); } /** * Join the room session. Each room has one owner, and the room isn't open until the owner * joins (and effectively creates the session). * * @param string $id Room identifier (name) * * @return \Illuminate\Http\JsonResponse */ public function joinRoom($id) { $room = Room::where('name', $id)->first(); // Room does not exist, or the owner is deleted if (!$room || !$room->owner) { return $this->errorResponse(404, \trans('meet.room-not-found')); } // Check if there's still a valid meet entitlement for the room owner if (!$room->owner->hasSku('meet')) { return $this->errorResponse(404, \trans('meet.room-not-found')); } $user = Auth::guard()->user(); $isOwner = $user && $user->id == $room->user_id; $init = !empty(request()->input('init')); // There's no existing session if (!$room->hasSession()) { // Participants can't join the room until the session is created by the owner if (!$isOwner) { return $this->errorResponse(422, \trans('meet.session-not-found'), ['code' => 323]); } // The room owner can create the session on request if (!$init) { return $this->errorResponse(422, \trans('meet.session-not-found'), ['code' => 324]); } $session = $room->createSession(); if (empty($session)) { return $this->errorResponse(500, \trans('meet.session-create-error')); } } $settings = $room->getSettings(['locked', 'nomedia', 'password']); $password = (string) $settings['password']; $config = [ 'locked' => $settings['locked'] === 'true', 'nomedia' => $settings['nomedia'] === 'true', 'password' => $isOwner ? $password : '', 'requires_password' => !$isOwner && strlen($password), ]; $response = ['config' => $config]; // Validate room password if (!$isOwner && strlen($password)) { $request_password = request()->input('password'); if ($request_password !== $password) { return $this->errorResponse(422, \trans('meet.session-password-error'), $response + ['code' => 325]); } } // Handle locked room if (!$isOwner && $config['locked']) { $nickname = request()->input('nickname'); $picture = request()->input('picture'); $requestId = request()->input('requestId'); $request = $requestId ? $room->requestGet($requestId) : null; $error = \trans('meet.session-room-locked-error'); // Request already has been processed (not accepted yet, but it could be denied) if (empty($request['status']) || $request['status'] != Room::REQUEST_ACCEPTED) { if (!$request) { if (empty($nickname) || empty($requestId) || !preg_match('/^[a-z0-9]{8,32}$/i', $requestId)) { return $this->errorResponse(422, $error, $response + ['code' => 326]); } if (empty($picture)) { $svg = file_get_contents(resource_path('images/user.svg')); $picture = 'data:image/svg+xml;base64,' . base64_encode($svg); } elseif (!preg_match('|^data:image/png;base64,[a-zA-Z0-9=+/]+$|', $picture)) { return $this->errorResponse(422, $error, $response + ['code' => 326]); } // TODO: Resize when big/make safe the user picture? $request = ['nickname' => $nickname, 'requestId' => $requestId, 'picture' => $picture]; if (!$room->requestSave($requestId, $request)) { // FIXME: should we use error code 500? return $this->errorResponse(422, $error, $response + ['code' => 326]); } // Send the request (signal) to all moderators $result = $room->signal('joinRequest', $request, Room::ROLE_MODERATOR); } return $this->errorResponse(422, $error, $response + ['code' => 327]); } } // Initialize connection tokens if ($init) { // Choose the connection role $canPublish = !empty(request()->input('canPublish')) && (empty($config['nomedia']) || $isOwner); $role = $canPublish ? Room::ROLE_PUBLISHER : Room::ROLE_SUBSCRIBER; if ($isOwner) { $role |= Room::ROLE_MODERATOR; $role |= Room::ROLE_OWNER; } // Create session token for the current user/connection $response = $room->getSessionToken($role); if (empty($response)) { return $this->errorResponse(500, \trans('meet.session-join-error')); } - // Get up-to-date connections metadata - $response['connections'] = $room->getSessionConnections(); - $response_code = 200; $response['role'] = $role; $response['config'] = $config; } else { $response_code = 422; $response['code'] = 322; } return response()->json($response, $response_code); } /** * Set the domain configuration. * * @param string $id Room identifier (name) * * @return \Illuminate\Http\JsonResponse|void */ public function setRoomConfig($id) { $room = Room::where('name', $id)->first(); // Room does not exist, or the owner is deleted if (!$room || !$room->owner) { return $this->errorResponse(404); } $user = Auth::guard()->user(); // Only room owner can configure the room if ($user->id != $room->user_id) { return $this->errorResponse(403); } $input = request()->input(); $errors = []; foreach ($input as $key => $value) { switch ($key) { case 'password': if ($value === null || $value === '') { $input[$key] = null; } else { // TODO: Do we have to validate the password in any way? } break; case 'locked': $input[$key] = $value ? 'true' : null; break; case 'nomedia': $input[$key] = $value ? 'true' : null; break; default: $errors[$key] = \trans('meet.room-unsupported-option-error'); } } if (!empty($errors)) { return response()->json(['status' => 'error', 'errors' => $errors], 422); } if (!empty($input)) { $room->setSettings($input); } return response()->json([ 'status' => 'success', 'message' => \trans('meet.room-setconfig-success'), ]); } /** * Update the participant/connection parameters (e.g. role). * * @param string $id Room identifier (name) * @param string $conn Connection identifier * * @return \Illuminate\Http\JsonResponse */ public function updateConnection($id, $conn) { $connection = Connection::where('id', $conn)->first(); // There's no such connection, bye bye if (!$connection || $connection->room->name != $id) { return $this->errorResponse(404, \trans('meet.connection-not-found')); } foreach (request()->input() as $key => $value) { switch ($key) { - case 'language': - // Only the moderator can do it - if (!$this->isModerator($connection->room)) { - return $this->errorResponse(403); - } - - if ($value) { - if (preg_match('/^[a-z]{2}$/', $value)) { - $connection->metadata = ['language' => $value] + $connection->metadata; - } - } else { - $connection->metadata = array_diff_key($connection->metadata, ['language' => 0]); - } - - break; - case 'role': // Only the moderator can do it if (!$this->isModerator($connection->room)) { return $this->errorResponse(403); } // The 'owner' role is not assignable if ($value & Room::ROLE_OWNER && !($connection->role & Room::ROLE_OWNER)) { return $this->errorResponse(403); } elseif (!($value & Room::ROLE_OWNER) && ($connection->role & Room::ROLE_OWNER)) { return $this->errorResponse(403); } // The room owner has always a 'moderator' role if (!($value & Room::ROLE_MODERATOR) && $connection->role & Room::ROLE_OWNER) { $value |= Room::ROLE_MODERATOR; } // Promotion to publisher? Put the user hand down if ($value & Room::ROLE_PUBLISHER && !($connection->role & Room::ROLE_PUBLISHER)) { $connection->metadata = array_diff_key($connection->metadata, ['hand' => 0]); } // Non-publisher cannot be a language interpreter if (!($value & Room::ROLE_PUBLISHER)) { $connection->metadata = array_diff_key($connection->metadata, ['language' => 0]); } $connection->{$key} = $value; break; } } // The connection observer will send a signal to everyone when needed $connection->save(); return response()->json(['status' => 'success']); } /** * Webhook as triggered from OpenVidu server * * @param \Illuminate\Http\Request $request The API request. * * @return \Illuminate\Http\Response The response */ public function webhook(Request $request) { \Log::debug($request->getContent()); switch ((string) $request->input('event')) { case 'sessionDestroyed': // When all participants left the room OpenVidu dispatches sessionDestroyed // event. We'll remove the session reference from the database. $sessionId = $request->input('sessionId'); $room = Room::where('session_id', $sessionId)->first(); if ($room) { $room->session_id = null; $room->save(); } // Remove all connections // Note: We could remove connections one-by-one via the 'participantLeft' event // but that could create many INSERTs when the session (with many participants) ends // So, it is better to remove them all in a single INSERT. Connection::where('session_id', $sessionId)->delete(); break; // TODO: We need to update connection state via webhook // I.e. isModerator() checks here require up-to-date // participant role information. Another option might be accepting/denying // join requests via websocket } return response('Success', 200); } /** * Check if current user is a moderator for the specified room. * * @param \App\OpenVidu\Room $room The room * * @return bool True if the current user is the room moderator */ protected function isModerator(Room $room): bool { $user = Auth::guard()->user(); // The room owner is a moderator if ($user && $user->id == $room->user_id) { return true; } // Moderator's authentication via the extra request header if ( ($connection = $this->getConnectionFromRequest()) && $connection->session_id === $room->session_id && $connection->role & Room::ROLE_MODERATOR ) { return true; } return false; } /** * Check if current user "owns" the specified connection. * * @param \App\OpenVidu\Connection $connection The connection * * @return bool */ protected function isSelfConnection(Connection $connection): bool { return ($conn = $this->getConnectionFromRequest()) && $conn->id === $connection->id; } /** * Get the connection object for the token in current request headers. * It will also validate the token. * * @return \App\OpenVidu\Connection|null Connection (if exists and the token is valid) */ protected function getConnectionFromRequest() { // Authenticate the user via the extra request header if ($token = request()->header(self::AUTH_HEADER)) { list($connId, ) = explode(':', base64_decode($token), 2); if ( ($connection = Connection::find($connId)) && $connection->metadata['authToken'] === $token ) { return $connection; } } return null; } } diff --git a/src/resources/js/meet/client.js b/src/resources/js/meet/client.js index 6cb8d497..b9bc5e2a 100644 --- a/src/resources/js/meet/client.js +++ b/src/resources/js/meet/client.js @@ -1,776 +1,774 @@ 'use strict' import { Device, parseScalabilityMode } from 'mediasoup-client' import Config from './config.js' import { Media } from './media.js' import { Roles } from './constants.js' import { Socket } from './socket.js' function Client() { let eventHandlers = {} let camProducer let micProducer let screenProducer let consumers = {} let socket let sendTransportInfo let sendTransport let recvTransport let iceServers = [] let nickname = '' let peers = {} let joinProps = {} let videoSource let audioSource const VIDEO_CONSTRAINTS = { 'low': { width: { ideal: 320 } }, 'medium': { width: { ideal: 640 } }, 'high': { width: { ideal: 1280 } }, 'veryhigh': { width: { ideal: 1920 } }, 'ultra': { width: { ideal: 3840 } } } // Create a device (use browser auto-detection) const device = new Device() // A helper for basic browser media operations const media = new Media() this.media = media navigator.mediaDevices.addEventListener('devicechange', () => { trigger('deviceChange') }) /** * Start a session (join a room) */ this.joinSession = (token, props) => { // Store the join properties for later joinProps = props // Initialize the socket, 'roomReady' request handler will do the rest of the job socket = initSocket(token) } /** * Close the session (disconnect) */ this.closeSession = async (reason) => { // If room owner, send the request to close the room if (reason === true && peers.self && peers.self.role & Roles.OWNER) { await socket.sendRequest('moderator:closeRoom') } trigger('closeSession', { reason: reason || 'disconnected' }) if (socket) { socket.close() } media.setupStop() // Close mediasoup Transports. if (sendTransport) { sendTransport.close() sendTransport = null } if (recvTransport) { recvTransport.close() recvTransport = null } // Remove peers' video elements Object.keys(peers).forEach(id => { let peer = peers[id] if (peer.videoElement) { $(peer.videoElement).remove() } }) // Reset state eventHandlers = {} camProducer = null micProducer = null screenProducer = null consumers = {} peers = {} } this.isJoined = () => { return 'self' in peers } this.camMute = async () => { if (camProducer) { camProducer.pause() await socket.sendRequest('pauseProducer', { producerId: camProducer.id }) trigger('updatePeer', updatePeerState(peers.self)) } return this.camStatus() } this.camUnmute = async () => { if (camProducer) { camProducer.resume() await socket.sendRequest('resumeProducer', { producerId: camProducer.id }) trigger('updatePeer', updatePeerState(peers.self)) } return this.camStatus() } this.camStatus = () => { return camProducer && !camProducer.paused && !camProducer.closed } this.micMute = async () => { if (micProducer) { micProducer.pause() await socket.sendRequest('pauseProducer', { producerId: micProducer.id }) trigger('updatePeer', updatePeerState(peers.self)) } return this.micStatus() } this.micUnmute = async () => { if (micProducer) { micProducer.resume() await socket.sendRequest('resumeProducer', { producerId: micProducer.id }) trigger('updatePeer', updatePeerState(peers.self)) } return this.micStatus() } this.micStatus = () => { return micProducer && !micProducer.paused && !micProducer.closed } this.kickPeer = (peerId) => { socket.sendRequest('moderator:kickPeer', { peerId }) } this.chatMessage = (message) => { socket.sendRequest('chatMessage', { message }) } this.raiseHand = async (status) => { if (peers.self.raisedHand != status) { peers.self.raisedHand = status await socket.sendRequest('raisedHand', { raisedHand: status }) trigger('updatePeer', peers.self, ['raisedHand']) } return status } this.setNickname = (nickname) => { if (peers.self.nickname != nickname) { peers.self.nickname = nickname socket.sendRequest('changeNickname', { nickname }) trigger('updatePeer', peers.self, ['nickname']) } } + this.setLanguage = (peerId, language) => { + socket.sendRequest('moderator:changeLanguage', { peerId, language }) + } + this.addRole = (peerId, role) => { socket.sendRequest('moderator:addRole', { peerId, role }) } this.removeRole = (peerId, role) => { socket.sendRequest('moderator:removeRole', { peerId, role }) } /** * Register event handlers */ this.on = (eventName, callback) => { eventHandlers[eventName] = callback } /** * Execute an event handler */ const trigger = (...args) => { const eventName = args.shift() if (eventName in eventHandlers) { eventHandlers[eventName].apply(null, args) } } const initSocket = (token) => { // Connect to websocket socket = new Socket(token) socket.on('disconnect', reason => { // this.closeSession() }) socket.on('reconnectFailed', () => { // this.closeSession() }) socket.on('request', async (request, cb) => { switch (request.method) { case 'newConsumer': const { peerId, producerId, id, kind, rtpParameters, type, appData, producerPaused } = request.data const consumer = await recvTransport.consume({ id, producerId, kind, rtpParameters }) consumer.peerId = peerId consumer.on('transportclose', () => { // TODO: What actually else needs to be done here? delete consumers[consumer.id] }) consumers[consumer.id] = consumer // We are ready. Answer the request so the server will // resume this Consumer (which was paused for now). cb(null) let peer = peers[peerId] if (!peer) { return } addPeerTrack(peer, consumer.track) trigger('updatePeer', peer) break default: console.error('Unknow request method: ' + request.method) } }) socket.on('notification', (notification) => { switch (notification.method) { case 'roomReady': iceServers = notification.data.iceServers joinRoom() return case 'newPeer': peers[notification.data.id] = notification.data trigger('addPeer', notification.data) return case 'peerClosed': const { peerId } = notification.data delete peers[peerId] trigger('removePeer', peerId) return case 'consumerClosed': { const { consumerId } = notification.data const consumer = consumers[consumerId] if (!consumer) { return } consumer.close() delete consumers[consumerId] let peer = peers[consumer.peerId] if (peer) { // TODO: Update peer state, remove track trigger('updatePeer', peer) } return } case 'consumerPaused': case 'consumerResumed': { const { consumerId } = notification.data const consumer = consumers[consumerId] if (!consumer) { return } consumer[notification.method == 'consumerPaused' ? 'pause' : 'resume']() let peer = peers[consumer.peerId] if (peer) { trigger('updatePeer', updatePeerState(peer)) } return } - case 'changeNickname': { - const { peerId, nickname } = notification.data - const peer = peers[peerId] - - if (!peer) { - return - } - - peer.nickname = nickname + case 'changeLanguage': + updatePeerProperty(notification.data, 'language') + return - trigger('updatePeer', peer, ['nickname']) + case 'changeNickname': + updatePeerProperty(notification.data, 'nickname') return - } case 'changeRole': { const { peerId, role } = notification.data const peer = peers.self.id === peerId ? peers.self : peers[peerId] if (!peer) { return } let changes = ['role'] const rolePublisher = role & Roles.PUBLISHER const roleModerator = role & Roles.MODERATOR const isPublisher = peer.role & Roles.PUBLISHER const isModerator = peer.role & Roles.MODERATOR if (isPublisher && !rolePublisher) { // demoted to a subscriber changes.push('publisherRole') if (peer.isSelf) { // stop publishing any streams this.setMic('', true) this.setCamera('', true) } else { // remove the video element peer.videoElement = null // TODO: Do we need to remove/stop consumers? } } else if (!isPublisher && rolePublisher) { // promoted to a publisher changes.push('publisherRole') // create a video element with no tracks setPeerTracks(peer, []) } if ((!isModerator && roleModerator) || (isModerator && !roleModerator)) { changes.push('moderatorRole') } peer.role = role trigger('updatePeer', peer, changes) return } - case 'chatMessage': { + case 'chatMessage': + notification.data.isSelf = notification.data.peerId == peers.self.id trigger('chatMessage', notification.data) return - } - case 'moderator:closeRoom': { + case 'moderator:closeRoom': this.closeSession('session-closed') return - } - case 'moderator:kickPeer': { + case 'moderator:kickPeer': this.closeSession('session-closed') return - } - - case 'raisedHand': { - const { peerId, raisedHand } = notification.data - const peer = peers[peerId] - if (!peer) { - return - } - - peer.raisedHand = raisedHand - - trigger('updatePeer', peer, ['raisedHand']) + case 'raisedHand': + updatePeerProperty(notification.data, 'raisedHand') return - } - case 'signal:joinRequest': { + case 'signal:joinRequest': trigger('joinRequest', notification.data) return - } default: console.error('Unknow notification method: ' + notification.method) } }) return socket } const joinRoom = async () => { const routerRtpCapabilities = await socket.getRtpCapabilities() routerRtpCapabilities.headerExtensions = routerRtpCapabilities.headerExtensions .filter(ext => ext.uri !== 'urn:3gpp:video-orientation') await device.load({ routerRtpCapabilities }) // Setup the consuming transport (for handling streams of other participants) await setRecvTransport() // Send the "join" request, get room data, participants, etc. const { peers: existing, role, id: peerId } = await socket.sendRequest('join', { nickname: joinProps.nickname, rtpCapabilities: device.rtpCapabilities }) trigger('joinSuccess') let peer = { id: peerId, role, nickname: joinProps.nickname, isSelf: true } // Add self to the list peers.self = peer // Start publishing webcam and mic (and setup the producing transport) await this.setCamera(joinProps.videoSource, true) await this.setMic(joinProps.audioSource, true) updatePeerState(peer) trigger('addPeer', peer) // Trigger addPeer event for all peers already in the room, maintain peers list existing.forEach(peer => { let tracks = [] // We receive newConsumer requests before we add the peer to peers list, // therefore we look here for any consumers that belong to this peer and update // the peer. If we do not do this we have to wait about 20 seconds for repeated // newConsumer requests Object.keys(consumers).forEach(cid => { if (consumers[cid].peerId === peer.id) { tracks.push(consumers[cid].track) } }) if (tracks.length) { setPeerTracks(peer, tracks) } peers[peer.id] = peer trigger('addPeer', peer) }) } this.setCamera = async (deviceId, noUpdate) => { // Actually selected device, do nothing if (deviceId == videoSource) { return } // Remove current device, stop producer if (camProducer && !camProducer.closed) { camProducer.close() await socket.sendRequest('closeProducer', { producerId: camProducer.id }) setPeerTracks(peers.self, []) } peers.self.videoSource = videoSource = deviceId if (!deviceId) { if (!noUpdate) { trigger('updatePeer', updatePeerState(peers.self), ['videoSource']) } return } if (!device.canProduce('video')) { throw new Error('cannot produce video') } const { aspectRatio, frameRate, resolution } = Config.videoOptions const track = await media.getTrack({ video: { deviceId: { ideal: deviceId }, ...VIDEO_CONSTRAINTS[resolution], frameRate } }) await setSendTransport() // TODO: Simulcast support? camProducer = await sendTransport.produce({ track, appData: { source : 'webcam' } }) /* camProducer.on('transportclose', () => { camProducer = null }) camProducer.on('trackended', () => { // disableWebcam() }) */ // Create/Update the video element addPeerTrack(peers.self, track) if (!noUpdate) { trigger('updatePeer', peers.self, ['videoSource']) } } this.setMic = async (deviceId, noUpdate) => { // Actually selected device, do nothing if (deviceId == audioSource) { return } // Remove current device, stop producer if (micProducer && !micProducer.closed) { micProducer.close() await socket.sendRequest('closeProducer', { producerId: micProducer.id }) } peers.self.audioSource = audioSource = deviceId if (!deviceId) { if (!noUpdate) { trigger('updatePeer', updatePeerState(peers.self), ['audioSource']) } return } if (!device.canProduce('audio')) { throw new Error('cannot produce audio') } const { autoGainControl, echoCancellation, noiseSuppression, sampleRate, channelCount, volume, sampleSize, opusStereo, opusDtx, opusFec, opusPtime, opusMaxPlaybackRate } = Config.audioOptions const track = await media.getTrack({ audio: { sampleRate, channelCount, volume, autoGainControl, echoCancellation, noiseSuppression, sampleSize, deviceId: { ideal: deviceId } } }) await setSendTransport() micProducer = await sendTransport.produce({ track, codecOptions: { opusStereo, opusDtx, opusFec, opusPtime, opusMaxPlaybackRate }, appData: { source : 'mic' } }) /* micProducer.on('transportclose', () => { micProducer = null }) micProducer.on('trackended', () => { // disableMic() }) */ // Note: We're not adding this track to the video element if (!noUpdate) { trigger('updatePeer', updatePeerState(peers.self), ['audioSource']) } } const setPeerTracks = (peer, tracks) => { if (!peer.videoElement) { peer.videoElement = media.createVideoElement(tracks, { mirror: peer.isSelf }) } else { const stream = new MediaStream() tracks.forEach(track => stream.addTrack(track)) peer.videoElement.srcObject = stream } updatePeerState(peer) } const addPeerTrack = (peer, track) => { if (!peer.videoElement) { setPeerTracks(peer, [ track ]) return } const stream = peer.videoElement.srcObject if (track.kind == 'video') { media.removeTracksFromStream(stream, 'Video') } else { media.removeTracksFromStream(stream, 'Audio') } stream.addTrack(track) updatePeerState(peer) } const updatePeerState = (peer) => { if (peer.isSelf) { peer.videoActive = this.camStatus() peer.audioActive = this.micStatus() } else { peer.videoActive = false peer.audioActive = false Object.keys(consumers).forEach(cid => { const consumer = consumers[cid] if (consumer.peerId == peer.id) { peer[consumer.kind + 'Active'] = !consumer.paused && !consumer.closed && !consumer.producerPaused } }) } return peer } const setSendTransport = async () => { if (sendTransport && !sendTransport.closed) { return } if (!sendTransportInfo) { sendTransportInfo = await socket.sendRequest('createWebRtcTransport', { forceTcp: false, producing: true, consuming: false }) } const { id, iceParameters, iceCandidates, dtlsParameters } = sendTransportInfo const iceTransportPolicy = (device.handlerName.toLowerCase().includes('firefox') && iceServers) ? 'relay' : undefined sendTransport = device.createSendTransport({ id, iceParameters, iceCandidates, dtlsParameters, iceServers, iceTransportPolicy, proprietaryConstraints: { optional: [{ googDscp: true }] } }) sendTransport.on('connect', ({ dtlsParameters }, callback, errback) => { socket.sendRequest('connectWebRtcTransport', { transportId: sendTransport.id, dtlsParameters }) .then(callback) .catch(errback) }) sendTransport.on('produce', async ({ kind, rtpParameters, appData }, callback, errback) => { try { const { id } = await socket.sendRequest('produce', { transportId: sendTransport.id, kind, rtpParameters, appData }) callback({ id }) } catch (error) { errback(error) } }) } const setRecvTransport = async () => { const transportInfo = await socket.sendRequest('createWebRtcTransport', { forceTcp: false, producing: false, consuming: true }) const { id, iceParameters, iceCandidates, dtlsParameters } = transportInfo const iceTransportPolicy = (device.handlerName.toLowerCase().includes('firefox') && iceServers) ? 'relay' : undefined recvTransport = device.createRecvTransport({ id, iceParameters, iceCandidates, dtlsParameters, iceServers, iceTransportPolicy }) recvTransport.on('connect', ({ dtlsParameters }, callback, errback) => { socket.sendRequest('connectWebRtcTransport', { transportId: recvTransport.id, dtlsParameters }) .then(callback) .catch(errback) }) } + + const updatePeerProperty = (data, prop) => { + const peerId = data.peerId + const peer = peers.self.id === peerId ? peers.self : peers[peerId] + + if (!peer) { + return + } + + peer[prop] = data[prop] + + trigger('updatePeer', peer, [ prop ]) + } } export { Client } diff --git a/src/resources/js/meet/room.js b/src/resources/js/meet/room.js index 9f03367d..c6174c19 100644 --- a/src/resources/js/meet/room.js +++ b/src/resources/js/meet/room.js @@ -1,1083 +1,1081 @@ 'use strict' import anchorme from 'anchorme' import { Client } from './client.js' import { Roles } from './constants.js' import { Dropdown } from 'bootstrap' import { library } from '@fortawesome/fontawesome-svg-core' function Room(container) { let sessionData // Room session metadata let peers = {} // Participants in the session (including self) let publishersContainer // Container element for publishers let subscribersContainer // Container element for subscribers let chatCount = 0 let scrollStop let $t const client = new Client() // Disconnect participant when browser's window close window.addEventListener('beforeunload', () => { leaveRoom() }) window.addEventListener('resize', resize) // Public methods this.isScreenSharingSupported = isScreenSharingSupported this.joinRoom = joinRoom this.leaveRoom = leaveRoom this.raiseHand = raiseHand this.setupStart = setupStart this.setupStop = setupStop this.setupSetAudioDevice = setupSetAudioDevice this.setupSetVideoDevice = setupSetVideoDevice this.switchAudio = switchAudio this.switchChannel = switchChannel this.switchScreen = switchScreen this.switchVideo = switchVideo - this.updateSession = updateSession /** * Join the room session * * @param data Session metadata and event handlers: * token - A token for the main connection, * shareToken - A token for screen-sharing connection, * nickname - Participant name, * languages - Supported languages (code-to-label map) * chatElement - DOM element for the chat widget, * counterElement - DOM element for the participants counter, * menuElement - DOM element of the room toolbar, * queueElement - DOM element for the Q&A queue (users with a raised hand) - * onSuccess - Callback for session connection (join) success - * onError - Callback for session connection (join) error - * onDestroy - Callback for session disconnection event, - * onJoinRequest - Callback for join request, - * onSessionDataUpdate - Callback for current user connection update, - * onMediaSetup - Called when user clicks the Media setup button - * translate - Translation function + * onSuccess - Callback for session connection (join) success + * onError - Callback for session connection (join) error + * onDestroy - Callback for session disconnection event, + * onJoinRequest - Callback for join request, + * onUpdate - Callback for current user/session update, + * onMediaSetup - Called when user clicks the Media setup button + * translate - Translation function */ function joinRoom(data) { // Create a container for subscribers and publishers publishersContainer = $('
').appendTo(container).get(0) subscribersContainer = $('
').appendTo(container).get(0) resize(); $t = data.translate // Make sure all supported callbacks exist, so we don't have to check // their existence everywhere anymore let events = ['Success', 'Error', 'Destroy', 'JoinRequest', 'SessionDataUpdate', 'MediaSetup'] events.map(event => 'on' + event).forEach(event => { if (!data[event]) { data[event] = () => {} } }) sessionData = data // Participant added (including self) client.on('addPeer', (event) => { console.log('addPeer', event) event.element = participantCreate(event) peers[event.id] = event + + updateSession() }) // Participant removed client.on('removePeer', (peerId) => { console.log('removePeer', peerId) let peer = peers[peerId] if (peer) { // Remove elements related to the participant peerHandDown(peer) $(peer.element).remove() delete peers[peerId] } + updateSession() + resize() }) // Participant properties changed e.g. audio/video muted/unmuted client.on('updatePeer', (event, changed) => { console.log('updatePeer', event) let peer = peers[event.id] if (!peer) { return } event.element = peer.element if (event.videoElement && event.videoElement.parentNode != event.element) { $(event.element).prepend(event.videoElement) } else if (!event.videoElement) { $(event.element).find('video').remove() } if (changed && changed.length) { if (changed && changed.includes('nickname')) { nicknameUpdate(event.nickname, event.id) } if (changed.includes('raisedHand')) { if (event.raisedHand) { peerHandUp(event) } else { peerHandDown(event) } } } event.element = participantUpdate(event.element, event) // It's me, got publisher role if (peer.isSelf && (event.role & Roles.PUBLISHER) && changed && changed.includes('publisherRole')) { // Open the media setup dialog sessionData.onMediaSetup() } -/* - // Update channels list - sessionData.channels = getChannels(peers) - - // The channel user was using has been removed (or rather the participant stopped being an interpreter) - if (sessionData.channel && !sessionData.channels.includes(sessionData.channel)) { - sessionData.channel = null - refresh = true - } -*/ - if (changed && changed.includes('moderatorRole')) { - participantUpdateAll() - } - - // Inform the vue component, so it can update some UI controls - // sessionData.onSessionDataUpdate(sessionData) peers[event.id] = event + + updateSession(changed && changed.includes('moderatorRole')) }) client.on('joinSuccess', () => { data.onSuccess() client.media.setupStop() }) client.on('joinRequest', event => { data.onJoinRequest(event) }) // Handle session disconnection events client.on('closeSession', event => { // Notify the UI data.onDestroy(event) // Remove all participant elements Object.keys(peers).forEach(peerId => { $(peers[peerId].element).remove() }) peers = {} // refresh the matrix resize() }) const { audioSource, videoSource } = client.media.setupData() // Start the session client.joinSession(data.token, { videoSource, audioSource, nickname: data.nickname }) // Prepare the chat initChat() } /** * Leave the room (disconnect) */ function leaveRoom(forced) { client.closeSession(forced) peers = {} } /** * Raise or lower the hand * * @param status Hand raised or not */ async function raiseHand(status) { return await client.raiseHand(status) } /** * Sets the audio and video devices for the session. * This will ask user for permission to access media devices. * * @param props Setup properties (videoElement, volumeElement, onSuccess, onError) */ function setupStart(props) { client.media.setupStart(props) // When setting up devices while the session is ongoing we have to // disable currently selected devices (temporarily) otherwise e.g. // changing a mic or camera to another device will not be possible. if (client.isJoined()) { client.setMic('') client.setCamera('') } } /** * Stop the setup "process", cleanup after it. */ async function setupStop() { client.media.setupStop() // Apply device changes to the client const { audioSource, videoSource } = client.media.setupData() await client.setMic(audioSource) await client.setCamera(videoSource) } /** * Change the publisher audio device * * @param deviceId Device identifier string */ async function setupSetAudioDevice(deviceId) { return await client.media.setupSetAudio(deviceId) } /** * Change the publisher video device * * @param deviceId Device identifier string */ async function setupSetVideoDevice(deviceId) { return await client.media.setupSetVideo(deviceId) } /** * Setup the chat UI */ function initChat() { // Handle arriving chat messages client.on('chatMessage', pushChatMessage) // The UI elements are created in the vue template // Here we add a logic for how they work const chat = $(sessionData.chatElement).find('.chat').get(0) const textarea = $(sessionData.chatElement).find('textarea') const button = $(sessionData.menuElement).find('.link-chat') textarea.on('keydown', e => { if (e.keyCode == 13 && !e.shiftKey) { if (textarea.val().length) { client.chatMessage(textarea.val()) textarea.val('') } return false } }) // Add an element for the count of unread messages on the chat button button.append('') .on('click', () => { button.find('.badge').text('') chatCount = 0 // When opening the chat scroll it to the bottom, or we shouldn't? scrollStop = false chat.scrollTop = chat.scrollHeight }) $(chat).on('scroll', event => { // Detect manual scrollbar moves, disable auto-scrolling until // the scrollbar is positioned on the element bottom again scrollStop = chat.scrollTop + chat.offsetHeight < chat.scrollHeight }) } /** * Add a message to the chat * * @param data Object with a message, nickname, id (of the connection, empty for self) */ function pushChatMessage(data) { let message = $('').text(data.message).text() // make the message secure // Format the message, convert emails and urls to links message = anchorme({ input: message, options: { attributes: { target: "_blank" }, // any link above 20 characters will be truncated // to 20 characters and ellipses at the end truncate: 20, // characters will be taken out of the middle middleTruncation: true } // TODO: anchorme is extensible, we could support // github/phabricator's markup e.g. backticks for code samples }) message = message.replace(/\r?\n/, '
') // Display the message - let isSelf = false // TODO let chat = $(sessionData.chatElement).find('.chat') let box = chat.find('.message').last() message = $('
').html(message) message.find('a').attr('rel', 'noreferrer') if (box.length && box.data('id') == data.peerId) { // A message from the same user as the last message, no new box needed message.appendTo(box) } else { box = $('
').data('id', data.peerId) .append($('
').text(data.nickname || '')) .append(message) .appendTo(chat) - if (isSelf) { + if (data.isSelf) { box.addClass('self') } } // Count unread messages if (!$(sessionData.chatElement).is('.open')) { - if (!isSelf) { + if (!data.isSelf) { chatCount++ } } else { chatCount = 0 } $(sessionData.menuElement).find('.link-chat .badge').text(chatCount ? chatCount : '') // Scroll the chat element to the end if (!scrollStop) { chat.get(0).scrollTop = chat.get(0).scrollHeight } } /** * Switch interpreted language channel * * @param channel Two-letter language code */ function switchChannel(channel) { sessionData.channel = channel - - // Mute/unmute all peers depending on the selected channel - participantUpdateAll() + updateSession(true) } /** * Mute/Unmute audio for current session publisher */ async function switchAudio() { const isActive = client.micStatus() if (isActive) { return await client.micMute() } else { return await client.micUnmute() } } /** * Mute/Unmute video for current session publisher */ async function switchVideo() { const isActive = client.camStatus() if (isActive) { return await client.camMute() } else { return await client.camUnmute() } } /** * Switch on/off screen sharing */ function switchScreen(callback) { // TODO } /** * Detect if screen sharing is supported by the browser */ function isScreenSharingSupported() { return false // TODO !!(navigator.mediaDevices && navigator.mediaDevices.getDisplayMedia) } /** * Handler for Hand-Up "signal" */ function peerHandUp(peer) { let element = $(nicknameWidget(peer)) participantUpdate(element, peer) element.attr('id', 'qa' + peer.id) .appendTo($(sessionData.queueElement).show()) setTimeout(() => element.addClass('widdle'), 50) } /** * Handler for Hand-Down "signal" */ function peerHandDown(peer) { let list = $(sessionData.queueElement) list.find('#qa' + peer.id).remove(); if (!list.find('.meet-nickname').length) { list.hide(); } } /** * Update participant nickname in the UI * * @param nickname Nickname * @param peerId Connection identifier of the user */ function nicknameUpdate(nickname, peerId) { if (peerId) { $(sessionData.chatElement).find('.chat').find('.message').each(function() { let elem = $(this) if (elem.data('id') == peerId) { elem.find('.nickname').text(nickname || '') } }) $(sessionData.queueElement).find('#qa' + peerId + ' .content').text(nickname || '') } } /** * Create a participant element in the matrix. Depending on the peer role * parameter it will be a video element wrapper inside the matrix or a simple * tag-like element on the subscribers list. * * @param params Peer metadata/params * @param content Optional content to prepend to the element * * @return The element */ function participantCreate(params, content) { let element if ((!params.language && params.role & Roles.PUBLISHER) || params.role & Roles.SCREEN) { // publishers and shared screens element = publisherCreate(params, content) if (params.videoElement) { $(element).prepend(params.videoElement) } } else { // subscribers and language interpreters element = subscriberCreate(params, content) } setTimeout(resize, 50); return element } /** * Create a