diff --git a/docker-compose.yml b/docker-compose.yml index e7b0c0c3..a800ede5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,169 +1,168 @@ version: '3' services: coturn: build: context: ./docker/coturn/ container_name: kolab-coturn environment: - - REDIS_DBNAME=${COTURN_REDIS_DATABASE} - - REDIS_PASSWORD=${COTURN_REDIS_PASSWORD} - - REDIS_IP=${COTURN_REDIS_IP} - TURN_PUBLIC_IP=${COTURN_PUBLIC_IP} - TURN_LISTEN_PORT=3478 + - TURN_STATIC_SECRET==${COTURN_STATIC_SECRET} hostname: sturn.mgmt.com image: kolab-coturn network_mode: host restart: on-failure tty: true kolab: build: context: ./docker/kolab/ container_name: kolab depends_on: - mariadb extra_hosts: - "kolab.mgmt.com:127.0.0.1" environment: - DB_HOST=${DB_HOST} - DB_ROOT_PASSWORD=Welcome2KolabSystems healthcheck: interval: 10s test: test -f /tmp/kolab-init.done timeout: 5s retries: 30 hostname: kolab.mgmt.com image: kolab network_mode: host tmpfs: - /run - /tmp - /var/run - /var/tmp tty: true volumes: - /etc/letsencrypt/:/etc/letsencrypt/:ro - ./docker/certs/ca.cert:/etc/pki/tls/certs/ca.cert:ro - ./docker/certs/ca.cert:/etc/pki/ca-trust/source/anchors/ca.cert:ro - ./docker/certs/kolab.hosted.com.cert:/etc/pki/tls/certs/kolab.hosted.com.cert - ./docker/certs/kolab.hosted.com.key:/etc/pki/tls/certs/kolab.hosted.com.key - ./docker/certs/kolab.mgmt.com.cert:/etc/pki/tls/certs/kolab.mgmt.com.cert - ./docker/certs/kolab.mgmt.com.key:/etc/pki/tls/certs/kolab.mgmt.com.key - ./docker/kolab/utils:/root/utils:ro - ./src/.env:/.dockerenv:ro - /sys/fs/cgroup:/sys/fs/cgroup:ro mariadb: container_name: kolab-mariadb environment: MYSQL_ROOT_PASSWORD: Welcome2KolabSystems TZ: "+02:00" healthcheck: interval: 10s test: test -e /var/run/mysqld/mysqld.sock timeout: 5s retries: 30 image: mariadb network_mode: host nginx: build: context: ./docker/nginx/ args: APP_WEBSITE_DOMAIN: ${APP_WEBSITE_DOMAIN:?err} container_name: kolab-nginx hostname: nginx.hosted.com image: kolab-nginx network_mode: host tmpfs: - /run - /tmp - /var/run - /var/tmp tty: true volumes: - ./docker/certs/imap.hosted.com.cert:/etc/pki/tls/certs/imap.hosted.com.cert - ./docker/certs/imap.hosted.com.key:/etc/pki/tls/private/imap.hosted.com.key pdns-sql: build: context: ./docker/pdns-sql/ container_name: kolab-pdns-sql depends_on: - mariadb hostname: pdns-sql image: apheleia/kolab-pdns-sql network_mode: host tmpfs: - /run - /tmp - /var/run - /var/tmp tty: true volumes: - /sys/fs/cgroup:/sys/fs/cgroup:ro proxy: build: context: ./docker/proxy/ container_name: kolab-proxy hostname: kanarip.internet-box.ch image: kolab-proxy network_mode: host tmpfs: - /run - /tmp - /var/run - /var/tmp tty: true volumes: - ./docker/certs/:/etc/certs/:ro - /etc/letsencrypt/:/etc/letsencrypt/:ro - /sys/fs/cgroup:/sys/fs/cgroup:ro redis: build: context: ./docker/redis/ container_name: kolab-redis hostname: redis image: redis network_mode: host volumes: - ./docker/redis/redis.conf:/usr/local/etc/redis/redis.conf:ro swoole: build: context: ./docker/swoole/ container_name: kolab-swoole image: apheleia/swoole:4.6.x worker: build: context: ./docker/worker/ container_name: kolab-worker depends_on: - kolab hostname: worker image: kolab-worker network_mode: host tmpfs: - /run - /tmp - /var/run - /var/tmp tty: true volumes: - ./src:/home/worker/src.orig:ro - /sys/fs/cgroup:/sys/fs/cgroup:ro meet: build: context: ./docker/meet/ environment: - REDIS_IP=${MEET_REDIS_IP} - REDIS_PORT=${MEET_REDIS_PORT} - REDIS_DBNAME=${MEET_REDIS_DATABASE} - REDIS_PASSWORD=${MEET_REDIS_PASSWORD} - PUBLIC_IP=${MEET_PUBLIC_IP} - PUBLIC_DOMAIN=${MEET_PUBLIC_DOMAIN} - TURN_SERVER=${MEET_TURN_SERVER} + - TURN_STATIC_SECRET==${COTURN_STATIC_SECRET} network_mode: host container_name: kolab-meet image: kolab-meet volumes: - /etc/letsencrypt/:/etc/letsencrypt/:ro - ./meet/server:/src/meet/:ro - ./docker/meet/build/node_modules:/root/node_modules - ./docker/certs/kolab.hosted.com.cert:/etc/pki/tls/certs/kolab.hosted.com.cert - ./docker/certs/kolab.hosted.com.key:/etc/pki/tls/certs/kolab.hosted.com.key diff --git a/docker/coturn/rootfs/usr/local/bin/coturn.sh b/docker/coturn/rootfs/usr/local/bin/coturn.sh index 8af7612f..983d1e1b 100755 --- a/docker/coturn/rootfs/usr/local/bin/coturn.sh +++ b/docker/coturn/rootfs/usr/local/bin/coturn.sh @@ -1,28 +1,30 @@ #!/bin/bash cd /tmp/ cat > ./turnserver.conf << EOF external-ip=${TURN_PUBLIC_IP:-127.0.0.1} listening-port=${TURN_LISTEN_PORT:-3478} fingerprint -lt-cred-mech # Temporary for testing -user=username1:password1 allow-loopback-peers cli-password=qwerty # Disabled by default to avoid DoS attacks. Logs all bind attempts in verbose log mode (useful for debugging) log-binding max-port=${MAX_PORT:-65535} min-port=${MIN_PORT:-40000} pidfile="$(pwd)/turnserver.pid" realm=kolabmeet log-file=stdout -redis-userdb="ip=${REDIS_IP:-127.0.0.1} dbname=${REDIS_DBNAME:-2} password=${REDIS_PASSWORD:-turn} connect_timeout=30" + +# Dynamically generate username/password for turn +use-auth-secret +static-auth-secret=${TURN_STATIC_SECRET:-uzYguvIl9tpZFMuQOE78DpOi6Jc7VFSD0UAnvgMsg5n4e74MgIf6vQvbc6LWzZjz} + verbose EOF /usr/bin/turnserver -c ./turnserver.conf diff --git a/meet/server/config/config.js b/meet/server/config/config.js index 2a58fad8..3a020791 100644 --- a/meet/server/config/config.js +++ b/meet/server/config/config.js @@ -1,174 +1,159 @@ const os = require('os'); module.exports = { - // URI and key for requesting geoip-based TURN server closest to the client - // turnAPIKey : 'examplekey', - // turnAPIURI : 'https://example.com/api/turn', - // turnAPIparams : { - // 'uri_schema' : 'turn', - // 'transport' : 'tcp', - // 'ip_ver' : 'ipv4', - // 'servercount' : '2' - // }, - // turnAPITimeout : 2 * 1000, - // Backup turnservers if REST fails or is not configured - backupTurnServers: process.env.TURN_SERVER == 'none' ? [] : [ - { - urls : [ - process.env.TURN_SERVER || 'turn:127.0.0.1:3478?transport=tcp' - ], - //FIXME we use hardcoded credentials for now - username : 'username1', - credential : 'password1' - } - ], + turn: { + urls : [ + process.env.TURN_SERVER || 'turn:127.0.0.1:3478?transport=tcp' + ], + staticSecret: process.env.TURN_STATIC_SECRET || 'uzYguvIl9tpZFMuQOE78DpOi6Jc7VFSD0UAnvgMsg5n4e74MgIf6vQvbc6LWzZjz', + }, // redis server options used for session storage redisOptions : { host: process.env.REDIS_IP || '127.0.0.1', port: process.env.REDIS_PORT || 6379, db: process.env.REDIS_DB || '3', ...(process.env.REDIS_PASSWORD ? {password: process.env.REDIS_PASSWORD} : {}) }, // session cookie secret cookieSecret : 'T0P-S3cR3t_cook!e', cookieName : 'kolabmeet.sid', // if you use encrypted private key the set the passphrase tls: { // passphrase: 'key_password' cert: process.env.SSL_CERT || `/etc/pki/tls/certs/kolab.hosted.com.cert`, key: process.env.SSL_KEY || `/etc/pki/tls/certs/kolab.hosted.com.key`, }, // listening Host or IP // If omitted listens on every IP. ("0.0.0.0" and "::") listeningHost: process.env.LISTENING_HOST || '0.0.0.0', // Listening port for https server. listeningPort: process.env.LISTENING_PORT || 12443, // Any http request is redirected to https. // Listening port for http server. listeningRedirectPort : 12080, // Listens only on http, only on listeningPort // listeningRedirectPort disabled // use case: loadbalancer backend httpOnly : true, publicDomain : process.env.PUBLIC_DOMAIN || '127.0.0.1:12443', pathPrefix : '/meetmedia', // WebServer/Express trust proxy config for httpOnly mode // You can find more info: // - https://expressjs.com/en/guide/behind-proxies.html // - https://www.npmjs.com/package/proxy-addr // use case: loadbalancer backend trustProxy : '', // When truthy, the room will be open to all users when as long as there // are allready users in the room activateOnHostJoin : true, // When set, maxUsersPerRoom defines how many users can join // a single room. If not set, there is no limit. // maxUsersPerRoom : 20, // Room size before spreading to new router routerScaleSize : process.env.ROUTER_SCALE_SIZE || 40, // Socket timout value requestTimeout : 20000, // Socket retries when timeout requestRetries : 3, // Mediasoup settings mediasoup : { numWorkers : process.env.MEDIASOUP_NUM_WORKERS || Object.keys(os.cpus()).length, // mediasoup Worker settings. worker : { logLevel : 'warn', logTags : [ 'info', 'ice', 'dtls', 'rtp', 'srtp', 'rtcp' ], rtcMinPort : 40000, rtcMaxPort : 49999 }, // mediasoup Router settings. router : { // Router media codecs. mediaCodecs : [ { kind : 'audio', mimeType : 'audio/opus', clockRate : 48000, channels : 2 }, { kind : 'video', mimeType : 'video/VP8', clockRate : 90000, parameters : { 'x-google-start-bitrate' : 1000 } }, { kind : 'video', mimeType : 'video/VP9', clockRate : 90000, parameters : { 'profile-id' : 2, 'x-google-start-bitrate' : 1000 } }, { kind : 'video', mimeType : 'video/h264', clockRate : 90000, parameters : { 'packetization-mode' : 1, 'profile-level-id' : '4d0032', 'level-asymmetry-allowed' : 1, 'x-google-start-bitrate' : 1000 } }, { kind : 'video', mimeType : 'video/h264', clockRate : 90000, parameters : { 'packetization-mode' : 1, 'profile-level-id' : '42e01f', 'level-asymmetry-allowed' : 1, 'x-google-start-bitrate' : 1000 } } ] }, // mediasoup WebRtcTransport settings. webRtcTransport: { listenIps: [ { ip: process.env.PUBLIC_IP || '127.0.0.1', announcedIp: null } ], initialAvailableOutgoingBitrate: 1000000, minimumAvailableOutgoingBitrate: 600000, // Additional options that are not part of WebRtcTransportOptions. maxIncomingBitrate: 1500000 } } /* , // Prometheus exporter prometheus : { deidentify : false, // deidentify IP addresses // listen : 'localhost', // exporter listens on this address numeric : false, // show numeric IP addresses port : 8889, // allocated port quiet : false // include fewer labels } */ }; diff --git a/meet/server/lib/Room.js b/meet/server/lib/Room.js index 9aa8b148..d6a66946 100644 --- a/meet/server/lib/Room.js +++ b/meet/server/lib/Room.js @@ -1,1160 +1,1157 @@ const EventEmitter = require('events').EventEmitter; const AwaitQueue = require('awaitqueue'); -const axios = require('axios'); +const crypto = require('crypto'); const Logger = require('./Logger'); const { SocketTimeoutError } = require('./errors'); const Roles = require('./userRoles'); const config = require('../config/config'); const logger = new Logger('Room'); const ROUTER_SCALE_SIZE = config.routerScaleSize || 40; class Room extends EventEmitter { static calculateLoads(mediasoupWorkers, peers, mediasoupRouters) { const routerLoads = new Map(); const workerLoads = new Map(); const pipedRoutersIds = new Set(); // Calculate router loads by adding up peers per router, and collected piped routers for (const peer of peers) { const routerId = peer.routerId; if (routerId) { if (mediasoupRouters.has(routerId)) { pipedRoutersIds.add(routerId); } if (routerLoads.has(routerId)) { routerLoads.set(routerId, routerLoads.get(routerId) + 1); } else { routerLoads.set(routerId, 1); } } } // Calculate worker loads by adding up router loads per worker for (const worker of mediasoupWorkers) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (workerLoads.has(worker._pid)) { workerLoads.set(worker._pid, workerLoads.get(worker._pid) + (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } else { workerLoads.set(worker._pid, (routerLoads.has(routerId)?routerLoads.get(routerId):0)); } } } return {routerLoads, workerLoads, pipedRoutersIds}; } /* * Find a router that is on a worker that is least loaded. * * A worker with a router that we are already piping to is preferred. */ static getLeastLoadedRouter(mediasoupWorkers, peers, mediasoupRouters) { const {workerLoads, pipedRoutersIds} = Room.calculateLoads(mediasoupWorkers, peers.values(), mediasoupRouters); const sortedWorkerLoads = new Map([ ...workerLoads.entries() ].sort( (a, b) => a[1] - b[1])); // we don't care about if router is piped, just choose the least loaded worker if (pipedRoutersIds.size === 0 || pipedRoutersIds.size === mediasoupRouters.size) { const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } else { // find if there is a piped router that is on a worker that is below limit for (const [ workerId, workerLoad ] of sortedWorkerLoads.entries()) { for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; // on purpose we check if the worker load is below the limit, // as in reality the worker load is imortant, // not the router load if (mediasoupRouters.has(routerId) && pipedRoutersIds.has(routerId) && workerLoad < ROUTER_SCALE_SIZE) { return routerId; } } } } } // no piped router found, we need to return router from least loaded worker const workerId = sortedWorkerLoads.keys().next().value; for (const worker of mediasoupWorkers) { if (worker._pid === workerId) { for (const router of worker._routers) { const routerId = router._internal.routerId; if (mediasoupRouters.has(routerId)) { return routerId; } } } } } } /** * Factory function that creates and returns Room instance. * * @async * * @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new * mediasoup Router must be created. * @param {String} roomId - Id of the Room instance. */ static async create({ mediasoupWorkers, roomId, peers }) { logger.info('create() [roomId:"%s"]', roomId); // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediasoupRouters = new Map(); for (const worker of mediasoupWorkers) { const router = await worker.createRouter({ mediaCodecs }); mediasoupRouters.set(router.id, router); } const firstRouter = mediasoupRouters.get(Room.getLeastLoadedRouter( mediasoupWorkers, peers, mediasoupRouters)); // Create a mediasoup AudioLevelObserver on first router const audioLevelObserver = await firstRouter.createAudioLevelObserver( { maxEntries : 1, threshold : -80, interval : 800 }); return new Room({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers }); } constructor({ roomId, mediasoupRouters, audioLevelObserver, mediasoupWorkers, peers }) { logger.info('constructor() [roomId:"%s"]', roomId); super(); this.setMaxListeners(Infinity); // this._uuid = uuidv4(); this._mediasoupWorkers = mediasoupWorkers; // Room ID. this._roomId = roomId; // Closed flag. this._closed = false; // Joining queue this._queue = new AwaitQueue(); this._peers = peers; this._selfDestructTimeout = null; // Array of mediasoup Router instances. this._mediasoupRouters = mediasoupRouters; this._audioLevelObserver = audioLevelObserver; } dumpStats() { const peers = this.getPeers(); const {routerLoads, workerLoads, pipedRoutersIds} = Room.calculateLoads(this._mediasoupWorkers, peers, this._mediasoupRouters); let stats = { numberOfWorkers: this._mediasoupWorkers.length, numberOfRouters: this._mediasoupRouters.size, numberOfPeers: peers.length, routerLoads: routerLoads, workerLoads: workerLoads, pipedRoutersIds: pipedRoutersIds, }; console.log(stats); } close() { logger.debug('close()'); this._closed = true; this._queue.close(); this._queue = null; if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = null; // Close the peers. for (const peer of this._peers.values()) { if (!peer.closed) peer.close(); } this._peers = {}; // Close the mediasoup Routers. for (const router of this._mediasoupRouters.values()) { router.close(); } this._mediasoupWorkers = null; this._mediasoupRouters.clear(); this._audioLevelObserver = null; // Emit 'close' event. this.emit('close'); } handlePeer({ peer }) { logger.info('handlePeer() [peer:"%s", role:%s]', peer.id, peer.role); // Should not happen if (this._peers[peer.id]) { logger.warn( 'handleConnection() | there is already a peer with same peerId [peer:"%s"]', peer.id); } this._peerJoining(peer); } logStatus() { logger.info( 'logStatus() [room id:"%s", peers:"%s"]', this._roomId, Object.keys(this._peers).length ); } dump() { return { roomId : this._roomId, peers : Object.keys(this._peers).length }; } get id() { return this._roomId; } selfDestructCountdown() { logger.debug('selfDestructCountdown() started'); clearTimeout(this._selfDestructTimeout); this._selfDestructTimeout = setTimeout(() => { if (this._closed) return; if (this.checkEmpty()) { logger.info( 'Room deserted for some time, closing the room [roomId:"%s"]', this._roomId); this.close(); } else logger.debug('selfDestructCountdown() aborted; room is not empty!'); }, 10000); } checkEmpty() { return Object.keys(this._peers).length === 0; } + _getTURNCredentials(name, secret) { + const unixTimeStamp = parseInt(Date.now()/1000) + 24*3600; // this credential would be valid for the next 24 hours + // If there is no name, the timestamp alone can also be used. + const username = name ? `${unixTimeStamp}:${name}` : `${unixTimeStamp}`; + const hmac = crypto.createHmac('sha1', secret); + hmac.setEncoding('base64'); + hmac.write(username); + hmac.end(); + const password = hmac.read(); + return { + username, + password + }; + } + _peerJoining(peer) { this._queue.push(async () => { peer.socket.join(this._roomId); this._peers[peer.id] = peer; // Assign routerId peer.routerId = await this._getRouterId(); this._handlePeer(peer); - let iceServers; - - if ('turnAPIURI' in config) { - try { - const { data } = await axios.get( - config.turnAPIURI, - { - timeout : config.turnAPITimeout || 2000, - params : { - ...config.turnAPIparams, - 'api_key' : config.turnAPIKey, - 'ip' : peer.socket.request.connection.remoteAddress - } - }); + let iceServers = [] + if ('turn' in config) { + // Generate time-limited credentials. The name is only relevant for the logs. + const {username, password} = this._getTURNCredentials(peer.id, config.turn.staticSecret); - iceServers = [ { - urls : data.uris, - username : data.username, - credential : data.password - } ]; - } catch (error) { - if ('backupTurnServers' in config && config.backupTurnServers.length) - iceServers = config.backupTurnServers; - - logger.error('_peerJoining() | error on REST turn [error:"%o"]', error); - } - } else if ('backupTurnServers' in config && config.backupTurnServers.length) { - iceServers = config.backupTurnServers; + iceServers = [ { + urls : config.turn.urls, + username : username, + credential : password + } ]; } this._notification(peer.socket, 'roomReady', { iceServers }); }) .catch((error) => { logger.error('_peerJoining() [error:"%o"]', error); }); } _handlePeer(peer) { logger.debug('_handlePeer() [peer:"%s"]', peer.id); peer.on('close', () => { this._handlePeerClose(peer); }); peer.on('nicknameChanged', () => { // Spread to others this._notification(peer.socket, 'changeNickname', { peerId: peer.id, nickname: peer.nickname }, true); }); peer.on('gotRole', ({ newRole }) => { // Spread to others this._notification(peer.socket, 'changeRole', { peerId: peer.id, role: newRole }, true, true); }); peer.socket.on('request', (request, cb) => { logger.debug( 'Peer "request" event [method:"%s", peerId:"%s"]', request.method, peer.id); this._handleSocketRequest(peer, request, cb) .catch((error) => { logger.error('"request" failed [error:"%o"]', error); cb(error); }); }); // Peer left before we were done joining if (peer.closed) this._handlePeerClose(peer); } _handlePeerClose(peer) { logger.debug('_handlePeerClose() [peer:"%s"]', peer.id); if (this._closed) return; this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); delete this._peers[peer.id]; // If this is the last Peer in the room close the room after a while. if (this.checkEmpty()) this.selfDestructCountdown(); } async _handleSocketRequest(peer, request, cb) { const router = this._mediasoupRouters.get(peer.routerId); switch (request.method) { case 'getRouterRtpCapabilities': { cb(null, router.rtpCapabilities); break; } case 'dumpStats': { this.dumpStats() cb(null); break; } case 'join': { const { nickname, rtpCapabilities } = request.data; // Store client data into the Peer data object. peer.nickname = nickname; peer.rtpCapabilities = rtpCapabilities; // Tell the new Peer about already joined Peers. const otherPeers = this.getPeers(peer); const peerInfos = otherPeers.map(otherPeer => otherPeer.peerInfo); cb(null, { id: peer.id, role: peer.role, peers: peerInfos, }); // Create Consumers for existing Producers. for (const otherPeer of otherPeers) { for (const producer of otherPeer.producers.values()) { this._createConsumer({ consumerPeer: peer, producerPeer: otherPeer, producer }); } } // Notify the new Peer to all other Peers. this._notification(peer.socket, 'newPeer', peer.peerInfo, true); logger.debug( 'peer joined [peer: "%s", nickname: "%s"]', peer.id, nickname); break; } case 'createPlainTransport': { const { producing, consuming } = request.data; const transport = await router.createPlainTransport( { //When consuming we manually connect using connectPlainTransport, //otherwise we let the port autodetection work. comedia: producing, // FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP) rtcpMux: false, listenIp: { ip: "127.0.0.1", announcedIp: null }, appData : { producing, consuming } } ); // await transport.enableTraceEvent([ "probation", "bwe" ]); // transport.on("trace", (trace) => { // console.log(trace); // }); peer.addTransport(transport.id, transport); cb( null, { id : transport.id, ip : transport.tuple.localIp, port : transport.tuple.localPort, rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined }); break; } case 'connectPlainTransport': { const { transportId, ip, port, rtcpPort } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ ip: ip, port: port, rtcpPort: rtcpPort, }); cb(); break; } case 'createWebRtcTransport': { // NOTE: Don't require that the Peer is joined here, so the client can // initiate mediasoup Transports and be ready when he later joins. const { forceTcp, producing, consuming } = request.data; const webRtcTransportOptions = { ...config.mediasoup.webRtcTransport, appData : { producing, consuming } }; webRtcTransportOptions.enableTcp = true; if (forceTcp) webRtcTransportOptions.enableUdp = false; else { webRtcTransportOptions.enableUdp = true; webRtcTransportOptions.preferUdp = true; } const transport = await router.createWebRtcTransport( webRtcTransportOptions ); transport.on('dtlsstatechange', (dtlsState) => { if (dtlsState === 'failed' || dtlsState === 'closed') { logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); } }); // Store the WebRtcTransport into the Peer data Object. peer.addTransport(transport.id, transport); cb( null, { id : transport.id, iceParameters : transport.iceParameters, iceCandidates : transport.iceCandidates, dtlsParameters : transport.dtlsParameters }); const { maxIncomingBitrate } = config.mediasoup.webRtcTransport; // If set, apply max incoming bitrate limit. if (maxIncomingBitrate) { try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); } catch (error) { logger.info("Setting the incoming bitrate failed") } } break; } case 'connectWebRtcTransport': { const { transportId, dtlsParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); await transport.connect({ dtlsParameters }); cb(); break; } /* case 'restartIce': { const { transportId } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); const iceParameters = await transport.restartIce(); cb(null, iceParameters); break; } */ case 'produce': { let { appData } = request.data; if (!appData.source || ![ 'mic', 'webcam', 'screen' ].includes(appData.source)) throw new Error('invalid producer source'); if (appData.source === 'mic' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'webcam' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); if (appData.source === 'screen' && !peer.hasRole(Roles.PUBLISHER)) throw new Error('peer not authorized'); const { transportId, kind, rtpParameters } = request.data; const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); // Add peerId into appData to later get the associated Peer during // the 'loudest' event of the audioLevelObserver. appData = { ...appData, peerId: peer.id }; const producer = await transport.produce({ kind, rtpParameters, appData }); const pipeRouters = this._getRoutersToPipeTo(peer.routerId); for (const [ routerId, destinationRouter ] of this._mediasoupRouters) { if (pipeRouters.includes(routerId)) { await router.pipeToRouter({ producerId : producer.id, router : destinationRouter }); } } // Store the Producer into the Peer data Object. peer.addProducer(producer.id, producer); producer.on('videoorientationchange', (videoOrientation) => { logger.debug( 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]', producer.id, videoOrientation); }); // Trace individual packets for debugging // await producer.enableTraceEvent([ "rtp", "pli", "keyframe", "nack" ]); // producer.on("trace", (trace) => { // console.log(`Trace on ${producer.id}`, trace); // }); cb(null, { id: producer.id }); // Optimization: Create a server-side Consumer for each Peer. for (const otherPeer of this.getPeers(peer)) { this._createConsumer( { consumerPeer : otherPeer, producerPeer : peer, producer }); } // Add into the audioLevelObserver. if (kind === 'audio') { this._audioLevelObserver.addProducer({ producerId: producer.id }) .catch(() => {}); } break; } case 'closeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); producer.close(); // Remove from its map. peer.removeProducer(producer.id); cb(); break; } case 'pauseProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.pause(); cb(); break; } case 'resumeProducer': { const { producerId } = request.data; const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); await producer.resume(); cb(); break; } case 'pauseConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.pause(); cb(); break; } case 'resumeConsumer': { const { consumerId } = request.data; const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); await consumer.resume(); cb(); break; } case 'changeNickname': { const { nickname } = request.data; peer.nickname = nickname; // This will be spread through events from the peer object // Return no error cb(); break; } case 'chatMessage': { const { message } = request.data; // Spread to others this._notification(peer.socket, 'chatMessage', { peerId: peer.id, nickname: peer.nickname, message: message }, true, true); // Return no error cb(); break; } case 'moderator:addRole': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const rolePeer = this._peers[peerId]; if (!rolePeer) throw new Error(`peer with id "${peerId}" not found`); if (!rolePeer.isValidRole(role)) throw new Error('invalid role'); if (!rolePeer.hasRole(role)) { // This will propagate the event automatically rolePeer.setRole(rolePeer.role | role); } // Return no error cb(); break; } case 'moderator:removeRole': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId, role } = request.data; const rolePeer = this._peers[peerId]; if (!rolePeer) throw new Error(`peer with id "${peerId}" not found`); if (!rolePeer.isValidRole(role)) throw new Error('invalid role'); if (rolePeer.hasRole(role)) { // This will propagate the event automatically rolePeer.setRole(rolePeer.role ^ role); } // Return no error cb(); break; } case 'moderator:closeRoom': { if (!peer.hasRole(Roles.OWNER)) throw new Error('peer not authorized'); this._notification(peer.socket, 'moderator:closeRoom', null, true); cb(); // Close the room this.close(); // TODO: remove the room? break; } case 'moderator:kickPeer': { if (!peer.hasRole(Roles.MODERATOR)) throw new Error('peer not authorized'); const { peerId } = request.data; const kickPeer = this._peers[peerId]; if (!kickPeer) throw new Error(`peer with id "${peerId}" not found`); this._notification(kickPeer.socket, 'moderator:kickPeer'); kickPeer.close(); cb(); break; } case 'raisedHand': { const { raisedHand } = request.data; peer.raisedHand = raisedHand; // Spread to others this._notification(peer.socket, 'raisedHand', { peerId: peer.id, raisedHand: raisedHand, }, true); // Return no error cb(); break; } default: { logger.error('unknown request.method "%s"', request.method); cb(500, `unknown request.method "${request.method}"`); } } } /** * Creates a mediasoup Consumer for the given mediasoup Producer. * * @async */ async _createConsumer({ consumerPeer, producerPeer, producer }) { logger.debug( '_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]', consumerPeer.id, producerPeer.id, producer.id ); const router = this._mediasoupRouters.get(producerPeer.routerId); // Optimization: // - Create the server-side Consumer. If video, do it paused. // - Tell its Peer about it and wait for its response. // - Upon receipt of the response, resume the server-side Consumer. // - If video, this will mean a single key frame requested by the // server-side Consumer (when resuming it). // NOTE: Don't create the Consumer if the remote Peer cannot consume it. if ( !consumerPeer.rtpCapabilities || !router.canConsume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities }) ) { return; } // Must take the Transport the remote Peer is using for consuming. const transport = consumerPeer.getConsumerTransport(); // This should not happen. if (!transport) { logger.warn('_createConsumer() | Transport for consuming not found'); return; } // Create the Consumer in paused mode. let consumer; try { consumer = await transport.consume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities, paused : producer.kind === 'video' }); if (producer.kind === 'audio') await consumer.setPriority(255); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); return; } // Trace individual packets for debugging // await consumer.enableTraceEvent([ "rtp", "pli", "fir" ]); // consumer.on("trace", (trace) => { // console.log(`Trace on ${consumer.id}`, trace); // }); // Store the Consumer into the consumerPeer data Object. consumerPeer.addConsumer(consumer.id, consumer); // Set Consumer events. consumer.on('transportclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); }); consumer.on('producerclose', () => { // Remove from its map. consumerPeer.removeConsumer(consumer.id); this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id }); }); consumer.on('producerpause', () => { this._notification(consumerPeer.socket, 'consumerPaused', { consumerId: consumer.id }); }); consumer.on('producerresume', () => { this._notification(consumerPeer.socket, 'consumerResumed', { consumerId: consumer.id }); }); // Send a request to the remote Peer with Consumer parameters. try { await this._request( consumerPeer.socket, 'newConsumer', { peerId : producerPeer.id, kind : consumer.kind, producerId : producer.id, id : consumer.id, rtpParameters : consumer.rtpParameters, type : consumer.type, appData : producer.appData, producerPaused : consumer.producerPaused } ); // Now that we got the positive response from the remote Peer and, if // video, resume the Consumer to ask for an efficient key frame. await consumer.resume(); } catch (error) { logger.warn('_createConsumer() | [error:"%o"]', error); } } /** * Get the list of peers. */ getPeers(excludePeer = undefined) { return Object.values(this._peers) .filter((peer) => peer !== excludePeer); } _timeoutCallback(callback) { let called = false; const interval = setTimeout( () => { if (called) return; called = true; callback(new SocketTimeoutError('Request timed out')); }, config.requestTimeout || 20000 ); return (...args) => { if (called) return; called = true; clearTimeout(interval); callback(...args); }; } _sendRequest(socket, method, data = {}) { return new Promise((resolve, reject) => { socket.emit( 'request', { method, data }, this._timeoutCallback((err, response) => { if (err) { reject(err); } else { resolve(response); } }) ); }); } async _request(socket, method, data) { logger.debug('_request() [method:"%s", data:"%o"]', method, data); const { requestRetries = 3 } = config; for (let tries = 0; tries < requestRetries; tries++) { try { return await this._sendRequest(socket, method, data); } catch (error) { if ( error instanceof SocketTimeoutError && tries < requestRetries ) logger.warn('_request() | timeout, retrying [attempt:"%s"]', tries); else throw error; } } } _notification(socket, method, data = {}, broadcast = false, includeSender = false) { if (broadcast) { socket.broadcast.to(this._roomId).emit( 'notification', { method, data } ); if (includeSender) socket.emit('notification', { method, data }); } else { socket.emit('notification', { method, data }); } } /* * Pipe producers of peers that are running under another router to this router. */ async _pipeProducersToRouter(routerId) { const router = this._mediasoupRouters.get(routerId); // All peers that have a different router const peersToPipe = Object.values(this._peers) .filter((peer) => peer.routerId !== routerId && peer.routerId !== null); for (const peer of peersToPipe) { const srcRouter = this._mediasoupRouters.get(peer.routerId); for (const producerId of peer.producers.keys()) { if (router._producers.has(producerId)) { continue; } await srcRouter.pipeToRouter({ producerId : producerId, router : router }); } } } async _getRouterId() { const routerId = Room.getLeastLoadedRouter( this._mediasoupWorkers, this._peers, this._mediasoupRouters); await this._pipeProducersToRouter(routerId); return routerId; } // Returns an array of router ids we need to pipe to: // The combined set of routers of all peers, exluding the router of the peer itself. _getRoutersToPipeTo(originRouterId) { return Object.values(this._peers) .map((peer) => peer.routerId) .filter((routerId, index, self) => routerId !== originRouterId && self.indexOf(routerId) === index ); } } module.exports = Room; diff --git a/src/.env.example b/src/.env.example index 4b6cae7c..2a898458 100644 --- a/src/.env.example +++ b/src/.env.example @@ -1,166 +1,163 @@ APP_NAME=Kolab APP_ENV=local APP_KEY= APP_DEBUG=true APP_URL=http://127.0.0.1:8000 #APP_PASSPHRASE= APP_PUBLIC_URL= APP_DOMAIN=kolabnow.com APP_WEBSITE_DOMAIN=kolabnow.com APP_THEME=default APP_TENANT_ID=5 APP_LOCALE=en APP_LOCALES= APP_WITH_ADMIN=1 APP_WITH_RESELLER=1 APP_WITH_SERVICES=1 ASSET_URL=http://127.0.0.1:8000 WEBMAIL_URL=/apps SUPPORT_URL=/support SUPPORT_EMAIL= LOG_CHANNEL=stack LOG_SLOW_REQUESTS=5 DB_CONNECTION=mysql DB_DATABASE=kolabdev DB_HOST=127.0.0.1 DB_PASSWORD=kolab DB_PORT=3306 DB_USERNAME=kolabdev BROADCAST_DRIVER=redis CACHE_DRIVER=redis QUEUE_CONNECTION=redis SESSION_DRIVER=file SESSION_LIFETIME=120 OPENEXCHANGERATES_API_KEY="from openexchangerates.org" MFA_DSN=mysql://roundcube:Welcome2KolabSystems@127.0.0.1/roundcube MFA_TOTP_DIGITS=6 MFA_TOTP_INTERVAL=30 MFA_TOTP_DIGEST=sha1 IMAP_URI=ssl://127.0.0.1:11993 IMAP_ADMIN_LOGIN=cyrus-admin IMAP_ADMIN_PASSWORD=Welcome2KolabSystems IMAP_VERIFY_HOST=false IMAP_VERIFY_PEER=false LDAP_BASE_DN="dc=mgmt,dc=com" LDAP_DOMAIN_BASE_DN="ou=Domains,dc=mgmt,dc=com" LDAP_HOSTS=127.0.0.1 LDAP_PORT=389 LDAP_SERVICE_BIND_DN="uid=kolab-service,ou=Special Users,dc=mgmt,dc=com" LDAP_SERVICE_BIND_PW="Welcome2KolabSystems" LDAP_USE_SSL=false LDAP_USE_TLS=false # Administrative LDAP_ADMIN_BIND_DN="cn=Directory Manager" LDAP_ADMIN_BIND_PW="Welcome2KolabSystems" LDAP_ADMIN_ROOT_DN="dc=mgmt,dc=com" # Hosted (public registration) LDAP_HOSTED_BIND_DN="uid=hosted-kolab-service,ou=Special Users,dc=mgmt,dc=com" LDAP_HOSTED_BIND_PW="Welcome2KolabSystems" LDAP_HOSTED_ROOT_DN="dc=hosted,dc=com" -COTURN_REDIS_IP=127.0.0.1 -COTURN_REDIS_PORT=6379 -COTURN_REDIS_DATABASE=3 -COTURN_REDIS_PASSWORD=0 COTURN_PUBLIC_IP=127.0.0.1 +COTURN_STATIC_SECRET="Welcome2KolabSystems" MEET_REDIS_IP=127.0.0.1 MEET_REDIS_PORT=6379 MEET_REDIS_DATABASE=3 MEET_REDIS_PASSWORD=0 MEET_PUBLIC_IP=127.0.0.1 MEET_PUBLIC_DOMAIN=127.0.0.1:12443 MEET_TURN_SERVER='turn:127.0.0.1:3478?transport=tcp' # "CDR" events, see https://docs.openvidu.io/en/2.13.0/reference-docs/openvidu-server-cdr/ #OPENVIDU_WEBHOOK_EVENTS=[sessionCreated,sessionDestroyed,participantJoined,participantLeft,webrtcConnectionCreated,webrtcConnectionDestroyed,recordingStatusChanged,filterEventDispatched,mediaNodeStatusChanged] #OPENVIDU_WEBHOOK_HEADERS=[\"Authorization:\ Basic\ SOMETHING\"] PGP_ENABLED= PGP_BINARY= PGP_AGENT= PGP_GPGCONF= PGP_LENGTH= REDIS_HOST=127.0.0.1 REDIS_PASSWORD=null REDIS_PORT=6379 SWOOLE_HOT_RELOAD_ENABLE=true SWOOLE_HTTP_ACCESS_LOG=true SWOOLE_HTTP_HOST=127.0.0.1 SWOOLE_HTTP_PORT=8000 SWOOLE_HTTP_REACTOR_NUM=1 SWOOLE_HTTP_WEBSOCKET=true SWOOLE_HTTP_WORKER_NUM=1 SWOOLE_OB_OUTPUT=true PAYMENT_PROVIDER= MOLLIE_KEY= STRIPE_KEY= STRIPE_PUBLIC_KEY= STRIPE_WEBHOOK_SECRET= MAIL_DRIVER=smtp MAIL_HOST=smtp.mailtrap.io MAIL_PORT=2525 MAIL_USERNAME=null MAIL_PASSWORD=null MAIL_ENCRYPTION=null MAIL_FROM_ADDRESS="noreply@example.com" MAIL_FROM_NAME="Example.com" MAIL_REPLYTO_ADDRESS="replyto@example.com" MAIL_REPLYTO_NAME=null DNS_TTL=3600 DNS_SPF="v=spf1 mx -all" DNS_STATIC="%s. MX 10 ext-mx01.mykolab.com." DNS_COPY_FROM=null AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= AWS_DEFAULT_REGION=us-east-1 AWS_BUCKET= PUSHER_APP_ID= PUSHER_APP_KEY= PUSHER_APP_SECRET= PUSHER_APP_CLUSTER=mt1 MIX_ASSET_PATH='/' MIX_PUSHER_APP_KEY="${PUSHER_APP_KEY}" MIX_PUSHER_APP_CLUSTER="${PUSHER_APP_CLUSTER}" # Generate with ./artisan passport:client --password #PASSPORT_PROXY_OAUTH_CLIENT_ID= #PASSPORT_PROXY_OAUTH_CLIENT_SECRET= PASSPORT_PRIVATE_KEY= PASSPORT_PUBLIC_KEY= COMPANY_NAME= COMPANY_ADDRESS= COMPANY_DETAILS= COMPANY_EMAIL= COMPANY_LOGO= COMPANY_FOOTER= VAT_COUNTRIES=CH,LI VAT_RATE=7.7 KB_ACCOUNT_DELETE= KB_ACCOUNT_SUSPENDED=