Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117749208
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
45 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/meet/server/lib/Room.js b/meet/server/lib/Room.js
index ff62e651..6935c8fc 100644
--- a/meet/server/lib/Room.js
+++ b/meet/server/lib/Room.js
@@ -1,1175 +1,1226 @@
const EventEmitter = require('events').EventEmitter;
const AwaitQueue = require('awaitqueue');
const axios = require('axios');
const Logger = require('./Logger');
const { SocketTimeoutError } = require('./errors');
const Roles = require('./userRoles');
const config = require('../config/config');
const logger = new Logger('Room');
const ROUTER_SCALE_SIZE = config.routerScaleSize || 40;
class Room extends EventEmitter
{
/*
* Find a router that is on a worker that is least loaded.
*
* A worker with a router that we are already piping to is preferred.
*/
static getLeastLoadedRouter(mediasoupWorkers, peers, mediasoupRouters)
{
const routerLoads = new Map();
const workerLoads = new Map();
const pipedRoutersIds = new Set();
// Calculate router loads by adding up peers per router,
// and collected piped routers
for (const peer of peers.values())
{
const routerId = peer.routerId;
if (routerId)
{
if (mediasoupRouters.has(routerId))
{
pipedRoutersIds.add(routerId);
}
if (routerLoads.has(routerId))
{
routerLoads.set(routerId, routerLoads.get(routerId) + 1);
}
else
{
routerLoads.set(routerId, 1);
}
}
}
// Calculate worker loads by adding up router loads per worker
for (const worker of mediasoupWorkers)
{
for (const router of worker._routers)
{
const routerId = router._internal.routerId;
if (workerLoads.has(worker._pid))
{
workerLoads.set(worker._pid, workerLoads.get(worker._pid) +
(routerLoads.has(routerId)?routerLoads.get(routerId):0));
}
else
{
workerLoads.set(worker._pid,
(routerLoads.has(routerId)?routerLoads.get(routerId):0));
}
}
}
const sortedWorkerLoads = new Map([ ...workerLoads.entries() ].sort(
(a, b) => a[1] - b[1]));
// we don't care about if router is piped, just choose the least loaded worker
if (pipedRoutersIds.size === 0 ||
pipedRoutersIds.size === mediasoupRouters.size)
{
const workerId = sortedWorkerLoads.keys().next().value;
for (const worker of mediasoupWorkers)
{
if (worker._pid === workerId)
{
for (const router of worker._routers)
{
const routerId = router._internal.routerId;
if (mediasoupRouters.has(routerId))
{
return routerId;
}
}
}
}
}
else
{
// find if there is a piped router that is on a worker that is below limit
for (const [ workerId, workerLoad ] of sortedWorkerLoads.entries())
{
for (const worker of mediasoupWorkers)
{
if (worker._pid === workerId)
{
for (const router of worker._routers)
{
const routerId = router._internal.routerId;
// on purpose we check if the worker load is below the limit,
// as in reality the worker load is imortant,
// not the router load
if (mediasoupRouters.has(routerId) &&
pipedRoutersIds.has(routerId) &&
workerLoad < ROUTER_SCALE_SIZE)
{
return routerId;
}
}
}
}
}
// no piped router found, we need to return router from least loaded worker
const workerId = sortedWorkerLoads.keys().next().value;
for (const worker of mediasoupWorkers)
{
if (worker._pid === workerId)
{
for (const router of worker._routers)
{
const routerId = router._internal.routerId;
if (mediasoupRouters.has(routerId))
{
return routerId;
}
}
}
}
}
}
/**
* Factory function that creates and returns Room instance.
*
* @async
*
* @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new
* mediasoup Router must be created.
* @param {String} roomId - Id of the Room instance.
*/
static async create({ mediasoupWorkers, roomId, peers })
{
logger.info('create() [roomId:"%s"]', roomId);
// Router media codecs.
const mediaCodecs = config.mediasoup.router.mediaCodecs;
const mediasoupRouters = new Map();
for (const worker of mediasoupWorkers)
{
const router = await worker.createRouter({ mediaCodecs });
mediasoupRouters.set(router.id, router);
}
const firstRouter = mediasoupRouters.get(Room.getLeastLoadedRouter(
mediasoupWorkers, peers, mediasoupRouters));
// Create a mediasoup AudioLevelObserver on first router
const audioLevelObserver = await firstRouter.createAudioLevelObserver(
{
maxEntries : 1,
threshold : -80,
interval : 800
});
return new Room({
roomId,
mediasoupRouters,
audioLevelObserver,
mediasoupWorkers,
peers
});
}
constructor({
roomId,
mediasoupRouters,
audioLevelObserver,
mediasoupWorkers,
peers
})
{
logger.info('constructor() [roomId:"%s"]', roomId);
super();
this.setMaxListeners(Infinity);
// this._uuid = uuidv4();
this._mediasoupWorkers = mediasoupWorkers;
this._allPeers = peers;
// Room ID.
this._roomId = roomId;
// Closed flag.
this._closed = false;
// Joining queue
this._queue = new AwaitQueue();
this._lastN = [];
this._peers = {};
this._selfDestructTimeout = null;
// Array of mediasoup Router instances.
this._mediasoupRouters = mediasoupRouters;
}
close()
{
logger.debug('close()');
this._closed = true;
this._queue.close();
this._queue = null;
if (this._selfDestructTimeout)
clearTimeout(this._selfDestructTimeout);
this._selfDestructTimeout = null;
// Close the peers.
for (const peer in this._peers)
{
if (!this._peers[peer].closed)
this._peers[peer].close();
}
this._peers = null;
// Close the mediasoup Routers.
for (const router of this._mediasoupRouters.values())
{
router.close();
}
this._allPeers = null;
this._mediasoupWorkers = null;
this._mediasoupRouters.clear();
this._audioLevelObserver = null;
// Emit 'close' event.
this.emit('close');
}
handlePeer({ peer })
{
logger.info('handlePeer() [peer:"%s", role:%s]', peer.id, peer.role);
// Should not happen
if (this._peers[peer.id])
{
logger.warn(
'handleConnection() | there is already a peer with same peerId [peer:"%s"]',
peer.id);
}
this._peerJoining(peer);
}
logStatus()
{
logger.info(
'logStatus() [room id:"%s", peers:"%s"]',
this._roomId,
Object.keys(this._peers).length
);
}
dump()
{
return {
roomId : this._roomId,
peers : Object.keys(this._peers).length
};
}
get id()
{
return this._roomId;
}
selfDestructCountdown()
{
logger.debug('selfDestructCountdown() started');
if (this._selfDestructTimeout)
clearTimeout(this._selfDestructTimeout);
this._selfDestructTimeout = setTimeout(() =>
{
if (this._closed)
return;
if (this.checkEmpty())
{
logger.info(
'Room deserted for some time, closing the room [roomId:"%s"]',
this._roomId);
this.close();
}
else
logger.debug('selfDestructCountdown() aborted; room is not empty!');
}, 10000);
}
checkEmpty()
{
return Object.keys(this._peers).length === 0;
}
_peerJoining(peer)
{
this._queue.push(async () =>
{
peer.socket.join(this._roomId);
// If we don't have this peer, add to end
!this._lastN.includes(peer.id) && this._lastN.push(peer.id);
this._peers[peer.id] = peer;
// Assign routerId
peer.routerId = await this._getRouterId();
this._handlePeer(peer);
let turnServers;
if ('turnAPIURI' in config)
{
try
{
const { data } = await axios.get(
config.turnAPIURI,
{
timeout : config.turnAPITimeout || 2000,
params : {
...config.turnAPIparams,
'api_key' : config.turnAPIKey,
'ip' : peer.socket.request.connection.remoteAddress
}
});
turnServers = [ {
urls : data.uris,
username : data.username,
credential : data.password
} ];
}
catch (error)
{
if ('backupTurnServers' in config && config.backupTurnServers.length)
turnServers = config.backupTurnServers;
logger.error('_peerJoining() | error on REST turn [error:"%o"]', error);
}
}
else if ('backupTurnServers' in config && config.backupTurnServers.length)
{
turnServers = config.backupTurnServers;
}
this._notification(peer.socket, 'roomReady', { turnServers });
})
.catch((error) =>
{
logger.error('_peerJoining() [error:"%o"]', error);
});
}
_handlePeer(peer)
{
logger.debug('_handlePeer() [peer:"%s"]', peer.id);
peer.on('close', () =>
{
this._handlePeerClose(peer);
});
peer.on('nicknameChanged', () =>
{
// Spread to others
this._notification(peer.socket, 'changeNickname', {
peerId: peer.id,
nickname: peer.nickname
}, true);
});
peer.on('gotRole', ({ newRole }) =>
{
// Spread to others
this._notification(peer.socket, 'gotRole', {
peerId: peer.id,
role: newRole
}, true, true);
});
peer.socket.on('request', (request, cb) =>
{
logger.debug(
'Peer "request" event [method:"%s", peerId:"%s"]',
request.method, peer.id);
this._handleSocketRequest(peer, request, cb)
.catch((error) =>
{
logger.error('"request" failed [error:"%o"]', error);
cb(error);
});
});
// Peer left before we were done joining
if (peer.closed)
this._handlePeerClose(peer);
}
_handlePeerClose(peer)
{
logger.debug('_handlePeerClose() [peer:"%s"]', peer.id);
if (this._closed)
return;
this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true);
// Remove from lastN
this._lastN = this._lastN.filter((id) => id !== peer.id);
delete this._peers[peer.id];
// If this is the last Peer in the room close the room after a while.
if (this.checkEmpty())
this.selfDestructCountdown();
}
async _handleSocketRequest(peer, request, cb)
{
const router = this._mediasoupRouters.get(peer.routerId);
console.log(request.method);
switch (request.method)
{
case 'getRouterRtpCapabilities':
{
cb(null, router.rtpCapabilities);
break;
}
case 'join':
{
const {
nickname,
picture,
rtpCapabilities
} = request.data;
// Store client data into the Peer data object.
peer.nickname = nickname;
peer.picture = picture;
peer.rtpCapabilities = rtpCapabilities;
// Tell the new Peer about already joined Peers.
// And also create Consumers for existing Producers.
const joinedPeers = this.getPeers(peer);
const peerInfos = joinedPeers
.map((joinedPeer) => (joinedPeer.peerInfo));
cb(null, {
id: peer.id,
role: peer.role,
peers: peerInfos,
});
for (const joinedPeer of joinedPeers)
{
// Create Consumers for existing Producers.
for (const producer of joinedPeer.producers.values())
{
this._createConsumer(
{
consumerPeer : peer,
producerPeer : joinedPeer,
producer
});
}
}
// Notify the new Peer to all other Peers.
for (const otherPeer of this.getPeers(peer))
{
this._notification(
otherPeer.socket,
'newPeer',
peer.peerInfo
);
}
logger.debug(
'peer joined [peer: "%s", nickname: "%s", picture: "%s"]',
peer.id, nickname, picture);
break;
}
+ case 'createPlainTransport':
+ {
+ const { producing, consuming } = request.data;
+
+ const transport = await router.createPlainTransport(
+ {
+ // No RTP will be received from the remote side
+ comedia: false,
+
+ // FFmpeg and GStreamer don't support RTP/RTCP multiplexing ("a=rtcp-mux" in SDP)
+ rtcpMux: false,
+ listenIp: { ip: "127.0.0.1", announcedIp: null },
+
+ appData : { producing, consuming }
+ }
+ );
+ // Store the WebRtcTransport into the Peer data Object.
+ peer.addTransport(transport.id, transport);
+
+ cb(
+ null,
+ {
+ id : transport.id,
+ ip : transport.tuple.localIp,
+ port : transport.tuple.localPort,
+ rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined
+ });
+
+ break;
+ }
+
+ case 'connectPlainTransport':
+ {
+ const { transportId, ip, port, rtcpPort } = request.data;
+ const transport = peer.getTransport(transportId);
+
+ if (!transport)
+ throw new Error(`transport with id "${transportId}" not found`);
+
+ await transport.connect({
+ ip: ip,
+ port: port,
+ rtcpPort: rtcpPort,
+ });
+
+
+ cb();
+
+ break;
+ }
+
case 'createWebRtcTransport':
{
// NOTE: Don't require that the Peer is joined here, so the client can
// initiate mediasoup Transports and be ready when he later joins.
const { forceTcp, producing, consuming } = request.data;
const webRtcTransportOptions =
{
...config.mediasoup.webRtcTransport,
appData : { producing, consuming }
};
webRtcTransportOptions.enableTcp = true;
if (forceTcp)
webRtcTransportOptions.enableUdp = false;
else
{
webRtcTransportOptions.enableUdp = true;
webRtcTransportOptions.preferUdp = true;
}
const transport = await router.createWebRtcTransport(
webRtcTransportOptions
);
transport.on('dtlsstatechange', (dtlsState) => {
if (dtlsState === 'failed' || dtlsState === 'closed') {
logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState);
}
});
// Store the WebRtcTransport into the Peer data Object.
peer.addTransport(transport.id, transport);
cb(
null,
{
id : transport.id,
iceParameters : transport.iceParameters,
iceCandidates : transport.iceCandidates,
dtlsParameters : transport.dtlsParameters
});
const { maxIncomingBitrate } = config.mediasoup.webRtcTransport;
// If set, apply max incoming bitrate limit.
if (maxIncomingBitrate)
{
try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); }
catch (error) {
logger.info("Setting the incoming bitrate failed")
}
}
break;
}
case 'connectWebRtcTransport':
{
const { transportId, dtlsParameters } = request.data;
const transport = peer.getTransport(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
await transport.connect({ dtlsParameters });
cb();
break;
}
/*
case 'restartIce':
{
const { transportId } = request.data;
const transport = peer.getTransport(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
const iceParameters = await transport.restartIce();
cb(null, iceParameters);
break;
}
*/
case 'produce':
{
let { appData } = request.data;
if (!appData.source || ![ 'mic', 'webcam', 'screen' ].includes(appData.source))
throw new Error('invalid producer source');
if (appData.source === 'mic' && !peer.hasRole(Roles.PUBLISHER))
throw new Error('peer not authorized');
if (appData.source === 'webcam' && !peer.hasRole(Roles.PUBLISHER))
throw new Error('peer not authorized');
if (appData.source === 'screen' && !peer.hasRole(Roles.PUBLISHER))
throw new Error('peer not authorized');
const { transportId, kind, rtpParameters } = request.data;
const transport = peer.getTransport(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
// Add peerId into appData to later get the associated Peer during
// the 'loudest' event of the audioLevelObserver.
appData = { ...appData, peerId: peer.id };
const producer =
await transport.produce({ kind, rtpParameters, appData });
const pipeRouters = this._getRoutersToPipeTo(peer.routerId);
for (const [ routerId, destinationRouter ] of this._mediasoupRouters)
{
if (pipeRouters.includes(routerId))
{
await router.pipeToRouter({
producerId : producer.id,
router : destinationRouter
});
}
}
// Store the Producer into the Peer data Object.
peer.addProducer(producer.id, producer);
producer.on('videoorientationchange', (videoOrientation) =>
{
logger.debug(
'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]',
producer.id, videoOrientation);
});
cb(null, { id: producer.id });
// Optimization: Create a server-side Consumer for each Peer.
for (const otherPeer of this.getPeers(peer))
{
this._createConsumer(
{
consumerPeer : otherPeer,
producerPeer : peer,
producer
});
}
// Add into the audioLevelObserver.
if (kind === 'audio')
{
this._audioLevelObserver.addProducer({ producerId: producer.id })
.catch(() => {});
}
break;
}
case 'closeProducer':
{
const { producerId } = request.data;
const producer = peer.getProducer(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
producer.close();
// Remove from its map.
peer.removeProducer(producer.id);
cb();
break;
}
case 'pauseProducer':
{
const { producerId } = request.data;
const producer = peer.getProducer(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
await producer.pause();
cb();
break;
}
case 'resumeProducer':
{
const { producerId } = request.data;
const producer = peer.getProducer(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
await producer.resume();
cb();
break;
}
case 'pauseConsumer':
{
const { consumerId } = request.data;
const consumer = peer.getConsumer(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
await consumer.pause();
cb();
break;
}
case 'resumeConsumer':
{
const { consumerId } = request.data;
const consumer = peer.getConsumer(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
await consumer.resume();
cb();
break;
}
case 'changeNickname':
{
const { nickname } = request.data;
peer.nickname = nickname;
// This will be spread through events from the peer object
// Return no error
cb();
break;
}
case 'chatMessage':
{
const { message } = request.data;
// Spread to others
this._notification(peer.socket, 'chatMessage', {
peerId: peer.id,
nickname: peer.nickname,
message: message
}, true, true);
// Return no error
cb();
break;
}
case 'moderator:setRole':
{
if (!peer.hasRole(Roles.MODERATOR))
throw new Error('peer not authorized');
const { peerId, role } = request.data;
const giveRolePeer = this._peers[peerId];
if (!giveRolePeer)
throw new Error(`peer with id "${peerId}" not found`);
// TODO: check if role is valid value
// This will propagate the event automatically
giveRolePeer.setRole(role);
// Return no error
cb();
break;
}
case 'raisedHand':
{
const { raisedHand } = request.data;
peer.raisedHand = raisedHand;
// Spread to others
this._notification(peer.socket, 'raisedHand', {
peerId: peer.id,
raisedHand: raisedHand,
}, true);
// Return no error
cb();
break;
}
case 'moderator:closeRoom':
{
if (!peer.hasRole(Roles.OWNER))
throw new Error('peer not authorized');
this._notification(peer.socket, 'moderator:closeRoom', null, true);
cb();
// Close the room
this.close();
break;
}
case 'moderator:kickPeer':
{
if (!peer.hasRole(Roles.MODERATOR))
throw new Error('peer not authorized');
const { peerId } = request.data;
const kickPeer = this._peers[peerId];
if (!kickPeer)
throw new Error(`peer with id "${peerId}" not found`);
this._notification(kickPeer.socket, 'moderator:kickPeer');
kickPeer.close();
cb();
break;
}
default:
{
logger.error('unknown request.method "%s"', request.method);
cb(500, `unknown request.method "${request.method}"`);
}
}
}
/**
* Creates a mediasoup Consumer for the given mediasoup Producer.
*
* @async
*/
async _createConsumer({ consumerPeer, producerPeer, producer })
{
logger.debug(
'_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]',
consumerPeer.id,
producerPeer.id,
producer.id
);
const router = this._mediasoupRouters.get(producerPeer.routerId);
// Optimization:
// - Create the server-side Consumer. If video, do it paused.
// - Tell its Peer about it and wait for its response.
// - Upon receipt of the response, resume the server-side Consumer.
// - If video, this will mean a single key frame requested by the
// server-side Consumer (when resuming it).
// NOTE: Don't create the Consumer if the remote Peer cannot consume it.
if (
!consumerPeer.rtpCapabilities ||
!router.canConsume(
{
producerId : producer.id,
rtpCapabilities : consumerPeer.rtpCapabilities
})
)
{
return;
}
// Must take the Transport the remote Peer is using for consuming.
const transport = consumerPeer.getConsumerTransport();
// This should not happen.
if (!transport)
{
logger.warn('_createConsumer() | Transport for consuming not found');
return;
}
// Create the Consumer in paused mode.
let consumer;
try
{
consumer = await transport.consume(
{
producerId : producer.id,
rtpCapabilities : consumerPeer.rtpCapabilities,
paused : producer.kind === 'video'
});
if (producer.kind === 'audio')
await consumer.setPriority(255);
}
catch (error)
{
logger.warn('_createConsumer() | [error:"%o"]', error);
return;
}
// Store the Consumer into the consumerPeer data Object.
consumerPeer.addConsumer(consumer.id, consumer);
// Set Consumer events.
consumer.on('transportclose', () =>
{
// Remove from its map.
consumerPeer.removeConsumer(consumer.id);
});
consumer.on('producerclose', () =>
{
// Remove from its map.
consumerPeer.removeConsumer(consumer.id);
this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id });
});
consumer.on('producerpause', () =>
{
this._notification(consumerPeer.socket, 'consumerPaused', { consumerId: consumer.id });
});
consumer.on('producerresume', () =>
{
this._notification(consumerPeer.socket, 'consumerResumed', { consumerId: consumer.id });
});
// Send a request to the remote Peer with Consumer parameters.
try
{
await this._request(
consumerPeer.socket,
'newConsumer',
{
peerId : producerPeer.id,
kind : consumer.kind,
producerId : producer.id,
id : consumer.id,
rtpParameters : consumer.rtpParameters,
type : consumer.type,
appData : producer.appData,
producerPaused : consumer.producerPaused
}
);
// Now that we got the positive response from the remote Peer and, if
// video, resume the Consumer to ask for an efficient key frame.
await consumer.resume();
}
catch (error)
{
logger.warn('_createConsumer() | [error:"%o"]', error);
}
}
/**
* Get the list of peers.
*/
getPeers(excludePeer = undefined)
{
return Object.values(this._peers)
.filter((peer) => peer !== excludePeer);
}
_timeoutCallback(callback)
{
let called = false;
const interval = setTimeout(
() =>
{
if (called)
return;
called = true;
callback(new SocketTimeoutError('Request timed out'));
},
config.requestTimeout || 20000
);
return (...args) =>
{
if (called)
return;
called = true;
clearTimeout(interval);
callback(...args);
};
}
_sendRequest(socket, method, data = {})
{
return new Promise((resolve, reject) =>
{
socket.emit(
'request',
{ method, data },
this._timeoutCallback((err, response) =>
{
if (err)
{
reject(err);
}
else
{
resolve(response);
}
})
);
});
}
async _request(socket, method, data)
{
logger.debug('_request() [method:"%s", data:"%o"]', method, data);
const {
requestRetries = 3
} = config;
for (let tries = 0; tries < requestRetries; tries++)
{
try
{
return await this._sendRequest(socket, method, data);
}
catch (error)
{
if (
error instanceof SocketTimeoutError &&
tries < requestRetries
)
logger.warn('_request() | timeout, retrying [attempt:"%s"]', tries);
else
throw error;
}
}
}
_notification(socket, method, data = {}, broadcast = false, includeSender = false)
{
if (broadcast)
{
socket.broadcast.to(this._roomId).emit(
'notification', { method, data }
);
if (includeSender)
socket.emit('notification', { method, data });
}
else
{
socket.emit('notification', { method, data });
}
}
/*
* Pipe producers of peers that are running under another routher to this router.
*/
async _pipeProducersToRouter(routerId)
{
const router = this._mediasoupRouters.get(routerId);
// All peers that have a different router
const peersToPipe =
Object.values(this._peers)
.filter((peer) => peer.routerId !== routerId && peer.routerId !== null);
for (const peer of peersToPipe)
{
const srcRouter = this._mediasoupRouters.get(peer.routerId);
for (const producerId of peer.producers.keys())
{
if (router._producers.has(producerId))
{
continue;
}
await srcRouter.pipeToRouter({
producerId : producerId,
router : router
});
}
}
}
async _getRouterId()
{
const routerId = Room.getLeastLoadedRouter(
this._mediasoupWorkers, this._allPeers, this._mediasoupRouters);
await this._pipeProducersToRouter(routerId);
return routerId;
}
// Returns an array of router ids we need to pipe to:
// The combined set of routers of all peers, exluding the router of the peer itself.
_getRoutersToPipeTo(originRouterId)
{
return Object.values(this._peers)
.map((peer) => peer.routerId)
.filter((routerId, index, self) =>
routerId !== originRouterId && self.indexOf(routerId) === index
);
}
}
module.exports = Room;
diff --git a/meet/server/test/performancetestbench.js b/meet/server/test/performancetestbench.js
new file mode 100644
index 00000000..3dd89a78
--- /dev/null
+++ b/meet/server/test/performancetestbench.js
@@ -0,0 +1,286 @@
+const assert = require('assert');
+let request = require('supertest')
+const io = require("socket.io-client");
+
+const mediasoupClient = require('mediasoup-client');
+const fakeParameters = require('./fakeParameters');
+const Process = require("child_process");
+
+let app
+
+//TODO use this
+ // const mediaCodecs = config.mediasoup.router.mediaCodecs;
+
+ let rtpParameters = {
+ mediaCodecs: [
+ {
+ kind: "audio",
+ mimeType: "audio/opus",
+ preferredPayloadType: 111,
+ clockRate: 48000,
+ channels: 2,
+ parameters: {
+ minptime: 10,
+ useinbandfec: 1,
+ },
+ },
+ {
+ kind: "video",
+ mimeType: "video/VP8",
+ preferredPayloadType: 96,
+ clockRate: 90000,
+ },
+ {
+ kind: "video",
+ mimeType: "video/H264",
+ preferredPayloadType: 125,
+ clockRate: 90000,
+ parameters: {
+ "level-asymmetry-allowed": 1,
+ "packetization-mode": 1,
+ "profile-level-id": "42e01f",
+ },
+ },
+ ],
+ }
+
+
+before(function (done) {
+ process.env.SSL_CERT = "../../docker/certs/kolab.hosted.com.cert"
+ process.env.SSL_KEY = "../../docker/certs/kolab.hosted.com.key"
+ process.env.REDIS_IP = "none"
+ process.env.DEBUG = '*'
+ app = require('../server.js')
+ request = request(app);
+
+ app.on("ready", function(){
+ done();
+ });
+});
+
+describe('Join room', function() {
+ const roomId = "room1";
+ let signalingSocket
+ let peerId
+
+ async function sendRequest(socket, method, data = null) {
+ return await new Promise((resolve, /*reject*/) => {
+ socket.emit(
+ 'request',
+ {method: method,
+ data: data},
+ (error, response) => {
+ assert(!error)
+ resolve(response)
+ }
+ )
+ })
+ }
+
+ it('create room', async () => {
+ return request
+ .post(`/meetmedia/api/sessions/${roomId}/connection`)
+ .send({role: 31})
+ .expect(200)
+ .then(async (res) => {
+ let data = res.body;
+ peerId = data['id'];
+ const signalingUrl = data['token'];
+ assert(signalingUrl.includes(peerId))
+ assert(signalingUrl.includes(roomId))
+ console.info(signalingUrl);
+
+ signalingSocket = io(signalingUrl, { path: '/meetmedia/signaling', transports: ["websocket"], rejectUnauthorized: false });
+ let roomReady = new Promise((resolve, /*reject*/) => {
+ signalingSocket.on('notification', (reason) => {
+ if (reason['method'] == 'roomReady') {
+ resolve();
+ }
+ });
+ })
+
+ signalingSocket.connect();
+ await roomReady
+ })
+ .catch(err => { console.warn(err); throw err })
+ });
+
+ it('getRtpCapabilities', async () => {
+ const routerRtpCapabilities = await sendRequest(signalingSocket, 'getRouterRtpCapabilities')
+ assert(Object.keys(routerRtpCapabilities).length != 0)
+ });
+
+
+ it('join', async () => {
+ const { id, role, peers } = await sendRequest(signalingSocket, 'join', {
+ nickname: "nickname",
+ rtpCapabilities: rtpParameters
+ })
+ assert.equal(id, peerId)
+ assert.equal(role, 31)
+ assert.equal(peers.length, 0)
+ })
+
+ it('second peer joining', async () => {
+ return request
+ .post(`/meetmedia/api/sessions/${roomId}/connection`)
+ .expect(200)
+ .then(async (res) => {
+ let data = res.body;
+ const newId = data['id'];
+ const signalingUrl = data['token'];
+
+ let signalingSocket2 = io(signalingUrl, { path: '/meetmedia/signaling', transports: ["websocket"], rejectUnauthorized: false });
+
+ let roomReady = new Promise((resolve, /*reject*/) => {
+ signalingSocket2.on('notification', async (reason) => {
+ if (reason['method'] == 'roomReady') {
+ resolve(reason);
+ }
+ });
+ })
+
+ let newPeer = new Promise((resolve, /*reject*/) => {
+ signalingSocket.on('notification', (reason) => {
+ if (reason.method == 'newPeer') {
+ resolve(reason);
+ }
+ });
+ })
+
+ signalingSocket.connect();
+
+
+ let reason = await roomReady;
+ const { peers } = await sendRequest(signalingSocket2, 'join', {
+ nickname: "nickname",
+ rtpCapabilities: rtpParameters
+ })
+ assert.equal(peers.length, 1)
+ assert.equal(peers[0].id, peerId)
+
+ reason = await newPeer;
+ assert(reason.data.id == newId);
+ })
+ .catch(err => { console.warn(err); throw err })
+ });
+
+ let transportInfo;
+
+ it('createPlainTransport', async () => {
+ transportInfo = await sendRequest(signalingSocket, 'createPlainTransport', {
+ producing: true,
+ consuming: false,
+ })
+
+ // const { id, iceParameters, iceCandidates, dtlsParameters } = transportInfo
+ console.warn(transportInfo);
+
+ const { id } = await sendRequest(signalingSocket, 'produce', {
+ transportId: transportInfo.id,
+ kind: 'video',
+ rtpParameters: {
+ codecs: [
+ {
+ mimeType: "video/H264",
+ payloadType: 125,
+ clockRate: 90000,
+ parameters: {
+ "level-asymmetry-allowed": 1,
+ "packetization-mode": 1,
+ "profile-level-id": "42e01f",
+ },
+ },
+ ],
+ encodings: [{ ssrc: 2222 }]
+ },
+ appData: {
+ source: 'webcam'
+ }
+ })
+
+ //TODO now we could instruct gstreamer to produce video for the
+
+ });
+
+ let recProcess;
+
+ it('startFFMPEG', async () => {
+ let recResolve;
+ const promise = new Promise((res, _rej) => {
+ recResolve = res;
+ });
+
+ const cmdProgram = "ffmpeg";
+
+ const cmdArgStr = [
+ "-i /dev/video0",
+ "-r 24",
+ "-video_size 320x240",
+ `-c:v h264`,
+ "-f",
+ "rtp",
+ "-payload_type 125",
+ "-ssrc 2222",
+ // `[select=a:f=rtp:ssrc=2222:payload_type=125]rtp://127.0.0.1:${transportInfo.port}`
+ `rtp://127.0.0.1:${transportInfo.port}`,
+ ].join(" ").trim();
+
+ console.log(`Run command: ${cmdProgram} ${cmdArgStr}`);
+
+ recProcess = Process.spawn(cmdProgram, cmdArgStr.split(/\s+/));
+
+ recProcess.on("error", (err) => {
+ console.error("Recording process error:", err);
+ });
+
+ recProcess.on("exit", (code, signal) => {
+ console.log("Recording process exit, code: %d, signal: %s", code, signal);
+
+ recProcess = null;
+ // stopMediasoupRtp();
+
+ if (!signal || signal === "SIGINT") {
+ console.log("Recording stopped");
+ } else {
+ console.warn(
+ "Recording process didn't exit cleanly, output file might be corrupt"
+ );
+ }
+ });
+
+ // FFmpeg writes its logs to stderr
+ recProcess.stderr.on("data", (chunk) => {
+ chunk
+ .toString()
+ .split(/\r?\n/g)
+ .filter(Boolean) // Filter out empty strings
+ .forEach((line) => {
+ console.log(line);
+ //Stop after 5s of streaming
+ if (line.startsWith("ffmpeg version")) {
+ setTimeout(() => {
+ recResolve();
+ }, 5000);
+ }
+ });
+ });
+
+ return promise;
+ });
+
+
+
+ after(function () {
+ signalingSocket.close();
+ if (recProcess) {
+ recProcess.kill()
+ }
+ })
+
+});
+
+after(function () {
+ process.exit();
+})
+
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Apr 4, 1:27 AM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18747867
Default Alt Text
(45 KB)
Attached To
Mode
rK kolab
Attached
Detach File
Event Timeline