diff --git a/meet/server/lib/Room.js b/meet/server/lib/Room.js index 19f86ca6..4c131ef4 100644 --- a/meet/server/lib/Room.js +++ b/meet/server/lib/Room.js @@ -1,1229 +1,1230 @@ const EventEmitter = require('events').EventEmitter; const AwaitQueue = require('awaitqueue'); const axios = require('axios'); const Logger = require('./Logger'); const { SocketTimeoutError } = require('./errors'); const Roles = require('./userRoles'); const config = require('../config/config'); const logger = new Logger('Room'); const ROUTER_SCALE_SIZE = config.routerScaleSize || 40; class Room extends EventEmitter { /* * Find a router that is on a worker that is least loaded. * * A worker with a router that we are already piping to is preferred. */ static getLeastLoadedRouter(mediasoupWorkers, peers, mediasoupRouters) { const routerLoads = new Map(); const workerLoads = new Map(); const pipedRoutersIds = new Set(); // Calculate router loads by adding up peers per router, // and collected piped routers for (const peer of peers.values()) { const routerId = peer.routerId; if (routerId) { if (mediasoupRouters.has(routerId)) { pipedRoutersIds.add(routerId); } if (routerLoads.has(routerId)) { routerLoads.set(routerId, routerLoads.get(routerId) + 1); } else { routerLoads.set(routerId, 1); } } } // Calculate worker loads by adding up router loads per worker for (const worker of mediasoupWorkers) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (workerLoads.has(worker._pid)) { workerLoads.set(worker._pid, workerLoads.get(worker._pid) + (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } else { workerLoads.set(worker._pid, (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } } } const sortedWorkerLoads = new Map([ ...workerLoads.entries() ].sort( (a, b) => a[1] - b[1])); // we don't care about if router is piped, just choose the least loaded worker if (pipedRoutersIds.size === 0 || pipedRoutersIds.size === mediasoupRouters.size) { const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } else { // find if there is a piped router that is on a worker that is below limit for (const [ workerId, workerLoad ] of sortedWorkerLoads.entries()) { for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; // on purpose we check if the worker load is below the limit, // as in reality the worker load is imortant, // not the router load if (mediasoupRouters.has(routerId) && pipedRoutersIds.has(routerId) && workerLoad < ROUTER_SCALE_SIZE) { return routerId; } } } } } // no piped router found, we need to return router from least loaded worker const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } } /** * Factory function that creates and returns Room instance. * * @async * * @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new * mediasoup Router must be created. * @param {String} roomId - Id of the Room instance. */ static async create({ mediasoupWorkers, roomId, peers }) { logger.info('create() [roomId:"%s"]', roomId); // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediasoupRouters = new Map(); for (const worker of mediasoupWorkers) { const router = await worker.createRouter({ mediaCodecs }); mediasoupRouters.set(router.id, router); } const firstRouter = mediasoupRouters.get(Room.getLeastLoadedRouter( mediasoupWorkers, peers, mediasoupRouters)); // Create a mediasoup AudioLevelObserver on first router const audioLevelObserver = await firstRouter.createAudioLevelObserver( { maxEntries : 1, threshold : -80, interval : 800 }); return new Room({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers }); } constructor({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers }) { logger.info('constructor() [roomId:"%s"]', roomId); super(); this.setMaxListeners(Infinity); // this._uuid = uuidv4(); this._mediasoupWorkers = mediasoupWorkers; this._allPeers = peers; // Room ID. this._roomId = roomId; // Closed flag. this._closed = false; // Joining queue this._queue = new AwaitQueue(); this._lastN = []; this._peers = {}; this._selfDestructTimeout = null; // Array of mediasoup Router instances. this._mediasoupRouters = mediasoupRouters; } close() { logger.debug('close()'); this._closed = true; this._queue.close(); this._queue = null; if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = null; // Close the peers. for (const peer in this._peers) { if (!this._peers[peer].closed) this._peers[peer].close(); } this._peers = null; // Close the mediasoup Routers. for (const router of this._mediasoupRouters.values()) { router.close(); } this._allPeers = null; this._mediasoupWorkers = null; this._mediasoupRouters.clear(); this._audioLevelObserver = null; // Emit 'close' event. this.emit('close'); } handlePeer({ peer }) { logger.info('handlePeer() [peer:"%s", role:%s]', peer.id, peer.role); // Should not happen if (this._peers[peer.id]) { logger.warn( 'handleConnection() | there is already a peer with same peerId [peer:"%s"]', peer.id); } this._peerJoining(peer); } logStatus() { logger.info( 'logStatus() [room id:"%s", peers:"%s"]', this._roomId, Object.keys(this._peers).length ); } dump() { return { roomId : this._roomId, peers : Object.keys(this._peers).length }; } get id() { return this._roomId; } selfDestructCountdown() { logger.debug('selfDestructCountdown() started'); if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = setTimeout(() => { if (this._closed) return; if (this.checkEmpty()) { logger.info( 'Room deserted for some time, closing the room [roomId:"%s"]', this._roomId); this.close(); } else logger.debug('selfDestructCountdown() aborted; room is not empty!'); }, 10000); } checkEmpty() { return Object.keys(this._peers).length === 0; } _peerJoining(peer) { this._queue.push(async () => { peer.socket.join(this._roomId); // If we don't have this peer, add to end !this._lastN.includes(peer.id) && this._lastN.push(peer.id); this._peers[peer.id] = peer; // Assign routerId peer.routerId = await this._getRouterId(); this._handlePeer(peer); let turnServers; if ('turnAPIURI' in config) { try { const { data } = await axios.get( config.turnAPIURI, { timeout : config.turnAPITimeout || 2000, params : { ...config.turnAPIparams, 'api_key' : config.turnAPIKey, 'ip' : peer.socket.request.connection.remoteAddress } }); turnServers = [ { urls : data.uris, username : data.username, credential : data.password } ]; } catch (error) { if ('backupTurnServers' in config && config.backupTurnServers.length) turnServers = config.backupTurnServers; logger.error('_peerJoining() | error on REST turn [error:"%o"]', error); } } else if ('backupTurnServers' in config && config.backupTurnServers.length) { turnServers = config.backupTurnServers; } this._notification(peer.socket, 'roomReady', { turnServers }); }) .catch((error) => { logger.error('_peerJoining() [error:"%o"]', error); }); } _handlePeer(peer) { logger.debug('_handlePeer() [peer:"%s"]', peer.id); peer.on('close', () => { this._handlePeerClose(peer); }); peer.on('nicknameChanged', () => { // Spread to others this._notification(peer.socket, 'changeNickname', { peerId: peer.id, nickname: peer.nickname }, true); }); peer.on('gotRole', ({ newRole }) => { // Spread to others this._notification(peer.socket, 'gotRole', { peerId: peer.id, role: newRole }, true, true); }); peer.socket.on('request', (request, cb) => { logger.debug( 'Peer "request" event [method:"%s", peerId:"%s"]', request.method, peer.id); this._handleSocketRequest(peer, request, cb) .catch((error) => { logger.error('"request" failed [error:"%o"]', error); cb(error); }); }); // Peer left before we were done joining if (peer.closed) this._handlePeerClose(peer); } _handlePeerClose(peer) { logger.debug('_handlePeerClose() [peer:"%s"]', peer.id); if (this._closed) return; this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); // Remove from lastN this._lastN = this._lastN.filter((id) => id !== peer.id); delete this._peers[peer.id]; // If this is the last Peer in the room close the room after a while. if (this.checkEmpty()) this.selfDestructCountdown(); } async _handleSocketRequest(peer, request, cb) { const router = this._mediasoupRouters.get(peer.routerId); console.log(request.method); switch (request.method) { case 'getRouterRtpCapabilities': { cb(null, router.rtpCapabilities); break; } case 'join': { const { nickname, picture, rtpCapabilities } = request.data; // Store client data into the Peer data object. peer.nickname = nickname; peer.picture = picture; peer.rtpCapabilities = rtpCapabilities; // Tell the new Peer about already joined Peers. // And also create Consumers for existing Producers. const otherPeers = this.getPeers(peer); const peerInfos = otherPeers .map((otherPeer) => (otherPeer.peerInfo)); cb(null, { id: peer.id, role: peer.role, peers: peerInfos, }); for (const otherPeer of otherPeers) { // Create Consumers for existing Producers. for (const producer of otherPeer.producers.values()) { this._createConsumer( { consumerPeer : peer, producerPeer : otherPeer, producer }); } } // Notify the new Peer to all other Peers. for (const otherPeer of this.getPeers(peer)) { this._notification( otherPeer.socket, 'newPeer', peer.peerInfo ); } logger.debug( 'peer joined [peer: "%s", nickname: "%s", picture: "%s"]', peer.id, nickname, picture); break; } case 'createPlainTransport': { const { producing, consuming } = request.data; const transport = await router.createPlainTransport( { - comedia: true, + //When consuming we manually connect using connectPlainTransport, + //otherwise we let the port autodetection work. + comedia: producing, // FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP) rtcpMux: false, listenIp: { ip: "127.0.0.1", announcedIp: null }, appData : { producing, consuming } } ); - // Store the WebRtcTransport into the Peer data Object. peer.addTransport(transport.id, transport); cb( null, { - id : transport.id, + id : transport.id, ip : transport.tuple.localIp, port : transport.tuple.localPort, rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined }); break; } case 'connectPlainTransport': { const { transportId, ip, port, rtcpPort } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ ip: ip, port: port, rtcpPort: rtcpPort, }); cb(); break; } case 'createWebRtcTransport': { // NOTE: Don't require that the Peer is joined here, so the client can // initiate mediasoup Transports and be ready when he later joins. const { forceTcp, producing, consuming } = request.data; const webRtcTransportOptions = { ...config.mediasoup.webRtcTransport, appData : { producing, consuming } }; webRtcTransportOptions.enableTcp = true; if (forceTcp) webRtcTransportOptions.enableUdp = false; else { webRtcTransportOptions.enableUdp = true; webRtcTransportOptions.preferUdp = true; } const transport = await router.createWebRtcTransport( webRtcTransportOptions ); transport.on('dtlsstatechange', (dtlsState) => { if (dtlsState === 'failed' || dtlsState === 'closed') { logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); } }); // Store the WebRtcTransport into the Peer data Object. peer.addTransport(transport.id, transport); cb( null, { id : transport.id, iceParameters : transport.iceParameters, iceCandidates : transport.iceCandidates, dtlsParameters : transport.dtlsParameters }); const { maxIncomingBitrate } = config.mediasoup.webRtcTransport; // If set, apply max incoming bitrate limit. if (maxIncomingBitrate) { try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); } catch (error) { logger.info("Setting the incoming bitrate failed") } } break; } case 'connectWebRtcTransport': { const { transportId, dtlsParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ dtlsParameters }); cb(); break; } /* case 'restartIce': { const { transportId } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); const iceParameters = await transport.restartIce(); cb(null, iceParameters); break; } */ case 'produce': { let { appData } = request.data; if (!appData.source || ![ 'mic', 'webcam', 'screen' ].includes(appData.source)) throw new Error('invalid producer source'); if (appData.source === 'mic' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'webcam' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'screen' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); const { transportId, kind, rtpParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); // Add peerId into appData to later get the associated Peer during // the 'loudest' event of the audioLevelObserver. appData = { ...appData, peerId: peer.id }; const producer = await transport.produce({ kind, rtpParameters, appData }); const pipeRouters = this._getRoutersToPipeTo(peer.routerId); for (const [ routerId, destinationRouter ] of this._mediasoupRouters) { if (pipeRouters.includes(routerId)) { await router.pipeToRouter({ producerId : producer.id, router : destinationRouter }); } } // Store the Producer into the Peer data Object. peer.addProducer(producer.id, producer); producer.on('videoorientationchange', (videoOrientation) => { logger.debug( 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]', producer.id, videoOrientation); }); // Trace individual packets for debugging // await producer.enableTraceEvent([ "rtp", "pli", "keyframe", "nack" ]); // producer.on("trace", (trace) => { // console.log(`Trace on ${producer.id}`, trace); // }); cb(null, { id: producer.id }); // Optimization: Create a server-side Consumer for each Peer. for (const otherPeer of this.getPeers(peer)) { this._createConsumer( { consumerPeer : otherPeer, producerPeer : peer, producer }); } // Add into the audioLevelObserver. if (kind === 'audio') { this._audioLevelObserver.addProducer({ producerId: producer.id }) .catch(() => {}); } break; } case 'closeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); producer.close(); // Remove from its map. peer.removeProducer(producer.id); cb(); break; } case 'pauseProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.pause(); cb(); break; } case 'resumeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.resume(); cb(); break; } case 'pauseConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.pause(); cb(); break; } case 'resumeConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.resume(); cb(); break; } case 'changeNickname': { const { nickname } = request.data; peer.nickname = nickname; // This will be spread through events from the peer object // Return no error cb(); break; } case 'chatMessage': { const { message } = request.data; // Spread to others this._notification(peer.socket, 'chatMessage', { peerId: peer.id, nickname: peer.nickname, message: message }, true, true); // Return no error cb(); break; } case 'moderator:setRole': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const giveRolePeer = this._peers[peerId]; if (!giveRolePeer) throw new Error(`peer with id "${peerId}" not found`); // TODO: check if role is valid value // This will propagate the event automatically giveRolePeer.setRole(role); // Return no error cb(); break; } case 'raisedHand': { const { raisedHand } = request.data; peer.raisedHand = raisedHand; // Spread to others this._notification(peer.socket, 'raisedHand', { peerId: peer.id, raisedHand: raisedHand, }, true); // Return no error cb(); break; } case 'moderator:closeRoom': { if (!peer.hasRole(Roles.OWNER)) throw new Error('peer not authorized'); this._notification(peer.socket, 'moderator:closeRoom', null, true); cb(); // Close the room this.close(); break; } case 'moderator:kickPeer': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId } = request.data; const kickPeer = this._peers[peerId]; if (!kickPeer) throw new Error(`peer with id "${peerId}" not found`); this._notification(kickPeer.socket, 'moderator:kickPeer'); kickPeer.close(); cb(); break; } default: { logger.error('unknown request.method "%s"', request.method); cb(500, `unknown request.method "${request.method}"`); } } } /** * Creates a mediasoup Consumer for the given mediasoup Producer. * * @async */ async _createConsumer({ consumerPeer, producerPeer, producer }) { logger.debug( '_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]', consumerPeer.id, producerPeer.id, producer.id ); const router = this._mediasoupRouters.get(producerPeer.routerId); // Optimization: // - Create the server-side Consumer. If video, do it paused. // - Tell its Peer about it and wait for its response. // - Upon receipt of the response, resume the server-side Consumer. // - If video, this will mean a single key frame requested by the // server-side Consumer (when resuming it). // NOTE: Don't create the Consumer if the remote Peer cannot consume it. if ( !consumerPeer.rtpCapabilities || !router.canConsume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities }) ) { return; } // Must take the Transport the remote Peer is using for consuming. const transport = consumerPeer.getConsumerTransport(); // This should not happen. if (!transport) { logger.warn('_createConsumer() | Transport for consuming not found'); return; } // Create the Consumer in paused mode. let consumer; try { consumer = await transport.consume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities, paused : producer.kind === 'video' }); if (producer.kind === 'audio') await consumer.setPriority(255); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); return; } // Store the Consumer into the consumerPeer data Object. consumerPeer.addConsumer(consumer.id, consumer); // Set Consumer events. consumer.on('transportclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); }); consumer.on('producerclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id }); }); consumer.on('producerpause', () => { this._notification(consumerPeer.socket, 'consumerPaused', { consumerId: consumer.id }); }); consumer.on('producerresume', () => { this._notification(consumerPeer.socket, 'consumerResumed', { consumerId: consumer.id }); }); // Send a request to the remote Peer with Consumer parameters. try { await this._request( consumerPeer.socket, 'newConsumer', { peerId : producerPeer.id, kind : consumer.kind, producerId : producer.id, id : consumer.id, rtpParameters : consumer.rtpParameters, type : consumer.type, appData : producer.appData, producerPaused : consumer.producerPaused } ); // Now that we got the positive response from the remote Peer and, if // video, resume the Consumer to ask for an efficient key frame. await consumer.resume(); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); } } /** * Get the list of peers. */ getPeers(excludePeer = undefined) { return Object.values(this._peers) .filter((peer) => peer !== excludePeer); } _timeoutCallback(callback) { let called = false; const interval = setTimeout( () => { if (called) return; called = true; callback(new SocketTimeoutError('Request timed out')); }, config.requestTimeout || 20000 ); return (...args) => { if (called) return; called = true; clearTimeout(interval); callback(...args); }; } _sendRequest(socket, method, data = {}) { return new Promise((resolve, reject) => { socket.emit( 'request', { method, data }, this._timeoutCallback((err, response) => { if (err) { reject(err); } else { resolve(response); } }) ); }); } async _request(socket, method, data) { logger.debug('_request() [method:"%s", data:"%o"]', method, data); const { requestRetries = 3 } = config; for (let tries = 0; tries < requestRetries; tries++) { try { return await this._sendRequest(socket, method, data); } catch (error) { if ( error instanceof SocketTimeoutError && tries < requestRetries ) logger.warn('_request() | timeout, retrying [attempt:"%s"]', tries); else throw error; } } } _notification(socket, method, data = {}, broadcast = false, includeSender = false) { if (broadcast) { socket.broadcast.to(this._roomId).emit( 'notification', { method, data } ); if (includeSender) socket.emit('notification', { method, data }); } else { socket.emit('notification', { method, data }); } } /* * Pipe producers of peers that are running under another routher to this router. */ async _pipeProducersToRouter(routerId) { const router = this._mediasoupRouters.get(routerId); // All peers that have a different router const peersToPipe = Object.values(this._peers) .filter((peer) => peer.routerId !== routerId && peer.routerId !== null); for (const peer of peersToPipe) { const srcRouter = this._mediasoupRouters.get(peer.routerId); for (const producerId of peer.producers.keys()) { if (router._producers.has(producerId)) { continue; } await srcRouter.pipeToRouter({ producerId : producerId, router : router }); } } } async _getRouterId() { const routerId = Room.getLeastLoadedRouter( this._mediasoupWorkers, this._allPeers, this._mediasoupRouters); await this._pipeProducersToRouter(routerId); return routerId; } // Returns an array of router ids we need to pipe to: // The combined set of routers of all peers, exluding the router of the peer itself. _getRoutersToPipeTo(originRouterId) { return Object.values(this._peers) .map((peer) => peer.routerId) .filter((routerId, index, self) => routerId !== originRouterId && self.indexOf(routerId) === index ); } } module.exports = Room; diff --git a/meet/server/package.json b/meet/server/package.json index 2920cd05..39eafe50 100644 --- a/meet/server/package.json +++ b/meet/server/package.json @@ -1,40 +1,41 @@ { "name": "kolabmeet-server", "version": "3.3.4", "private": true, "license": "MIT", "scripts": { "start": "node server.js", "connect": "node connect.js", "lint": "eslint -c .eslintrc.json --ext .js *.js lib/", "test": "mocha test/test*.js" }, "dependencies": { "awaitqueue": "^1.0.0", "axios": "^0.21.1", "body-parser": "^1.19.0", - "child_process": "^1.0.2", "colors": "^1.4.0", "compression": "^1.7.4", "connect-redis": "^4.0.3", "cookie-parser": "^1.4.4", "debug": "^4.1.1", "express": "^4.17.1", "express-session": "^1.17.0", "express-socket.io-session": "^1.3.5", "helmet": "^3.21.2", "mediasoup": "^3.5.14", "pidusage": "^2.0.17", "prom-client": ">=12.0.0", "redis": "^2.8.0", "socket.io": "^2.3.0", "spdy": "^4.0.1", "uuid": "^7.0.2" }, - "devDependencies": { + "devDependencies": { + "dgram": "^1.0.1", + "child_process": "^1.0.2", "eslint": "^6.8.0", "mediasoup-client": "^3.6.37", "mocha": "^9.1.1", "supertest": "^6.1.6" } } diff --git a/meet/server/test/performancetestbench.js b/meet/server/test/performancetestbench.js index e1d4caa2..1790553e 100644 --- a/meet/server/test/performancetestbench.js +++ b/meet/server/test/performancetestbench.js @@ -1,321 +1,237 @@ process.env.DEBUG = '*' const assert = require('assert'); let request = require('supertest') const io = require("socket.io-client"); const child_process = require("child_process"); +const udp = require('dgram'); +let recvUdpSocket +let recvRtcpUdpSocket let app let processes = []; let rtpParameters = { - mediaCodecs: [ + codecs: [ { - kind: "audio", - mimeType: "audio/opus", - preferredPayloadType: 111, - clockRate: 48000, - channels: 2, - parameters: { - minptime: 10, - useinbandfec: 1, - }, - }, - { - kind: "video", mimeType: "video/H264", - preferredPayloadType: 125, + payloadType: 125, clockRate: 90000, parameters: { "level-asymmetry-allowed": 1, "packetization-mode": 1, "profile-level-id": "42e01f", }, }, ], } -function startFFMPEGStream(transportInfos, ssrc) { +function startFFMPEGStream(peers, ssrc) { const cmdProgram = "ffmpeg"; //Build a video stream per producer - const streams = transportInfos.map((transportInfo) => `[select=v:f=rtp:ssrc=${ssrc}:payload_type=125]rtp://127.0.0.1:${transportInfo.port}?rtcpport=${transportInfo.rtcpPort}`); + const streams = peers.map((peer) => `[select=v:f=rtp:ssrc=${ssrc}:payload_type=125]rtp://127.0.0.1:${peer.senderTransportInfo.port}?rtcpport=${peer.senderTransportInfo.rtcpPort}`); const cmdArgStr = [ "-i /dev/video0", //We are streaming from the webcam (a looping videofile would be an alternative) `-c:v h264`, //The codec "-map 0:v:0", "-f tee", //This option allows us to read the source once, encode once, and then output multiple streams streams.join('|').trim() ].join(" ").trim(); console.log(`Run command: ${cmdProgram} ${cmdArgStr}`); let recProcess = child_process.spawn(cmdProgram, cmdArgStr.split(/\s+/)); recProcess.on("error", (err) => { console.error("Recording process error:", err); }); recProcess.on("exit", (code, signal) => { console.log("Recording process exit, code: %d, signal: %s", code, signal); recProcess = null; }); // FFmpeg writes its logs to stderr recProcess.stderr.on("data", (chunk) => { chunk .toString() .split(/\r?\n/g) .filter(Boolean) // Filter out empty strings .forEach((line) => { console.log(line); }); }); return recProcess; } +async function sendRequest(socket, method, data = null) { + return await new Promise((resolve, /*reject*/) => { + socket.emit( + 'request', + {method: method, + data: data}, + (error, response) => { + assert(!error) + resolve(response) + } + ) + }) +} + + +async function createPeer(roomId, request, receiverPort, receiverRtcpPort) { + let signalingSocket + let peerId + await request + .post(`/meetmedia/api/sessions/${roomId}/connection`) + .send({role: 31}) + .expect(200) + .then(async (res) => { + let data = res.body; + peerId = data['id']; + const signalingUrl = data['token']; + assert(signalingUrl.includes(peerId)) + assert(signalingUrl.includes(roomId)) + signalingSocket = io(signalingUrl, { path: '/meetmedia/signaling', transports: ["websocket"], rejectUnauthorized: false }); + let roomReady = new Promise((resolve, /*reject*/) => { + signalingSocket.once('notification', (reason) => { + console.warn("Received notification", reason) + if (reason['method'] == 'roomReady') { + resolve(); + } + }); + }) + + signalingSocket.connect(); + await roomReady + }) + .catch(err => { console.warn(err); throw err }) + + + //Necessary later for the server to resume the consumer, + //once we join with another peer + signalingSocket.on('request', async (reason, cb) => { + console.warn("Received request", reason) + if (reason['method'] == 'newConsumer') { + cb(); + } + }); + + //Join + const { id, role/*, peers*/ } = await sendRequest(signalingSocket, 'join', { + nickname: "nickname", + rtpCapabilities: rtpParameters + }) + assert.equal(id, peerId) + assert.equal(role, 31) + + //Create sending transport + const senderTransportInfo = await sendRequest(signalingSocket, 'createPlainTransport', { + producing: true, + consuming: false, + }) + + //Create consuming transport + const consumerTransportInfo = await sendRequest(signalingSocket, 'createPlainTransport', { + producing: false, + consuming: true, + }) + + await sendRequest(signalingSocket, 'connectPlainTransport', { + transportId: consumerTransportInfo.id, + ip: '127.0.0.1', + port: receiverPort, + rtcpPort: receiverRtcpPort, + }) + + //Create sending producer + await sendRequest(signalingSocket, 'produce', { + transportId: senderTransportInfo.id, + kind: 'video', + + rtpParameters: { + codecs: [ + { + mimeType: "video/H264", + payloadType: 125, + clockRate: 90000, + parameters: { + "level-asymmetry-allowed": 1, + "packetization-mode": 1, + "profile-level-id": "42e01f", + }, + }, + ], + encodings: [{ ssrc: 2222 }] + }, + appData: { + source: 'webcam' + } + }) + + return {senderTransportInfo, consumerTransportInfo, signalingSocket}; +} + before(function (done) { process.env.SSL_CERT = "../../docker/certs/kolab.hosted.com.cert" process.env.SSL_KEY = "../../docker/certs/kolab.hosted.com.key" process.env.REDIS_IP = "none" process.env.MEDIASOUP_NUM_WORKERS = 1 app = require('../server.js') request = request(app); + + recvUdpSocket = udp.createSocket('udp4'); + recvUdpSocket.on('message',function(msg,info){ + console.warn("Received message", msg, info) + }); + + recvRtcpUdpSocket = udp.createSocket('udp4'); + recvRtcpUdpSocket.on('message',function(msg,info){ + console.warn("Received RTCP message", msg, info) + }); + app.on("ready", function(){ done(); }); }); -describe('Join room', function() { +describe('Testbench', function() { const roomId = "room1"; - let transportInfos = []; - - async function sendRequest(socket, method, data = null) { - return await new Promise((resolve, /*reject*/) => { - socket.emit( - 'request', - {method: method, - data: data}, - (error, response) => { - assert(!error) - resolve(response) - } - ) - }) - } - - it('create room', async () => { - let signalingSocket - let peerId - await request - .post(`/meetmedia/api/sessions/${roomId}/connection`) - .send({role: 31}) - .expect(200) - .then(async (res) => { - let data = res.body; - peerId = data['id']; - const signalingUrl = data['token']; - assert(signalingUrl.includes(peerId)) - assert(signalingUrl.includes(roomId)) - console.info(signalingUrl); - - signalingSocket = io(signalingUrl, { path: '/meetmedia/signaling', transports: ["websocket"], rejectUnauthorized: false }); - let roomReady = new Promise((resolve, /*reject*/) => { - signalingSocket.on('notification', (reason) => { - if (reason['method'] == 'roomReady') { - resolve(); - } - }); - }) - - signalingSocket.connect(); - await roomReady - }) - .catch(err => { console.warn(err); throw err }) - - //Join - const { id, role, peers } = await sendRequest(signalingSocket, 'join', { - nickname: "nickname", - rtpCapabilities: rtpParameters - }) - assert.equal(id, peerId) - assert.equal(role, 31) - assert.equal(peers.length, 0) - - //Create sending transport - let transportInfo = await sendRequest(signalingSocket, 'createPlainTransport', { - producing: true, - consuming: false, - }) - - //Create sending producer - await sendRequest(signalingSocket, 'produce', { - transportId: transportInfo.id, - kind: 'video', - - rtpParameters: { - codecs: [ - { - mimeType: "video/H264", - payloadType: 125, - clockRate: 90000, - parameters: { - "level-asymmetry-allowed": 1, - "packetization-mode": 1, - "profile-level-id": "42e01f", - }, - }, - ], - encodings: [{ ssrc: 2222 }] - }, - appData: { - source: 'webcam' - } - }) - - console.warn(transportInfo); - transportInfos.push(transportInfo) - }); + let peers = []; - it('second participant room', async () => { - let signalingSocket - let peerId - await request - .post(`/meetmedia/api/sessions/${roomId}/connection`) - .send({role: 31}) - .expect(200) - .then(async (res) => { - let data = res.body; - peerId = data['id']; - const signalingUrl = data['token']; - assert(signalingUrl.includes(peerId)) - assert(signalingUrl.includes(roomId)) - console.info(signalingUrl); - - signalingSocket = io(signalingUrl, { path: '/meetmedia/signaling', transports: ["websocket"], rejectUnauthorized: false }); - let roomReady = new Promise((resolve, /*reject*/) => { - signalingSocket.on('notification', (reason) => { - if (reason['method'] == 'roomReady') { - resolve(); - } - }); - }) - - signalingSocket.connect(); - await roomReady - }) - .catch(err => { console.warn(err); throw err }) - - //Join - const { id, role, peers } = await sendRequest(signalingSocket, 'join', { - nickname: "nickname", - rtpCapabilities: rtpParameters - }) - assert.equal(id, peerId) - assert.equal(role, 31) - assert.equal(peers.length, 1) - - //Create sending transport - let transportInfo = await sendRequest(signalingSocket, 'createPlainTransport', { - producing: true, - consuming: false, - }) + it('prepare udp sockets', async () => { + await new Promise(resolve => recvUdpSocket.bind(22222, '127.0.0.1', resolve)); + await new Promise(resolve => recvRtcpUdpSocket.bind(22223, '127.0.0.1', resolve)); + } - //Create sending producer - await sendRequest(signalingSocket, 'produce', { - transportId: transportInfo.id, - kind: 'video', - - rtpParameters: { - codecs: [ - { - mimeType: "video/H264", - payloadType: 125, - clockRate: 90000, - parameters: { - "level-asymmetry-allowed": 1, - "packetization-mode": 1, - "profile-level-id": "42e01f", - }, - }, - ], - encodings: [{ ssrc: 2222 }] - }, - appData: { - source: 'webcam' - } - }) + it('create peers', async () => { + for (var i = 0; i < 2; i++) { + peers.push(await createPeer(roomId, request, recvUdpSocket.address().port, recvRtcpUdpSocket.address().port)) + } - console.warn(transportInfo); - transportInfos.push(transportInfo) + // // Create a node-sctp receiving Socket. }); - it('ffmpeg', async () => { - processes.push(startFFMPEGStream(transportInfos, 2222)) + it('start ffmpg stream', async () => { + processes.push(startFFMPEGStream(peers, 2222)) }); - it('wait', async () => { - let recResolve; - const promise = new Promise((res, _rej) => { - recResolve = res; - }); + it('wait forever', async () => { + const promise = new Promise((res, _rej) => {}); return promise; }) - - // it('second peer joining', async () => { - // return request - // .post(`/meetmedia/api/sessions/${roomId}/connection`) - // .expect(200) - // .then(async (res) => { - // let data = res.body; - // const newId = data['id']; - // const signalingUrl = data['token']; - - // let signalingSocket2 = io(signalingUrl, { path: '/meetmedia/signaling', transports: ["websocket"], rejectUnauthorized: false }); - - // let roomReady = new Promise((resolve, /*reject*/) => { - // signalingSocket2.on('notification', async (reason) => { - // if (reason['method'] == 'roomReady') { - // resolve(reason); - // } - // }); - // }) - - // let newPeer = new Promise((resolve, /*reject*/) => { - // signalingSocket.on('notification', (reason) => { - // if (reason.method == 'newPeer') { - // resolve(reason); - // } - // }); - // }) - - // signalingSocket.connect(); - - - // let reason = await roomReady; - // const { peers } = await sendRequest(signalingSocket2, 'join', { - // nickname: "nickname", - // rtpCapabilities: rtpParameters - // }) - // assert.equal(peers.length, 1) - // assert.equal(peers[0].id, peerId) - - // reason = await newPeer; - // assert(reason.data.id == newId); - // }) - // .catch(err => { console.warn(err); throw err }) - // }); }); after(function () { for (const process of processes) { process.kill() } process.exit(); }) diff --git a/meet/server/test/test.js b/meet/server/test/test.js index 9f52bea1..5bffc90d 100644 --- a/meet/server/test/test.js +++ b/meet/server/test/test.js @@ -1,199 +1,200 @@ const assert = require('assert'); let request = require('supertest') const io = require("socket.io-client"); const mediasoupClient = require('mediasoup-client'); const { FakeHandler } = require('mediasoup-client/lib/handlers/FakeHandler'); const fakeParameters = require('./fakeParameters'); let app before(function (done) { process.env.SSL_CERT = "../../docker/certs/kolab.hosted.com.cert" process.env.SSL_KEY = "../../docker/certs/kolab.hosted.com.key" process.env.REDIS_IP = "none" // process.env.DEBUG = '*' app = require('../server.js') request = request(app); app.on("ready", function(){ done(); }); }); describe('GET /ping', function() { it('responds', function(done) { request .get('/meetmedia/api/ping') .expect(200, done); }); }); describe('Join room', function() { const roomId = "room1"; let signalingSocket let peerId async function sendRequest(socket, method, data = null) { return await new Promise((resolve, /*reject*/) => { socket.emit( 'request', {method: method, data: data}, (error, response) => { assert(!error) resolve(response) } ) }) } it('create room', async () => { return request .post(`/meetmedia/api/sessions/${roomId}/connection`) + .send({role: 31}) .expect(200) .then(async (res) => { let data = res.body; peerId = data['id']; const signalingUrl = data['token']; assert(signalingUrl.includes(peerId)) assert(signalingUrl.includes(roomId)) console.info(signalingUrl); signalingSocket = io(signalingUrl, { path: '/meetmedia/signaling', transports: ["websocket"], rejectUnauthorized: false }); let roomReady = new Promise((resolve, /*reject*/) => { signalingSocket.on('notification', (reason) => { if (reason['method'] == 'roomReady') { resolve(); } }); }) signalingSocket.connect(); await roomReady }) .catch(err => { console.warn(err); throw err }) }); it('getRtpCapabilities', async () => { const routerRtpCapabilities = await sendRequest(signalingSocket, 'getRouterRtpCapabilities') assert(Object.keys(routerRtpCapabilities).length != 0) }); it('join', async () => { const { id, role, peers } = await sendRequest(signalingSocket, 'join', { nickname: "nickname", rtpCapabilities: fakeParameters.generateNativeRtpCapabilities() }) assert.equal(id, peerId) - assert.equal(role, 0) + assert.equal(role, 31) assert.equal(peers.length, 0) }) it('second peer joining', async () => { return request .post(`/meetmedia/api/sessions/${roomId}/connection`) .expect(200) .then(async (res) => { let data = res.body; const newId = data['id']; const signalingUrl = data['token']; let signalingSocket2 = io(signalingUrl, { path: '/meetmedia/signaling', transports: ["websocket"], rejectUnauthorized: false }); let roomReady = new Promise((resolve, /*reject*/) => { signalingSocket2.on('notification', async (reason) => { if (reason['method'] == 'roomReady') { resolve(reason); } }); }) let newPeer = new Promise((resolve, /*reject*/) => { signalingSocket.on('notification', (reason) => { if (reason.method == 'newPeer') { resolve(reason); } }); }) signalingSocket.connect(); let reason = await roomReady; const { peers } = await sendRequest(signalingSocket2, 'join', { nickname: "nickname", rtpCapabilities: fakeParameters.generateNativeRtpCapabilities() }) assert.equal(peers.length, 1) assert.equal(peers[0].id, peerId) reason = await newPeer; assert(reason.data.id == newId); }) .catch(err => { console.warn(err); throw err }) }); let transportInfo; it('createWebRtcTransport', async () => { transportInfo = await sendRequest(signalingSocket, 'createWebRtcTransport', { forceTcp: false, producing: true, consuming: false }) const { id, iceParameters, iceCandidates, dtlsParameters } = transportInfo console.warn(id); }); it('createDevice', async () => { let device; try{ device = new mediasoupClient.Device({ handlerFactory: FakeHandler.createFactory(fakeParameters) }); let caps = fakeParameters.generateRouterRtpCapabilities(); await device.load({routerRtpCapabilities: caps}) assert(device.canProduce('video')) console.info(transportInfo) const { id, iceParameters, iceCandidates, dtlsParameters } = transportInfo //FIXME it doesn't look like this device can actually connect let sendTransport = device.createSendTransport({ id, iceParameters, iceCandidates, dtlsParameters, // iceServers: turnServers, // iceTransportPolicy: iceTransportPolicy, proprietaryConstraints: { optional: [{ googDscp: true }] } }) sendTransport.on('connect', ({ dtlsParameters }, callback, errback) => { console.warn("on connect"); // done(); // socket.sendRequest('connectWebRtcTransport', // { transportId: sendTransport.id, dtlsParameters }) // .then(callback) // .catch(errback) }) //TODO we should get it to connected // assert.equal(sendTransport.connectionState, 'new'); } catch (error) { console.warn(error) } }); after(function () { signalingSocket.close(); }) }); after(function () { process.exit(); })