diff --git a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java index db64f4a2..7af2e42e 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java +++ b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java @@ -1,287 +1,283 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server; import java.io.IOException; import java.net.MalformedURLException; import java.util.Arrays; import java.util.List; import javax.annotation.PostConstruct; import org.kurento.jsonrpc.JsonUtils; import org.kurento.jsonrpc.internal.server.config.JsonRpcConfiguration; import org.kurento.jsonrpc.server.JsonRpcConfigurer; import org.kurento.jsonrpc.server.JsonRpcHandlerRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.context.event.EventListener; import org.springframework.core.env.Environment; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.cdr.CDRLoggerFile; import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.config.HttpHandshakeInterceptor; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.TokenGenerator; import io.openvidu.server.core.TokenGeneratorDefault; import io.openvidu.server.coturn.CoturnCredentialsService; import io.openvidu.server.coturn.CoturnCredentialsServiceFactory; -import io.openvidu.server.kurento.AutodiscoveryKurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientProvider; import io.openvidu.server.kurento.core.KurentoParticipantEndpointConfig; import io.openvidu.server.kurento.core.KurentoSessionEventsHandler; import io.openvidu.server.kurento.core.KurentoSessionManager; +import io.openvidu.server.kurento.kms.DummyLoadManager; import io.openvidu.server.kurento.kms.FixedOneKmsManager; +import io.openvidu.server.kurento.kms.KmsManager; +import io.openvidu.server.kurento.kms.LoadManager; import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.rpc.RpcNotificationService; import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.GeoLocationByIp; import io.openvidu.server.utils.GeoLocationByIpDummy; /** * OpenVidu Server application * * @author Pablo Fuente (pablofuenteperez@gmail.com) */ @Import({ JsonRpcConfiguration.class }) @SpringBootApplication public class OpenViduServer implements JsonRpcConfigurer { private static final Logger log = LoggerFactory.getLogger(OpenViduServer.class); @Autowired private Environment env; public static final String KMSS_URIS_PROPERTY = "kms.uris"; public static String wsUrl; public static String httpUrl; @Bean @ConditionalOnMissingBean - public KurentoClientProvider kmsManager() { - + public KmsManager kmsManager() { JsonParser parser = new JsonParser(); String uris = env.getProperty(KMSS_URIS_PROPERTY); JsonElement elem = parser.parse(uris); JsonArray kmsUris = elem.getAsJsonArray(); List kmsWsUris = JsonUtils.toStringList(kmsUris); if (kmsWsUris.isEmpty()) { throw new IllegalArgumentException(KMSS_URIS_PROPERTY + " should contain at least one kms url"); } String firstKmsWsUri = kmsWsUris.get(0); - - if (firstKmsWsUri.equals("autodiscovery")) { - log.info("Using autodiscovery rules to locate KMS on every pipeline"); - return new AutodiscoveryKurentoClientProvider(); - } else { - log.info("Configuring OpenVidu Server to use first of the following kmss: " + kmsWsUris); - return new FixedOneKmsManager(firstKmsWsUri); - } + log.info("OpenVidu Server using one KMS: {}", kmsWsUris); + return new FixedOneKmsManager(firstKmsWsUri); } @Bean @ConditionalOnMissingBean public RpcNotificationService notificationService() { return new RpcNotificationService(); } @Bean @ConditionalOnMissingBean public SessionManager sessionManager() { return new KurentoSessionManager(); } @Bean @ConditionalOnMissingBean public RpcHandler rpcHandler() { return new RpcHandler(); } @Bean @ConditionalOnMissingBean public SessionEventsHandler sessionEventsHandler() { return new KurentoSessionEventsHandler(); } @Bean @ConditionalOnMissingBean public CallDetailRecord cdr() { return new CallDetailRecord(Arrays.asList(new CDRLoggerFile())); } @Bean @ConditionalOnMissingBean public KurentoParticipantEndpointConfig kurentoEndpointConfig() { return new KurentoParticipantEndpointConfig(); } @Bean @ConditionalOnMissingBean public TokenGenerator tokenGenerator() { return new TokenGeneratorDefault(); } + @Bean + @ConditionalOnMissingBean + public LoadManager loadManager() { + return new DummyLoadManager(); + } + @Bean @ConditionalOnMissingBean public OpenviduConfig openviduConfig() { return new OpenviduConfig(); } @Bean @ConditionalOnMissingBean public RecordingManager recordingManager() { return new RecordingManager(); } @Bean public CoturnCredentialsService coturnCredentialsService() { return new CoturnCredentialsServiceFactory(openviduConfig()).getCoturnCredentialsService(); } @Bean @ConditionalOnMissingBean public GeoLocationByIp geoLocationByIp() { return new GeoLocationByIpDummy(); } @Override public void registerJsonRpcHandlers(JsonRpcHandlerRegistry registry) { registry.addHandler(rpcHandler().withPingWatchdog(true).withInterceptors(new HttpHandshakeInterceptor()), "/openvidu"); } private static String getContainerIp() throws IOException, InterruptedException { return CommandExecutor.execCommand("/bin/sh", "-c", "hostname -i | awk '{print $1}'"); } public static void main(String[] args) throws Exception { log.info("Using /dev/urandom for secure random generation"); System.setProperty("java.security.egd", "file:/dev/./urandom"); SpringApplication.run(OpenViduServer.class, args); } @PostConstruct public void init() throws MalformedURLException, InterruptedException { OpenviduConfig openviduConf = openviduConfig(); String publicUrl = openviduConf.getOpenViduPublicUrl(); String type = publicUrl; switch (publicUrl) { case "docker": try { String containerIp = getContainerIp(); OpenViduServer.wsUrl = "wss://" + containerIp + ":" + openviduConf.getServerPort(); } catch (Exception e) { log.error("Docker container IP was configured, but there was an error obtaining IP: " + e.getClass().getName() + " " + e.getMessage()); log.error("Fallback to local URL"); OpenViduServer.wsUrl = null; } break; case "local": break; case "": break; default: type = "custom"; if (publicUrl.startsWith("https://")) { OpenViduServer.wsUrl = publicUrl.replace("https://", "wss://"); } else if (publicUrl.startsWith("http://")) { OpenViduServer.wsUrl = publicUrl.replace("http://", "wss://"); } if (!OpenViduServer.wsUrl.startsWith("wss://")) { OpenViduServer.wsUrl = "wss://" + OpenViduServer.wsUrl; } } if (OpenViduServer.wsUrl == null) { type = "local"; OpenViduServer.wsUrl = "wss://localhost:" + openviduConf.getServerPort(); } if (OpenViduServer.wsUrl.endsWith("/")) { OpenViduServer.wsUrl = OpenViduServer.wsUrl.substring(0, OpenViduServer.wsUrl.length() - 1); } if (this.openviduConfig().isRecordingModuleEnabled()) { try { this.recordingManager().initializeRecordingManager(); } catch (OpenViduException e) { String finalErrorMessage = ""; if (e.getCodeValue() == Code.DOCKER_NOT_FOUND.getValue()) { finalErrorMessage = "Error connecting to Docker daemon. Enabling OpenVidu recording module requires Docker"; } else if (e.getCodeValue() == Code.RECORDING_PATH_NOT_VALID.getValue()) { finalErrorMessage = "Error initializing recording path \"" + this.openviduConfig().getOpenViduRecordingPath() + "\" set with system property \"openvidu.recording.path\""; } else if (e.getCodeValue() == Code.RECORDING_FILE_EMPTY_ERROR.getValue()) { finalErrorMessage = "Error initializing recording custom layouts path \"" + this.openviduConfig().getOpenviduRecordingCustomLayout() + "\" set with system property \"openvidu.recording.custom-layout\""; } log.error(finalErrorMessage + ". Shutting down OpenVidu Server"); System.exit(1); } } String finalUrl = OpenViduServer.wsUrl.replaceFirst("wss://", "https://").replaceFirst("ws://", "http://"); openviduConf.setFinalUrl(finalUrl); httpUrl = openviduConf.getFinalUrl(); log.info("OpenVidu Server using " + type + " URL: [" + OpenViduServer.wsUrl + "]"); } @EventListener(ApplicationReadyEvent.class) public void whenReady() { final String NEW_LINE = System.lineSeparator(); - String str = NEW_LINE + - NEW_LINE + " ACCESS IP " + - NEW_LINE + "-------------------------" + - NEW_LINE + httpUrl + - NEW_LINE + "-------------------------" + - NEW_LINE; + String str = NEW_LINE + NEW_LINE + " ACCESS IP " + NEW_LINE + "-------------------------" + + NEW_LINE + httpUrl + NEW_LINE + "-------------------------" + NEW_LINE; log.info(str); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java b/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java index 57bf2e9a..551a9c2d 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/MediaOptions.java @@ -1,104 +1,104 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.core; import com.google.gson.JsonObject; -import io.openvidu.server.kurento.KurentoFilter; +import io.openvidu.server.kurento.endpoint.KurentoFilter; public class MediaOptions { protected Boolean hasAudio; protected Boolean hasVideo; protected Boolean audioActive; protected Boolean videoActive; protected String typeOfVideo; protected Integer frameRate; protected String videoDimensions; protected KurentoFilter filter; public MediaOptions(Boolean hasAudio, Boolean hasVideo, Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions, KurentoFilter filter) { this.hasAudio = hasAudio; this.hasVideo = hasVideo; this.audioActive = audioActive; this.videoActive = videoActive; this.typeOfVideo = typeOfVideo; this.frameRate = frameRate; this.videoDimensions = videoDimensions; this.filter = filter; } public boolean hasAudio() { return this.hasAudio; } public boolean hasVideo() { return this.hasVideo; } public boolean isAudioActive() { return this.hasAudio && this.audioActive; } public boolean isVideoActive() { return this.hasVideo && this.videoActive; } public String getTypeOfVideo() { return this.typeOfVideo; } public Integer getFrameRate() { return this.frameRate; } public String getVideoDimensions() { return this.videoDimensions; } public KurentoFilter getFilter() { return this.filter; } public void setFilter(KurentoFilter filter) { this.filter = filter; } public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("hasAudio", this.hasAudio); if (hasAudio) json.addProperty("audioActive", this.audioActive); json.addProperty("hasVideo", this.hasVideo); if (hasVideo) { json.addProperty("videoActive", this.videoActive); json.addProperty("typeOfVideo", this.typeOfVideo); json.addProperty("frameRate", this.frameRate); json.addProperty("videoDimensions", this.videoDimensions); } json.add("filter", this.filter != null ? this.filter.toJson() : new JsonObject()); if (this.filter != null) { ((JsonObject) json.get("filter")).add("lastExecMethod", this.filter.getLastExecMethod() != null ? this.filter.getLastExecMethod().toJson() : new JsonObject()); } return json; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java index 890063b1..704806b3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java +++ b/openvidu-server/src/main/java/io/openvidu/server/core/SessionEventsHandler.java @@ -1,572 +1,572 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.core; import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; import io.openvidu.java.client.OpenViduRole; import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.config.InfoHandler; import io.openvidu.server.config.OpenviduConfig; -import io.openvidu.server.kurento.KurentoFilter; import io.openvidu.server.kurento.core.KurentoParticipant; +import io.openvidu.server.kurento.endpoint.KurentoFilter; import io.openvidu.server.recording.Recording; import io.openvidu.server.rpc.RpcNotificationService; public class SessionEventsHandler { private static final Logger log = LoggerFactory.getLogger(SessionEventsHandler.class); @Autowired protected RpcNotificationService rpcNotificationService; @Autowired protected InfoHandler infoHandler; @Autowired protected CallDetailRecord CDR; @Autowired protected OpenviduConfig openviduConfig; Map recordingsStarted = new ConcurrentHashMap<>(); ReentrantLock lock = new ReentrantLock(); public void onSessionCreated(Session session) { CDR.recordSessionCreated(session); } public void onSessionClosed(String sessionId, EndReason reason) { CDR.recordSessionDestroyed(sessionId, reason); } public void onParticipantJoined(Participant participant, String sessionId, Set existingParticipants, Integer transactionId, OpenViduException error) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); return; } JsonObject result = new JsonObject(); JsonArray resultArray = new JsonArray(); for (Participant existingParticipant : existingParticipants) { JsonObject participantJson = new JsonObject(); participantJson.addProperty(ProtocolElements.JOINROOM_PEERID_PARAM, existingParticipant.getParticipantPublicId()); participantJson.addProperty(ProtocolElements.JOINROOM_PEERCREATEDAT_PARAM, existingParticipant.getCreatedAt()); // Metadata associated to each existing participant participantJson.addProperty(ProtocolElements.JOINROOM_METADATA_PARAM, existingParticipant.getFullMetadata()); if (existingParticipant.isStreaming()) { KurentoParticipant kParticipant = (KurentoParticipant) existingParticipant; JsonObject stream = new JsonObject(); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMID_PARAM, existingParticipant.getPublisherStreamId()); stream.addProperty(ProtocolElements.JOINROOM_PEERCREATEDAT_PARAM, kParticipant.getPublisher().createdAt()); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMHASAUDIO_PARAM, kParticipant.getPublisherMediaOptions().hasAudio); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMHASVIDEO_PARAM, kParticipant.getPublisherMediaOptions().hasVideo); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMVIDEOACTIVE_PARAM, kParticipant.getPublisherMediaOptions().videoActive); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMAUDIOACTIVE_PARAM, kParticipant.getPublisherMediaOptions().audioActive); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMVIDEOACTIVE_PARAM, kParticipant.getPublisherMediaOptions().videoActive); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMTYPEOFVIDEO_PARAM, kParticipant.getPublisherMediaOptions().typeOfVideo); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMFRAMERATE_PARAM, kParticipant.getPublisherMediaOptions().frameRate); stream.addProperty(ProtocolElements.JOINROOM_PEERSTREAMVIDEODIMENSIONS_PARAM, kParticipant.getPublisherMediaOptions().videoDimensions); JsonElement filter = kParticipant.getPublisherMediaOptions().getFilter() != null ? kParticipant.getPublisherMediaOptions().getFilter().toJson() : new JsonObject(); stream.add(ProtocolElements.JOINROOM_PEERSTREAMFILTER_PARAM, filter); JsonArray streamsArray = new JsonArray(); streamsArray.add(stream); participantJson.add(ProtocolElements.JOINROOM_PEERSTREAMS_PARAM, streamsArray); } // Avoid emitting 'connectionCreated' event of existing RECORDER participant in // openvidu-browser in newly joined participants if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(existingParticipant.getParticipantPublicId())) { resultArray.add(participantJson); } // If RECORDER participant has joined do NOT send 'participantJoined' // notification to existing participants. 'recordingStarted' will be sent to all // existing participants when recorder first subscribe to a stream if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { JsonObject notifParams = new JsonObject(); // Metadata associated to new participant notifParams.addProperty(ProtocolElements.PARTICIPANTJOINED_USER_PARAM, participant.getParticipantPublicId()); notifParams.addProperty(ProtocolElements.PARTICIPANTJOINED_CREATEDAT_PARAM, participant.getCreatedAt()); notifParams.addProperty(ProtocolElements.PARTICIPANTJOINED_METADATA_PARAM, participant.getFullMetadata()); rpcNotificationService.sendNotification(existingParticipant.getParticipantPrivateId(), ProtocolElements.PARTICIPANTJOINED_METHOD, notifParams); } } result.addProperty(ProtocolElements.PARTICIPANTJOINED_USER_PARAM, participant.getParticipantPublicId()); result.addProperty(ProtocolElements.PARTICIPANTJOINED_CREATEDAT_PARAM, participant.getCreatedAt()); result.addProperty(ProtocolElements.PARTICIPANTJOINED_METADATA_PARAM, participant.getFullMetadata()); result.add("value", resultArray); rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result); } public void onParticipantLeft(Participant participant, String sessionId, Set remainingParticipants, Integer transactionId, OpenViduException error, EndReason reason) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); return; } if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { // RECORDER participant return; } JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.PARTICIPANTLEFT_NAME_PARAM, participant.getParticipantPublicId()); params.addProperty(ProtocolElements.PARTICIPANTLEFT_REASON_PARAM, reason != null ? reason.name() : ""); for (Participant p : remainingParticipants) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.PARTICIPANTLEFT_METHOD, params); } if (transactionId != null) { // No response when the participant is forcibly evicted instead of voluntarily // leaving the session rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); } if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { CDR.recordParticipantLeft(participant, sessionId, reason); } } public void onPublishMedia(Participant participant, String streamId, Long createdAt, String sessionId, MediaOptions mediaOptions, String sdpAnswer, Set participants, Integer transactionId, OpenViduException error) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); return; } JsonObject result = new JsonObject(); result.addProperty(ProtocolElements.PUBLISHVIDEO_SDPANSWER_PARAM, sdpAnswer); result.addProperty(ProtocolElements.PUBLISHVIDEO_STREAMID_PARAM, streamId); result.addProperty(ProtocolElements.PUBLISHVIDEO_CREATEDAT_PARAM, createdAt); rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result); JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_USER_PARAM, participant.getParticipantPublicId()); JsonObject stream = new JsonObject(); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_STREAMID_PARAM, streamId); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_CREATEDAT_PARAM, createdAt); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_HASAUDIO_PARAM, mediaOptions.hasAudio); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_HASVIDEO_PARAM, mediaOptions.hasVideo); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_AUDIOACTIVE_PARAM, mediaOptions.audioActive); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_VIDEOACTIVE_PARAM, mediaOptions.videoActive); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_TYPEOFVIDEO_PARAM, mediaOptions.typeOfVideo); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_FRAMERATE_PARAM, mediaOptions.frameRate); stream.addProperty(ProtocolElements.PARTICIPANTPUBLISHED_VIDEODIMENSIONS_PARAM, mediaOptions.videoDimensions); JsonElement filter = mediaOptions.getFilter() != null ? mediaOptions.getFilter().toJson() : new JsonObject(); stream.add(ProtocolElements.JOINROOM_PEERSTREAMFILTER_PARAM, filter); JsonArray streamsArray = new JsonArray(); streamsArray.add(stream); params.add(ProtocolElements.PARTICIPANTPUBLISHED_STREAMS_PARAM, streamsArray); for (Participant p : participants) { if (p.getParticipantPrivateId().equals(participant.getParticipantPrivateId())) { continue; } else { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.PARTICIPANTPUBLISHED_METHOD, params); } } } public void onUnpublishMedia(Participant participant, Set participants, Participant moderator, Integer transactionId, OpenViduException error, EndReason reason) { boolean isRpcFromModerator = transactionId != null && moderator != null; boolean isRpcFromOwner = transactionId != null && moderator == null; if (isRpcFromModerator) { if (error != null) { rpcNotificationService.sendErrorResponse(moderator.getParticipantPrivateId(), transactionId, null, error); return; } rpcNotificationService.sendResponse(moderator.getParticipantPrivateId(), transactionId, new JsonObject()); } JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.PARTICIPANTUNPUBLISHED_NAME_PARAM, participant.getParticipantPublicId()); params.addProperty(ProtocolElements.PARTICIPANTUNPUBLISHED_REASON_PARAM, reason != null ? reason.name() : ""); for (Participant p : participants) { if (p.getParticipantPrivateId().equals(participant.getParticipantPrivateId())) { // Send response to the affected participant if (!isRpcFromOwner) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.PARTICIPANTUNPUBLISHED_METHOD, params); } else { if (error != null) { rpcNotificationService.sendErrorResponse(p.getParticipantPrivateId(), transactionId, null, error); return; } rpcNotificationService.sendResponse(p.getParticipantPrivateId(), transactionId, new JsonObject()); } } else { if (error == null) { // Send response to every other user in the session different than the affected // participant rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.PARTICIPANTUNPUBLISHED_METHOD, params); } } } } public void onSubscribe(Participant participant, Session session, String sdpAnswer, Integer transactionId, OpenViduException error) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); return; } JsonObject result = new JsonObject(); result.addProperty(ProtocolElements.RECEIVEVIDEO_SDPANSWER_PARAM, sdpAnswer); rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, result); if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { lock.lock(); try { Recording recording = this.recordingsStarted.remove(session.getSessionId()); if (recording != null) { // RECORDER participant is now receiving video from the first publisher this.sendRecordingStartedNotification(session, recording); } } finally { lock.unlock(); } } } public void onUnsubscribe(Participant participant, Integer transactionId, OpenViduException error) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); return; } rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); } public void onSendMessage(Participant participant, JsonObject message, Set participants, Integer transactionId, OpenViduException error) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); return; } JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.PARTICIPANTSENDMESSAGE_DATA_PARAM, message.get("data").getAsString()); params.addProperty(ProtocolElements.PARTICIPANTSENDMESSAGE_FROM_PARAM, participant.getParticipantPublicId()); params.addProperty(ProtocolElements.PARTICIPANTSENDMESSAGE_TYPE_PARAM, message.get("type").getAsString()); Set toSet = new HashSet(); if (message.has("to")) { JsonArray toJson = message.get("to").getAsJsonArray(); for (int i = 0; i < toJson.size(); i++) { JsonElement el = toJson.get(i); if (el.isJsonNull()) { throw new OpenViduException(Code.SIGNAL_TO_INVALID_ERROR_CODE, "Signal \"to\" field invalid format: null"); } toSet.add(el.getAsString()); } } if (toSet.isEmpty()) { for (Participant p : participants) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.PARTICIPANTSENDMESSAGE_METHOD, params); } } else { Set participantPublicIds = participants.stream().map(Participant::getParticipantPublicId) .collect(Collectors.toSet()); for (String to : toSet) { if (participantPublicIds.contains(to)) { Optional p = participants.stream().filter(x -> to.equals(x.getParticipantPublicId())) .findFirst(); rpcNotificationService.sendNotification(p.get().getParticipantPrivateId(), ProtocolElements.PARTICIPANTSENDMESSAGE_METHOD, params); } else { throw new OpenViduException(Code.SIGNAL_TO_INVALID_ERROR_CODE, "Signal \"to\" field invalid format: Connection [" + to + "] does not exist"); } } } rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); } public void onStreamPropertyChanged(Participant participant, Integer transactionId, Set participants, String streamId, String property, JsonElement newValue, String reason) { JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.STREAMPROPERTYCHANGED_CONNECTIONID_PARAM, participant.getParticipantPublicId()); params.addProperty(ProtocolElements.STREAMPROPERTYCHANGED_STREAMID_PARAM, streamId); params.addProperty(ProtocolElements.STREAMPROPERTYCHANGED_PROPERTY_PARAM, property); params.addProperty(ProtocolElements.STREAMPROPERTYCHANGED_NEWVALUE_PARAM, newValue.toString()); params.addProperty(ProtocolElements.STREAMPROPERTYCHANGED_REASON_PARAM, reason); for (Participant p : participants) { if (p.getParticipantPrivateId().equals(participant.getParticipantPrivateId())) { rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); } else { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.STREAMPROPERTYCHANGED_METHOD, params); } } } public void onRecvIceCandidate(Participant participant, Integer transactionId, OpenViduException error) { if (error != null) { rpcNotificationService.sendErrorResponse(participant.getParticipantPrivateId(), transactionId, null, error); return; } rpcNotificationService.sendResponse(participant.getParticipantPrivateId(), transactionId, new JsonObject()); } public void onForceDisconnect(Participant moderator, Participant evictedParticipant, Set participants, Integer transactionId, OpenViduException error, EndReason reason) { boolean isRpcCall = transactionId != null; if (isRpcCall) { if (error != null) { rpcNotificationService.sendErrorResponse(moderator.getParticipantPrivateId(), transactionId, null, error); return; } rpcNotificationService.sendResponse(moderator.getParticipantPrivateId(), transactionId, new JsonObject()); } JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.PARTICIPANTEVICTED_CONNECTIONID_PARAM, evictedParticipant.getParticipantPublicId()); params.addProperty(ProtocolElements.PARTICIPANTEVICTED_REASON_PARAM, reason != null ? reason.name() : ""); if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(evictedParticipant.getParticipantPublicId())) { // Do not send a message when evicting RECORDER participant rpcNotificationService.sendNotification(evictedParticipant.getParticipantPrivateId(), ProtocolElements.PARTICIPANTEVICTED_METHOD, params); } for (Participant p : participants) { if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(evictedParticipant.getParticipantPublicId())) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.PARTICIPANTEVICTED_METHOD, params); } } } public void sendRecordingStartedNotification(Session session, Recording recording) { CDR.recordRecordingStarted(session.getSessionId(), recording); // Filter participants by roles according to "openvidu.recording.notification" Set filteredParticipants = this.filterParticipantsByRole( this.openviduConfig.getRolesFromRecordingNotification(), session.getParticipants()); JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.RECORDINGSTARTED_ID_PARAM, recording.getId()); params.addProperty(ProtocolElements.RECORDINGSTARTED_NAME_PARAM, recording.getName()); for (Participant p : filteredParticipants) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.RECORDINGSTARTED_METHOD, params); } } public void sendRecordingStoppedNotification(Session session, Recording recording, EndReason reason) { CDR.recordRecordingStopped(session.getSessionId(), recording, reason); // Be sure to clean this map (this should return null) this.recordingsStarted.remove(session.getSessionId()); // Filter participants by roles according to "openvidu.recording.notification" Set existingParticipants; try { existingParticipants = session.getParticipants(); } catch (OpenViduException exception) { // Session is already closed. This happens when RecordingMode.ALWAYS and last // participant has left the session. No notification needs to be sent log.warn("Session already closed when trying to send 'recordingStopped' notification"); return; } Set filteredParticipants = this.filterParticipantsByRole( this.openviduConfig.getRolesFromRecordingNotification(), existingParticipants); JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.RECORDINGSTOPPED_ID_PARAM, recording.getId()); params.addProperty(ProtocolElements.RECORDINGSTARTED_NAME_PARAM, recording.getName()); params.addProperty(ProtocolElements.RECORDINGSTOPPED_REASON_PARAM, reason != null ? reason.name() : ""); for (Participant p : filteredParticipants) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.RECORDINGSTOPPED_METHOD, params); } } public void onFilterChanged(Participant participant, Participant moderator, Integer transactionId, Set participants, String streamId, KurentoFilter filter, OpenViduException error, String filterReason) { boolean isRpcFromModerator = transactionId != null && moderator != null; if (isRpcFromModerator) { // A moderator forced the application of the filter if (error != null) { rpcNotificationService.sendErrorResponse(moderator.getParticipantPrivateId(), transactionId, null, error); return; } rpcNotificationService.sendResponse(moderator.getParticipantPrivateId(), transactionId, new JsonObject()); } JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.STREAMPROPERTYCHANGED_CONNECTIONID_PARAM, participant.getParticipantPublicId()); params.addProperty(ProtocolElements.STREAMPROPERTYCHANGED_STREAMID_PARAM, streamId); params.addProperty(ProtocolElements.STREAMPROPERTYCHANGED_PROPERTY_PARAM, "filter"); JsonObject filterJson = new JsonObject(); if (filter != null) { filterJson.addProperty(ProtocolElements.FILTER_TYPE_PARAM, filter.getType()); filterJson.add(ProtocolElements.FILTER_OPTIONS_PARAM, filter.getOptions()); if (filter.getLastExecMethod() != null) { filterJson.add(ProtocolElements.EXECFILTERMETHOD_LASTEXECMETHOD_PARAM, filter.getLastExecMethod().toJson()); } } params.add(ProtocolElements.STREAMPROPERTYCHANGED_NEWVALUE_PARAM, filterJson); params.addProperty(ProtocolElements.STREAMPROPERTYCHANGED_REASON_PARAM, filterReason); for (Participant p : participants) { if (p.getParticipantPrivateId().equals(participant.getParticipantPrivateId())) { // Affected participant if (isRpcFromModerator) { // Force by moderator. Send notification to affected participant rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.STREAMPROPERTYCHANGED_METHOD, params); } else { // Send response to participant if (error != null) { rpcNotificationService.sendErrorResponse(p.getParticipantPrivateId(), transactionId, null, error); return; } rpcNotificationService.sendResponse(p.getParticipantPrivateId(), transactionId, new JsonObject()); } } else { // Send response to every other user in the session different than the affected // participant or the moderator if (error == null && (moderator == null || !p.getParticipantPrivateId().equals(moderator.getParticipantPrivateId()))) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.STREAMPROPERTYCHANGED_METHOD, params); } } } } public void onFilterEventDispatched(String connectionId, String streamId, String filterType, String eventType, Object data, Set participants, Set subscribedParticipants) { JsonObject params = new JsonObject(); params.addProperty(ProtocolElements.FILTEREVENTLISTENER_CONNECTIONID_PARAM, connectionId); params.addProperty(ProtocolElements.FILTEREVENTLISTENER_STREAMID_PARAM, streamId); params.addProperty(ProtocolElements.FILTEREVENTLISTENER_FILTERTYPE_PARAM, filterType); params.addProperty(ProtocolElements.FILTEREVENTLISTENER_EVENTTYPE_PARAM, eventType); params.addProperty(ProtocolElements.FILTEREVENTLISTENER_DATA_PARAM, data.toString()); for (Participant p : participants) { if (subscribedParticipants.contains(p.getParticipantPublicId())) { rpcNotificationService.sendNotification(p.getParticipantPrivateId(), ProtocolElements.FILTEREVENTDISPATCHED_METHOD, params); } } } public void closeRpcSession(String participantPrivateId) { this.rpcNotificationService.closeRpcSession(participantPrivateId); } public void setRecordingStarted(String sessionId, Recording recording) { this.recordingsStarted.put(sessionId, recording); } private Set filterParticipantsByRole(OpenViduRole[] roles, Set participants) { return participants.stream().filter(part -> { if (ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(part.getParticipantPublicId())) { return false; } boolean isRole = false; for (OpenViduRole role : roles) { isRole = role.equals(part.getToken().getRole()); if (isRole) break; } return isRole; }).collect(Collectors.toSet()); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/AutodiscoveryKurentoClientProvider.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/AutodiscoveryKurentoClientProvider.java deleted file mode 100644 index ee38a314..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/AutodiscoveryKurentoClientProvider.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.openvidu.server.kurento; - -import org.kurento.client.KurentoClient; -import org.kurento.client.Properties; - -import io.openvidu.client.OpenViduException; - -public class AutodiscoveryKurentoClientProvider implements KurentoClientProvider { - - private static final int ROOM_PIPELINE_LOAD_POINTS = 50; - - @Override - public KurentoClient getKurentoClient(KurentoClientSessionInfo sessionInfo) throws OpenViduException { - - return KurentoClient.create(Properties.of("loadPoints", ROOM_PIPELINE_LOAD_POINTS)); - - } - - @Override - public boolean destroyWhenUnused() { - return true; - } -} diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientProvider.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientProvider.java deleted file mode 100644 index 2172fe87..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientProvider.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.openvidu.server.kurento; - -import org.kurento.client.KurentoClient; - -import io.openvidu.client.OpenViduException; - -/** - * This service interface was designed so that the room manager could obtain a {@link KurentoClient} - * instance at any time, without requiring knowledge about the placement of the media server - * instances. It is left for the developer to provide an implementation for this API. - * - * @author Pablo Fuente (pablofuenteperez@gmail.com) - */ -public interface KurentoClientProvider { - - /** - * Obtains a {@link KurentoClient} instance given the custom session bean. Normally, it'd be - * called during a room's instantiation. - * - * @param sessionInfo - * custom information object required by the implementors of this interface - * @return the {@link KurentoClient} instance - * @throws OpenViduException - * in case there is an error obtaining a {@link KurentoClient} instance - */ - KurentoClient getKurentoClient(KurentoClientSessionInfo sessionInfo) throws OpenViduException; - - boolean destroyWhenUnused(); -} diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/OpenViduKurentoClientSessionInfo.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/OpenViduKurentoClientSessionInfo.java deleted file mode 100644 index a5ca9b66..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/OpenViduKurentoClientSessionInfo.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.openvidu.server.kurento; - -/** - * Implementation of the session info interface, contains a participant's - * private id and the session's id. - * - * @author Pablo Fuente (pablofuenteperez@gmail.com) - * - */ -public class OpenViduKurentoClientSessionInfo implements KurentoClientSessionInfo { - - private String participantPrivateId; - private String sessionId; - - public OpenViduKurentoClientSessionInfo(String participantPrivateId, String roomName) { - super(); - this.participantPrivateId = participantPrivateId; - this.sessionId = roomName; - } - - public String getParticipantPrivateId() { - return participantPrivateId; - } - - public void setParticipantPrivateId(String participantPrivateId) { - this.participantPrivateId = participantPrivateId; - } - - @Override - public String getRoomName() { - return sessionId; - } - - public void setSessionId(String sessionId) { - this.sessionId = sessionId; - } -} diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java index 13abb43f..cb3d57b5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoMediaOptions.java @@ -1,48 +1,48 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.core; import org.kurento.client.MediaElement; import org.kurento.client.MediaType; import io.openvidu.server.core.MediaOptions; -import io.openvidu.server.kurento.KurentoFilter; +import io.openvidu.server.kurento.endpoint.KurentoFilter; public class KurentoMediaOptions extends MediaOptions { public boolean isOffer; public String sdpOffer; public boolean doLoopback; public MediaElement loopbackAlternativeSrc; public MediaType loopbackConnectionType; public MediaElement[] mediaElements; public KurentoMediaOptions(boolean isOffer, String sdpOffer, MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType, Boolean hasAudio, Boolean hasVideo, Boolean audioActive, Boolean videoActive, String typeOfVideo, Integer frameRate, String videoDimensions, KurentoFilter filter, boolean doLoopback, MediaElement... mediaElements) { super(hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, filter); this.isOffer = isOffer; this.sdpOffer = sdpOffer; this.loopbackAlternativeSrc = loopbackAlternativeSrc; this.loopbackConnectionType = loopbackConnectionType; this.doLoopback = doLoopback; this.mediaElements = mediaElements; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java index e4c7c013..fdc7692b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoParticipant.java @@ -1,483 +1,482 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.core; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.Continuation; import org.kurento.client.ErrorEvent; import org.kurento.client.Filter; import org.kurento.client.IceCandidate; import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; import org.kurento.client.MediaType; import org.kurento.client.SdpEndpoint; import org.kurento.client.internal.server.KurentoServerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; import io.openvidu.java.client.OpenViduRole; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; import io.openvidu.server.kurento.endpoint.MediaEndpoint; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.SdpType; import io.openvidu.server.kurento.endpoint.SubscriberEndpoint; import io.openvidu.server.recording.service.RecordingManager; public class KurentoParticipant extends Participant { private static final Logger log = LoggerFactory.getLogger(KurentoParticipant.class); private OpenviduConfig openviduConfig; private RecordingManager recordingManager; private boolean webParticipant = true; private final KurentoSession session; private KurentoParticipantEndpointConfig endpointConfig; private PublisherEndpoint publisher; private CountDownLatch endPointLatch = new CountDownLatch(1); private final ConcurrentMap filters = new ConcurrentHashMap<>(); private final ConcurrentMap subscribers = new ConcurrentHashMap(); public KurentoParticipant(Participant participant, KurentoSession kurentoSession, KurentoParticipantEndpointConfig endpointConfig, OpenviduConfig openviduConfig, RecordingManager recordingManager) { super(participant.getFinalUserId(), participant.getParticipantPrivateId(), participant.getParticipantPublicId(), kurentoSession.getSessionId(), participant.getToken(), participant.getClientMetadata(), participant.getLocation(), participant.getPlatform(), participant.getCreatedAt()); this.endpointConfig = endpointConfig; this.openviduConfig = openviduConfig; this.recordingManager = recordingManager; this.session = kurentoSession; if (!OpenViduRole.SUBSCRIBER.equals(participant.getToken().getRole())) { this.publisher = new PublisherEndpoint(webParticipant, this, participant.getParticipantPublicId(), this.session.getPipeline(), this.openviduConfig); } for (Participant other : session.getParticipants()) { if (!other.getParticipantPublicId().equals(this.getParticipantPublicId())) { getNewOrExistingSubscriber(other.getParticipantPublicId()); } } } public void createPublishingEndpoint(MediaOptions mediaOptions) { publisher.createEndpoint(endPointLatch); if (getPublisher().getEndpoint() == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create publisher endpoint"); } publisher.setMediaOptions(mediaOptions); String publisherStreamId = this.getParticipantPublicId() + "_" + (mediaOptions.hasVideo() ? mediaOptions.getTypeOfVideo() : "MICRO") + "_" + RandomStringUtils.random(5, true, false).toUpperCase(); this.publisher.setEndpointName(publisherStreamId); this.publisher.getEndpoint().setName(publisherStreamId); this.publisher.setStreamId(publisherStreamId); endpointConfig.addEndpointListeners(this.publisher, "publisher"); // Remove streamId from publisher's map this.session.publishedStreamIds.putIfAbsent(this.getPublisherStreamId(), this.getParticipantPrivateId()); } public synchronized Filter getFilterElement(String id) { return filters.get(id); } public synchronized void removeFilterElement(String id) { Filter filter = getFilterElement(id); filters.remove(id); if (filter != null) { publisher.revert(filter); } } public synchronized void releaseAllFilters() { // Check this, mutable array? filters.forEach((s, filter) -> removeFilterElement(s)); if (this.publisher != null && this.publisher.getFilter() != null) { this.publisher.revert(this.publisher.getFilter()); } } public PublisherEndpoint getPublisher() { try { if (!endPointLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Timeout reached while waiting for publisher endpoint to be ready"); } } catch (InterruptedException e) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Interrupted while waiting for publisher endpoint to be ready: " + e.getMessage()); } return this.publisher; } public MediaOptions getPublisherMediaOptions() { return this.publisher.getMediaOptions(); } public void setPublisherMediaOptions(MediaOptions mediaOptions) { this.publisher.setMediaOptions(mediaOptions); } public KurentoSession getSession() { return session; } public String publishToRoom(SdpType sdpType, String sdpString, boolean doLoopback, MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) { log.info("PARTICIPANT {}: Request to publish video in room {} (sdp type {})", this.getParticipantPublicId(), this.session.getSessionId(), sdpType); log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpString); String sdpResponse = this.getPublisher().publish(sdpType, sdpString, doLoopback, loopbackAlternativeSrc, loopbackConnectionType); this.streaming = true; log.trace("PARTICIPANT {}: Publishing Sdp ({}) is {}", this.getParticipantPublicId(), sdpType, sdpResponse); log.info("PARTICIPANT {}: Is now publishing video in room {}", this.getParticipantPublicId(), this.session.getSessionId()); if (this.openviduConfig.isRecordingModuleEnabled() && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { this.recordingManager.startOneIndividualStreamRecording(session, null, null, this); } endpointConfig.getCdr().recordNewPublisher(this, session.getSessionId(), publisher.getStreamId(), publisher.getMediaOptions(), publisher.createdAt()); return sdpResponse; } public void unpublishMedia(EndReason reason) { log.info("PARTICIPANT {}: unpublishing media stream from room {}", this.getParticipantPublicId(), this.session.getSessionId()); releasePublisherEndpoint(reason); this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.getPipeline(), this.openviduConfig); log.info("PARTICIPANT {}: released publisher endpoint and left it initialized (ready for future streaming)", this.getParticipantPublicId()); } public String receiveMediaFrom(Participant sender, String sdpOffer) { final String senderName = sender.getParticipantPublicId(); log.info("PARTICIPANT {}: Request to receive media from {} in room {}", this.getParticipantPublicId(), senderName, this.session.getSessionId()); log.trace("PARTICIPANT {}: SdpOffer for {} is {}", this.getParticipantPublicId(), senderName, sdpOffer); if (senderName.equals(this.getParticipantPublicId())) { log.warn("PARTICIPANT {}: trying to configure loopback by subscribing", this.getParticipantPublicId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Can loopback only when publishing media"); } KurentoParticipant kSender = (KurentoParticipant) sender; if (kSender.getPublisher() == null) { log.warn("PARTICIPANT {}: Trying to connect to a user without " + "a publishing endpoint", this.getParticipantPublicId()); return null; } log.debug("PARTICIPANT {}: Creating a subscriber endpoint to user {}", this.getParticipantPublicId(), senderName); SubscriberEndpoint subscriber = getNewOrExistingSubscriber(senderName); try { CountDownLatch subscriberLatch = new CountDownLatch(1); SdpEndpoint oldMediaEndpoint = subscriber.createEndpoint(subscriberLatch); try { if (!subscriberLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS)) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Timeout reached when creating subscriber endpoint"); } } catch (InterruptedException e) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Interrupted when creating subscriber endpoint: " + e.getMessage()); } if (oldMediaEndpoint != null) { log.warn( "PARTICIPANT {}: Two threads are trying to create at " + "the same time a subscriber endpoint for user {}", this.getParticipantPublicId(), senderName); return null; } if (subscriber.getEndpoint() == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Unable to create subscriber endpoint"); } String subscriberEndpointName = this.getParticipantPublicId() + "_" + kSender.getPublisherStreamId(); subscriber.setEndpointName(subscriberEndpointName); subscriber.getEndpoint().setName(subscriberEndpointName); subscriber.setStreamId(kSender.getPublisherStreamId()); endpointConfig.addEndpointListeners(subscriber, "subscriber"); } catch (OpenViduException e) { this.subscribers.remove(senderName); throw e; } log.debug("PARTICIPANT {}: Created subscriber endpoint for user {}", this.getParticipantPublicId(), senderName); try { String sdpAnswer = subscriber.subscribe(sdpOffer, kSender.getPublisher()); log.trace("PARTICIPANT {}: Subscribing SdpAnswer is {}", this.getParticipantPublicId(), sdpAnswer); log.info("PARTICIPANT {}: Is now receiving video from {} in room {}", this.getParticipantPublicId(), senderName, this.session.getSessionId()); if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { endpointConfig.getCdr().recordNewSubscriber(this, this.session.getSessionId(), sender.getPublisherStreamId(), sender.getParticipantPublicId(), subscriber.createdAt()); } return sdpAnswer; } catch (KurentoServerException e) { // TODO Check object status when KurentoClient sets this info in the object if (e.getCode() == 40101) { log.warn("Publisher endpoint was already released when trying " + "to connect a subscriber endpoint to it", e); } else { log.error("Exception connecting subscriber endpoint " + "to publisher endpoint", e); } this.subscribers.remove(senderName); releaseSubscriberEndpoint(senderName, subscriber, null); } return null; } public void cancelReceivingMedia(String senderName, EndReason reason) { log.info("PARTICIPANT {}: cancel receiving media from {}", this.getParticipantPublicId(), senderName); SubscriberEndpoint subscriberEndpoint = subscribers.remove(senderName); if (subscriberEndpoint == null || subscriberEndpoint.getEndpoint() == null) { log.warn("PARTICIPANT {}: Trying to cancel receiving video from user {}. " + "But there is no such subscriber endpoint.", this.getParticipantPublicId(), senderName); } else { releaseSubscriberEndpoint(senderName, subscriberEndpoint, reason); log.info("PARTICIPANT {}: stopped receiving media from {} in room {}", this.getParticipantPublicId(), senderName, this.session.getSessionId()); } } public void close(EndReason reason, boolean definitelyClosed) { log.debug("PARTICIPANT {}: Closing user", this.getParticipantPublicId()); if (isClosed()) { log.warn("PARTICIPANT {}: Already closed", this.getParticipantPublicId()); return; } this.closed = definitelyClosed; for (String remoteParticipantName : subscribers.keySet()) { SubscriberEndpoint subscriber = this.subscribers.get(remoteParticipantName); if (subscriber != null && subscriber.getEndpoint() != null) { releaseSubscriberEndpoint(remoteParticipantName, subscriber, reason); log.debug("PARTICIPANT {}: Released subscriber endpoint to {}", this.getParticipantPublicId(), remoteParticipantName); } else { log.warn( "PARTICIPANT {}: Trying to close subscriber endpoint to {}. " + "But the endpoint was never instantiated.", this.getParticipantPublicId(), remoteParticipantName); } } this.subscribers.clear(); releasePublisherEndpoint(reason); } /** * Returns a {@link SubscriberEndpoint} for the given participant public id. The * endpoint is created if not found. * * @param remotePublicId id of another user * @return the endpoint instance */ public SubscriberEndpoint getNewOrExistingSubscriber(String senderPublicId) { SubscriberEndpoint subscriberEndpoint = new SubscriberEndpoint(webParticipant, this, senderPublicId, this.getPipeline(), this.openviduConfig); SubscriberEndpoint existingSendingEndpoint = this.subscribers.putIfAbsent(senderPublicId, subscriberEndpoint); if (existingSendingEndpoint != null) { subscriberEndpoint = existingSendingEndpoint; log.trace("PARTICIPANT {}: Already exists a subscriber endpoint to user {}", this.getParticipantPublicId(), senderPublicId); } else { log.debug("PARTICIPANT {}: New subscriber endpoint to user {}", this.getParticipantPublicId(), senderPublicId); } return subscriberEndpoint; } public void addIceCandidate(String endpointName, IceCandidate iceCandidate) { if (this.getParticipantPublicId().equals(endpointName)) { this.publisher.addIceCandidate(iceCandidate); } else { this.getNewOrExistingSubscriber(endpointName).addIceCandidate(iceCandidate); } } public void sendIceCandidate(String senderPublicId, String endpointName, IceCandidate candidate) { session.sendIceCandidate(this.getParticipantPrivateId(), senderPublicId, endpointName, candidate); } public void sendMediaError(ErrorEvent event) { String desc = event.getType() + ": " + event.getDescription() + "(errCode=" + event.getErrorCode() + ")"; log.warn("PARTICIPANT {}: Media error encountered: {}", getParticipantPublicId(), desc); session.sendMediaError(this.getParticipantPrivateId(), desc); } private void releasePublisherEndpoint(EndReason reason) { if (publisher != null && publisher.getEndpoint() != null) { // Remove streamId from publisher's map this.session.publishedStreamIds.remove(this.getPublisherStreamId()); if (this.openviduConfig.isRecordingModuleEnabled() && this.recordingManager.sessionIsBeingRecorded(session.getSessionId())) { - this.recordingManager.stopOneIndividualStreamRecording(session.getSessionId(), - this.getPublisherStreamId(), false); + this.recordingManager.stopOneIndividualStreamRecording(session, this.getPublisherStreamId()); } publisher.unregisterErrorListeners(); if (publisher.kmsWebrtcStatsThread != null) { publisher.kmsWebrtcStatsThread.cancel(true); } for (MediaElement el : publisher.getMediaElements()) { releaseElement(getParticipantPublicId(), el); } releaseElement(getParticipantPublicId(), publisher.getEndpoint()); this.streaming = false; this.session.deregisterPublisher(); endpointConfig.getCdr().stopPublisher(this.getParticipantPublicId(), publisher.getStreamId(), reason); publisher = null; } else { log.warn("PARTICIPANT {}: Trying to release publisher endpoint but is null", getParticipantPublicId()); } } private void releaseSubscriberEndpoint(String senderName, SubscriberEndpoint subscriber, EndReason reason) { if (subscriber != null) { subscriber.unregisterErrorListeners(); if (subscriber.kmsWebrtcStatsThread != null) { subscriber.kmsWebrtcStatsThread.cancel(true); } releaseElement(senderName, subscriber.getEndpoint()); if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(this.getParticipantPublicId())) { endpointConfig.getCdr().stopSubscriber(this.getParticipantPublicId(), senderName, subscriber.getStreamId(), reason); } } else { log.warn("PARTICIPANT {}: Trying to release subscriber endpoint for '{}' but is null", this.getParticipantPublicId(), senderName); } } private void releaseElement(final String senderName, final MediaElement element) { final String eid = element.getId(); try { element.release(new Continuation() { @Override public void onSuccess(Void result) throws Exception { log.debug("PARTICIPANT {}: Released successfully media element #{} for {}", getParticipantPublicId(), eid, senderName); } @Override public void onError(Throwable cause) throws Exception { log.warn("PARTICIPANT {}: Could not release media element #{} for {}", getParticipantPublicId(), eid, senderName, cause); } }); } catch (Exception e) { log.error("PARTICIPANT {}: Error calling release on elem #{} for {}", getParticipantPublicId(), eid, senderName, e); } } public MediaPipeline getPipeline() { return this.session.getPipeline(); } @Override public String getPublisherStreamId() { return this.publisher.getStreamId(); } public void resetPublisherEndpoint() { log.info("Reseting publisher endpoint for participant {}", this.getParticipantPublicId()); this.publisher = new PublisherEndpoint(webParticipant, this, this.getParticipantPublicId(), this.session.getPipeline(), this.openviduConfig); } @Override public JsonObject toJson() { return this.sharedJson(MediaEndpoint::toJson); } public JsonObject withStatsToJson() { return this.sharedJson(MediaEndpoint::withStatsToJson); } private JsonObject sharedJson(Function toJsonFunction) { JsonObject json = super.toJson(); JsonArray publisherEnpoints = new JsonArray(); if (this.streaming && this.publisher.getEndpoint() != null) { publisherEnpoints.add(toJsonFunction.apply(this.publisher)); } JsonArray subscriberEndpoints = new JsonArray(); for (MediaEndpoint sub : this.subscribers.values()) { if (sub.getEndpoint() != null) { subscriberEndpoints.add(toJsonFunction.apply(sub)); } } json.add("publishers", publisherEnpoints); json.add("subscribers", subscriberEndpoints); return json; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java index 87cff7bb..8871619b 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSession.java @@ -1,325 +1,328 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.core; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.kurento.client.Continuation; import org.kurento.client.ErrorEvent; import org.kurento.client.EventListener; import org.kurento.client.IceCandidate; -import org.kurento.client.KurentoClient; import org.kurento.client.MediaPipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; import io.openvidu.java.client.OpenViduRole; import io.openvidu.java.client.Recording.OutputMode; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; +import io.openvidu.server.kurento.kms.Kms; import io.openvidu.server.recording.Recording; /** * @author Pablo Fuente (pablofuenteperez@gmail.com) */ public class KurentoSession extends Session { private final static Logger log = LoggerFactory.getLogger(Session.class); public static final int ASYNC_LATCH_TIMEOUT = 30; private MediaPipeline pipeline; private CountDownLatch pipelineLatch = new CountDownLatch(1); - private KurentoClient kurentoClient; + private Kms kms; private KurentoSessionEventsHandler kurentoSessionHandler; private KurentoParticipantEndpointConfig kurentoEndpointConfig; private final ConcurrentHashMap filterStates = new ConcurrentHashMap<>(); private Object pipelineCreateLock = new Object(); private Object pipelineReleaseLock = new Object(); private boolean destroyKurentoClient; public final ConcurrentHashMap publishedStreamIds = new ConcurrentHashMap<>(); - public KurentoSession(Session sessionNotActive, KurentoClient kurentoClient, - KurentoSessionEventsHandler kurentoSessionHandler, KurentoParticipantEndpointConfig kurentoEndpointConfig, - boolean destroyKurentoClient) { + public KurentoSession(Session sessionNotActive, Kms kms, KurentoSessionEventsHandler kurentoSessionHandler, + KurentoParticipantEndpointConfig kurentoEndpointConfig, boolean destroyKurentoClient) { super(sessionNotActive); - this.kurentoClient = kurentoClient; + this.kms = kms; this.destroyKurentoClient = destroyKurentoClient; this.kurentoSessionHandler = kurentoSessionHandler; this.kurentoEndpointConfig = kurentoEndpointConfig; log.debug("New SESSION instance with id '{}'", sessionId); } @Override public void join(Participant participant) { checkClosed(); createPipeline(); KurentoParticipant kurentoParticipant = new KurentoParticipant(participant, this, this.kurentoEndpointConfig, this.openviduConfig, this.recordingManager); participants.put(participant.getParticipantPrivateId(), kurentoParticipant); filterStates.forEach((filterId, state) -> { log.info("Adding filter {}", filterId); kurentoSessionHandler.updateFilter(sessionId, participant, filterId, state); }); log.info("SESSION {}: Added participant {}", sessionId, participant); if (!ProtocolElements.RECORDER_PARTICIPANT_PUBLICID.equals(participant.getParticipantPublicId())) { kurentoEndpointConfig.getCdr().recordParticipantJoined(participant, sessionId); } } public void newPublisher(Participant participant) { registerPublisher(); // pre-load endpoints to recv video from the new publisher for (Participant p : participants.values()) { if (participant.equals(p)) { continue; } ((KurentoParticipant) p).getNewOrExistingSubscriber(participant.getParticipantPublicId()); } log.debug("SESSION {}: Virtually subscribed other participants {} to new publisher {}", sessionId, participants.values(), participant.getParticipantPublicId()); } public void cancelPublisher(Participant participant, EndReason reason) { // Cancel all subscribers for this publisher for (Participant subscriber : participants.values()) { if (participant.equals(subscriber)) { continue; } ((KurentoParticipant) subscriber).cancelReceivingMedia(participant.getParticipantPublicId(), reason); } log.debug("SESSION {}: Unsubscribed other participants {} from the publisher {}", sessionId, participants.values(), participant.getParticipantPublicId()); } @Override public void leave(String participantPrivateId, EndReason reason) throws OpenViduException { checkClosed(); KurentoParticipant participant = (KurentoParticipant) participants.get(participantPrivateId); if (participant == null) { throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "Participant with private id " + participantPrivateId + " not found in session '" + sessionId + "'"); } participant.releaseAllFilters(); log.info("PARTICIPANT {}: Leaving session {}", participant.getParticipantPublicId(), this.sessionId); this.removeParticipant(participant, reason); participant.close(reason, true); } @Override public boolean close(EndReason reason) { if (!closed) { for (Participant participant : participants.values()) { ((KurentoParticipant) participant).releaseAllFilters(); ((KurentoParticipant) participant).close(reason, true); } participants.clear(); closePipeline(null); log.debug("Session {} closed", this.sessionId); if (destroyKurentoClient) { - kurentoClient.destroy(); + kms.getKurentoClient().destroy(); } this.closed = true; return true; } else { log.warn("Closing an already closed session '{}'", this.sessionId); return false; } } public void sendIceCandidate(String participantPrivateId, String senderPublicId, String endpointName, IceCandidate candidate) { this.kurentoSessionHandler.onIceCandidate(sessionId, participantPrivateId, senderPublicId, endpointName, candidate); } public void sendMediaError(String participantId, String description) { this.kurentoSessionHandler.onMediaElementError(sessionId, participantId, description); } private void removeParticipant(Participant participant, EndReason reason) { checkClosed(); participants.remove(participant.getParticipantPrivateId()); log.debug("SESSION {}: Cancel receiving media from participant '{}' for other participant", this.sessionId, participant.getParticipantPublicId()); for (Participant other : participants.values()) { ((KurentoParticipant) other).cancelReceivingMedia(participant.getParticipantPublicId(), reason); } } + public Kms getKms() { + return this.kms; + } + public MediaPipeline getPipeline() { try { pipelineLatch.await(KurentoSession.ASYNC_LATCH_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } return this.pipeline; } private void createPipeline() { synchronized (pipelineCreateLock) { if (pipeline != null) { return; } log.info("SESSION {}: Creating MediaPipeline", sessionId); try { - kurentoClient.createMediaPipeline(new Continuation() { + kms.getKurentoClient().createMediaPipeline(new Continuation() { @Override public void onSuccess(MediaPipeline result) throws Exception { pipeline = result; pipelineLatch.countDown(); log.debug("SESSION {}: Created MediaPipeline", sessionId); } @Override public void onError(Throwable cause) throws Exception { pipelineLatch.countDown(); log.error("SESSION {}: Failed to create MediaPipeline", sessionId, cause); } }); } catch (Exception e) { log.error("Unable to create media pipeline for session '{}'", sessionId, e); pipelineLatch.countDown(); } if (getPipeline() == null) { throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE, "Unable to create media pipeline for session '" + sessionId + "'"); } pipeline.addErrorListener(new EventListener() { @Override public void onEvent(ErrorEvent event) { String desc = event.getType() + ": " + event.getDescription() + "(errCode=" + event.getErrorCode() + ")"; log.warn("SESSION {}: Pipeline error encountered: {}", sessionId, desc); kurentoSessionHandler.onPipelineError(sessionId, getParticipants(), desc); } }); } } private void closePipeline(Runnable callback) { synchronized (pipelineReleaseLock) { if (pipeline == null) { return; } getPipeline().release(new Continuation() { @Override public void onSuccess(Void result) throws Exception { log.debug("SESSION {}: Released Pipeline", sessionId); pipeline = null; pipelineLatch = new CountDownLatch(1); if (callback != null) { callback.run(); } } @Override public void onError(Throwable cause) throws Exception { log.warn("SESSION {}: Could not successfully release Pipeline", sessionId, cause); pipeline = null; pipelineLatch = new CountDownLatch(1); if (callback != null) { callback.run(); } } }); } } public String getParticipantPrivateIdFromStreamId(String streamId) { return this.publishedStreamIds.get(streamId); } public void restartStatusInKurento() { log.info("Reseting remote media objects for active session {}", this.sessionId); // Stop recording if session is being recorded if (recordingManager.sessionIsBeingRecorded(this.sessionId)) { Recording stoppedRecording = this.recordingManager.forceStopRecording(this, EndReason.mediaServerDisconnect); if (OutputMode.COMPOSED.equals(stoppedRecording.getOutputMode()) && stoppedRecording.hasVideo()) { recordingManager.getSessionManager().evictParticipant( this.getParticipantByPublicId(ProtocolElements.RECORDER_PARTICIPANT_PUBLICID), null, null, null); } } // Close all MediaEndpoints of participants this.getParticipants().forEach(p -> { KurentoParticipant kParticipant = (KurentoParticipant) p; final boolean wasStreaming = kParticipant.isStreaming(); kParticipant.releaseAllFilters(); kParticipant.close(EndReason.mediaServerDisconnect, false); if (wasStreaming) { kurentoSessionHandler.onUnpublishMedia(kParticipant, this.getParticipants(), null, null, null, EndReason.mediaServerDisconnect); } }); // Release pipeline, create a new one and prepare new PublisherEndpoints for // allowed users this.closePipeline(() -> { createPipeline(); try { if (!pipelineLatch.await(20, TimeUnit.SECONDS)) { throw new Exception("MediaPipleine was not created in 20 seconds"); } getParticipants().forEach(p -> { if (!OpenViduRole.SUBSCRIBER.equals(p.getToken().getRole())) { ((KurentoParticipant) p).resetPublisherEndpoint(); } }); } catch (Exception e) { log.error("Error waiting to new MediaPipeline on KurentoSession restart: {}", e.getMessage()); } }); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java index 044c4f1f..02feebf5 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/core/KurentoSessionManager.java @@ -1,913 +1,900 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.core; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Set; import org.kurento.client.GenericMediaElement; import org.kurento.client.IceCandidate; -import org.kurento.client.KurentoClient; import org.kurento.client.ListenerSubscription; import org.kurento.jsonrpc.Props; import org.kurento.jsonrpc.message.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; import io.openvidu.java.client.MediaMode; import io.openvidu.java.client.RecordingLayout; import io.openvidu.java.client.RecordingMode; import io.openvidu.java.client.RecordingProperties; import io.openvidu.java.client.SessionProperties; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.MediaOptions; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionManager; -import io.openvidu.server.kurento.KurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientSessionInfo; -import io.openvidu.server.kurento.KurentoFilter; -import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo; +import io.openvidu.server.kurento.endpoint.KurentoFilter; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; import io.openvidu.server.kurento.endpoint.SdpType; +import io.openvidu.server.kurento.kms.Kms; +import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.utils.JsonUtils; public class KurentoSessionManager extends SessionManager { private static final Logger log = LoggerFactory.getLogger(KurentoSessionManager.class); @Autowired - private KurentoClientProvider kcProvider; + private KmsManager kmsManager; @Autowired private KurentoSessionEventsHandler kurentoSessionEventsHandler; @Autowired private KurentoParticipantEndpointConfig kurentoEndpointConfig; - private KurentoClient kurentoClient; - @Override public synchronized void joinRoom(Participant participant, String sessionId, Integer transactionId) { Set existingParticipants = null; try { - KurentoClientSessionInfo kcSessionInfo = new OpenViduKurentoClientSessionInfo( - participant.getParticipantPrivateId(), sessionId); KurentoSession kSession = (KurentoSession) sessions.get(sessionId); - if (kSession == null && kcSessionInfo != null) { + if (kSession == null) { // First user connecting to the session Session sessionNotActive = sessionsNotActive.remove(sessionId); if (sessionNotActive == null && this.isInsecureParticipant(participant.getParticipantPrivateId())) { // Insecure user directly call joinRoom RPC method, without REST API use sessionNotActive = new Session(sessionId, new SessionProperties.Builder().mediaMode(MediaMode.ROUTED) .recordingMode(RecordingMode.ALWAYS) .defaultRecordingLayout(RecordingLayout.BEST_FIT).build(), openviduConfig, recordingManager); } - createSession(sessionNotActive, kcSessionInfo); - } - kSession = (KurentoSession) sessions.get(sessionId); - if (kSession == null) { - log.warn("Session '{}' not found"); - throw new OpenViduException(Code.ROOM_NOT_FOUND_ERROR_CODE, "Session '" + sessionId - + "' was not found, must be created before '" + sessionId + "' can join"); + Kms lessLoadedKms = this.kmsManager.getLessLoadedKms(); + log.info("KMS less loaded is {} with a load of {}", lessLoadedKms.getUri(), lessLoadedKms.getLoad()); + kSession = createSession(sessionNotActive, lessLoadedKms); } + if (kSession.isClosed()) { log.warn("'{}' is trying to join session '{}' but it is closing", participant.getParticipantPublicId(), sessionId); throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "'" + participant.getParticipantPublicId() + "' is trying to join session '" + sessionId + "' but it is closing"); } + existingParticipants = getParticipants(sessionId); kSession.join(participant); } catch (OpenViduException e) { log.warn("PARTICIPANT {}: Error joining/creating session {}", participant.getParticipantPublicId(), sessionId, e); sessionEventsHandler.onParticipantJoined(participant, sessionId, null, transactionId, e); } if (existingParticipants != null) { sessionEventsHandler.onParticipantJoined(participant, sessionId, existingParticipants, transactionId, null); } } @Override public synchronized void leaveRoom(Participant participant, Integer transactionId, EndReason reason, boolean closeWebSocket) { log.debug("Request [LEAVE_ROOM] ({})", participant.getParticipantPublicId()); KurentoParticipant kParticipant = (KurentoParticipant) participant; KurentoSession session = kParticipant.getSession(); String sessionId = session.getSessionId(); if (session.isClosed()) { log.warn("'{}' is trying to leave from session '{}' but it is closing", participant.getParticipantPublicId(), sessionId); throw new OpenViduException(Code.ROOM_CLOSED_ERROR_CODE, "'" + participant.getParticipantPublicId() + "' is trying to leave from session '" + sessionId + "' but it is closing"); } session.leave(participant.getParticipantPrivateId(), reason); // Update control data structures if (sessionidParticipantpublicidParticipant.get(sessionId) != null) { Participant p = sessionidParticipantpublicidParticipant.get(sessionId) .remove(participant.getParticipantPublicId()); if (this.coturnCredentialsService.isCoturnAvailable()) { this.coturnCredentialsService.deleteUser(p.getToken().getTurnCredentials().getUsername()); } if (sessionidTokenTokenobj.get(sessionId) != null) { sessionidTokenTokenobj.get(sessionId).remove(p.getToken().getToken()); } boolean stillParticipant = false; for (Session s : sessions.values()) { if (s.getParticipantByPrivateId(p.getParticipantPrivateId()) != null) { stillParticipant = true; break; } } if (!stillParticipant) { insecureUsers.remove(p.getParticipantPrivateId()); } } showTokens(); // Close Session if no more participants Set remainingParticipants = null; try { remainingParticipants = getParticipants(sessionId); } catch (OpenViduException e) { log.info("Possible collision when closing the session '{}' (not found)", sessionId); remainingParticipants = Collections.emptySet(); } sessionEventsHandler.onParticipantLeft(participant, sessionId, remainingParticipants, transactionId, null, reason); if (!EndReason.sessionClosedByServer.equals(reason)) { // If session is closed by a call to "DELETE /api/sessions" do NOT stop the // recording. Will be stopped after in method // "SessionManager.closeSessionAndEmptyCollections" if (remainingParticipants.isEmpty()) { if (openviduConfig.isRecordingModuleEnabled() && MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode()) && (this.recordingManager.sessionIsBeingRecorded(sessionId))) { // Start countdown to stop recording. Will be aborted if a Publisher starts // before timeout log.info( "Last participant left. Starting {} seconds countdown for stopping recording of session {}", this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); recordingManager.initAutomaticRecordingStopThread(session); } else { log.info("No more participants in session '{}', removing it and closing it", sessionId); this.closeSessionAndEmptyCollections(session, reason); showTokens(); } } else if (remainingParticipants.size() == 1 && openviduConfig.isRecordingModuleEnabled() && MediaMode.ROUTED.equals(session.getSessionProperties().mediaMode()) && this.recordingManager.sessionIsBeingRecorded(sessionId) && ProtocolElements.RECORDER_PARTICIPANT_PUBLICID .equals(remainingParticipants.iterator().next().getParticipantPublicId())) { // Start countdown log.info("Last participant left. Starting {} seconds countdown for stopping recording of session {}", this.openviduConfig.getOpenviduRecordingAutostopTimeout(), sessionId); recordingManager.initAutomaticRecordingStopThread(session); } } // Finally close websocket session if required if (closeWebSocket) { sessionEventsHandler.closeRpcSession(participant.getParticipantPrivateId()); } } /** * Represents a client's request to start streaming her local media to anyone * inside the room. The media elements should have been created using the same * pipeline as the publisher's. The streaming media endpoint situated on the * server can be connected to itself thus realizing what is known as a loopback * connection. The loopback is performed after applying all additional media * elements specified as parameters (in the same order as they appear in the * params list). *

*
* Dev advice: Send notifications to the existing participants * in the room to inform about the new stream that has been published. Answer to * the peer's request by sending it the SDP response (answer or updated offer) * generated by the WebRTC endpoint on the server. * * @param participant Participant publishing video * @param MediaOptions configuration of the stream to publish * @param transactionId identifier of the Transaction * @throws OpenViduException on error */ @Override public void publishVideo(Participant participant, MediaOptions mediaOptions, Integer transactionId) throws OpenViduException { Set participants = null; String sdpAnswer = null; KurentoMediaOptions kurentoOptions = (KurentoMediaOptions) mediaOptions; KurentoParticipant kParticipant = (KurentoParticipant) participant; log.debug( "Request [PUBLISH_MEDIA] isOffer={} sdp={} " + "loopbackAltSrc={} lpbkConnType={} doLoopback={} mediaElements={} ({})", kurentoOptions.isOffer, kurentoOptions.sdpOffer, kurentoOptions.loopbackAlternativeSrc, kurentoOptions.loopbackConnectionType, kurentoOptions.doLoopback, kurentoOptions.mediaElements, participant.getParticipantPublicId()); SdpType sdpType = kurentoOptions.isOffer ? SdpType.OFFER : SdpType.ANSWER; KurentoSession kSession = kParticipant.getSession(); kParticipant.createPublishingEndpoint(mediaOptions); /* * for (MediaElement elem : kurentoOptions.mediaElements) { * kurentoParticipant.getPublisher().apply(elem); } */ KurentoTokenOptions kurentoTokenOptions = participant.getToken().getKurentoTokenOptions(); if (kurentoOptions.getFilter() != null && kurentoTokenOptions != null) { if (kurentoTokenOptions.isFilterAllowed(kurentoOptions.getFilter().getType())) { this.applyFilterInPublisher(kParticipant, kurentoOptions.getFilter()); } else { OpenViduException e = new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, "Error applying filter for publishing user " + participant.getParticipantPublicId() + ". The token has no permissions to apply filter " + kurentoOptions.getFilter().getType()); log.error("PARTICIPANT {}: Error applying filter. The token has no permissions to apply filter {}", participant.getParticipantPublicId(), kurentoOptions.getFilter().getType(), e); sessionEventsHandler.onPublishMedia(participant, null, kParticipant.getPublisher().createdAt(), kSession.getSessionId(), mediaOptions, sdpAnswer, participants, transactionId, e); throw e; } } sdpAnswer = kParticipant.publishToRoom(sdpType, kurentoOptions.sdpOffer, kurentoOptions.doLoopback, kurentoOptions.loopbackAlternativeSrc, kurentoOptions.loopbackConnectionType); if (sdpAnswer == null) { OpenViduException e = new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Error generating SDP response for publishing user " + participant.getParticipantPublicId()); log.error("PARTICIPANT {}: Error publishing media", participant.getParticipantPublicId(), e); sessionEventsHandler.onPublishMedia(participant, null, kParticipant.getPublisher().createdAt(), kSession.getSessionId(), mediaOptions, sdpAnswer, participants, transactionId, e); } if (this.openviduConfig.isRecordingModuleEnabled() && MediaMode.ROUTED.equals(kSession.getSessionProperties().mediaMode()) && kSession.getActivePublishers() == 0) { if (RecordingMode.ALWAYS.equals(kSession.getSessionProperties().recordingMode()) && !recordingManager.sessionIsBeingRecorded(kSession.getSessionId()) && !kSession.recordingManuallyStopped.get()) { // Start automatic recording for sessions configured with RecordingMode.ALWAYS new Thread(() -> { recordingManager.startRecording(kSession, new RecordingProperties.Builder().name("") .outputMode(kSession.getSessionProperties().defaultOutputMode()) .recordingLayout(kSession.getSessionProperties().defaultRecordingLayout()) .customLayout(kSession.getSessionProperties().defaultCustomLayout()).build()); }).start(); } else if (RecordingMode.MANUAL.equals(kSession.getSessionProperties().recordingMode()) && recordingManager.sessionIsBeingRecorded(kSession.getSessionId())) { // Abort automatic recording stop (user published before timeout) log.info("Participant {} published before timeout finished. Aborting automatic recording stop", participant.getParticipantPublicId()); boolean stopAborted = recordingManager.abortAutomaticRecordingStopThread(kSession); if (stopAborted) { log.info("Automatic recording stopped succesfully aborted"); } else { log.info("Automatic recording stopped couldn't be aborted. Recording of session {} has stopped", kSession.getSessionId()); } } } kSession.newPublisher(participant); participants = kParticipant.getSession().getParticipants(); if (sdpAnswer != null) { sessionEventsHandler.onPublishMedia(participant, participant.getPublisherStreamId(), kParticipant.getPublisher().createdAt(), kSession.getSessionId(), mediaOptions, sdpAnswer, participants, transactionId, null); } } @Override public void unpublishVideo(Participant participant, Participant moderator, Integer transactionId, EndReason reason) { try { KurentoParticipant kParticipant = (KurentoParticipant) participant; KurentoSession session = kParticipant.getSession(); log.debug("Request [UNPUBLISH_MEDIA] ({})", participant.getParticipantPublicId()); if (!participant.isStreaming()) { log.warn( "PARTICIPANT {}: Requesting to unpublish video of user {} " + "in session {} but user is not streaming media", moderator != null ? moderator.getParticipantPublicId() : participant.getParticipantPublicId(), participant.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "Participant '" + participant.getParticipantPublicId() + "' is not streaming media"); } kParticipant.unpublishMedia(reason); session.cancelPublisher(participant, reason); Set participants = session.getParticipants(); sessionEventsHandler.onUnpublishMedia(participant, participants, moderator, transactionId, null, reason); } catch (OpenViduException e) { log.warn("PARTICIPANT {}: Error unpublishing media", participant.getParticipantPublicId(), e); sessionEventsHandler.onUnpublishMedia(participant, new HashSet<>(Arrays.asList(participant)), moderator, transactionId, e, null); } } @Override public void subscribe(Participant participant, String senderName, String sdpOffer, Integer transactionId) { String sdpAnswer = null; Session session = null; try { log.debug("Request [SUBSCRIBE] remoteParticipant={} sdpOffer={} ({})", senderName, sdpOffer, participant.getParticipantPublicId()); KurentoParticipant kParticipant = (KurentoParticipant) participant; session = ((KurentoParticipant) participant).getSession(); Participant senderParticipant = session.getParticipantByPublicId(senderName); if (senderParticipant == null) { log.warn( "PARTICIPANT {}: Requesting to recv media from user {} " + "in session {} but user could not be found", participant.getParticipantPublicId(), senderName, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "User '" + senderName + " not found in session '" + session.getSessionId() + "'"); } if (!senderParticipant.isStreaming()) { log.warn( "PARTICIPANT {}: Requesting to recv media from user {} " + "in session {} but user is not streaming media", participant.getParticipantPublicId(), senderName, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "User '" + senderName + " not streaming media in session '" + session.getSessionId() + "'"); } sdpAnswer = kParticipant.receiveMediaFrom(senderParticipant, sdpOffer); if (sdpAnswer == null) { throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Unable to generate SDP answer when subscribing '" + participant.getParticipantPublicId() + "' to '" + senderName + "'"); } } catch (OpenViduException e) { log.error("PARTICIPANT {}: Error subscribing to {}", participant.getParticipantPublicId(), senderName, e); sessionEventsHandler.onSubscribe(participant, session, null, transactionId, e); } if (sdpAnswer != null) { sessionEventsHandler.onSubscribe(participant, session, sdpAnswer, transactionId, null); } } @Override public void unsubscribe(Participant participant, String senderName, Integer transactionId) { log.debug("Request [UNSUBSCRIBE] remoteParticipant={} ({})", senderName, participant.getParticipantPublicId()); KurentoParticipant kParticipant = (KurentoParticipant) participant; Session session = ((KurentoParticipant) participant).getSession(); Participant sender = session.getParticipantByPublicId(senderName); if (sender == null) { log.warn( "PARTICIPANT {}: Requesting to unsubscribe from user {} " + "in session {} but user could not be found", participant.getParticipantPublicId(), senderName, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "User " + senderName + " not found in session " + session.getSessionId()); } kParticipant.cancelReceivingMedia(senderName, EndReason.unsubscribe); sessionEventsHandler.onUnsubscribe(participant, transactionId, null); } @Override public void sendMessage(Participant participant, String message, Integer transactionId) { try { JsonObject messageJson = new JsonParser().parse(message).getAsJsonObject(); KurentoParticipant kParticipant = (KurentoParticipant) participant; sessionEventsHandler.onSendMessage(participant, messageJson, getParticipants(kParticipant.getSession().getSessionId()), transactionId, null); } catch (JsonSyntaxException | IllegalStateException e) { throw new OpenViduException(Code.SIGNAL_FORMAT_INVALID_ERROR_CODE, "Provided signal object '" + message + "' has not a valid JSON format"); } } @Override public void streamPropertyChanged(Participant participant, Integer transactionId, String streamId, String property, JsonElement newValue, String reason) { KurentoParticipant kParticipant = (KurentoParticipant) participant; streamId = kParticipant.getPublisherStreamId(); MediaOptions streamProperties = kParticipant.getPublisherMediaOptions(); Boolean hasAudio = streamProperties.hasAudio(); Boolean hasVideo = streamProperties.hasVideo(); Boolean audioActive = streamProperties.isAudioActive(); Boolean videoActive = streamProperties.isVideoActive(); String typeOfVideo = streamProperties.getTypeOfVideo(); Integer frameRate = streamProperties.getFrameRate(); String videoDimensions = streamProperties.getVideoDimensions(); KurentoFilter filter = streamProperties.getFilter(); switch (property) { case "audioActive": audioActive = newValue.getAsBoolean(); break; case "videoActive": videoActive = newValue.getAsBoolean(); break; case "videoDimensions": videoDimensions = newValue.getAsString(); break; } kParticipant.setPublisherMediaOptions(new MediaOptions(hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, filter)); sessionEventsHandler.onStreamPropertyChanged(participant, transactionId, kParticipant.getSession().getParticipants(), streamId, property, newValue, reason); } @Override public void onIceCandidate(Participant participant, String endpointName, String candidate, int sdpMLineIndex, String sdpMid, Integer transactionId) { try { KurentoParticipant kParticipant = (KurentoParticipant) participant; log.debug("Request [ICE_CANDIDATE] endpoint={} candidate={} " + "sdpMLineIdx={} sdpMid={} ({})", endpointName, candidate, sdpMLineIndex, sdpMid, participant.getParticipantPublicId()); kParticipant.addIceCandidate(endpointName, new IceCandidate(candidate, sdpMid, sdpMLineIndex)); sessionEventsHandler.onRecvIceCandidate(participant, transactionId, null); } catch (OpenViduException e) { log.error("PARTICIPANT {}: Error receiving ICE " + "candidate (epName={}, candidate={})", participant.getParticipantPublicId(), endpointName, candidate, e); sessionEventsHandler.onRecvIceCandidate(participant, transactionId, e); } } /** - * Creates a session if it doesn't already exist. The session's id will be - * indicated by the session info bean. - * - * @param kcSessionInfo bean that will be passed to the - * {@link KurentoClientProvider} in order to obtain the - * {@link KurentoClient} that will be used by the room + * Creates a session with the already existing not-active session in the + * indicated KMS, if it doesn't already exist + * * @throws OpenViduException in case of error while creating the session */ - public void createSession(Session sessionNotActive, KurentoClientSessionInfo kcSessionInfo) - throws OpenViduException { - String sessionId = kcSessionInfo.getRoomName(); - KurentoSession session = (KurentoSession) sessions.get(sessionId); + public KurentoSession createSession(Session sessionNotActive, Kms kms) throws OpenViduException { + KurentoSession session = (KurentoSession) sessions.get(sessionNotActive.getSessionId()); if (session != null) { throw new OpenViduException(Code.ROOM_CANNOT_BE_CREATED_ERROR_CODE, - "Session '" + sessionId + "' already exists"); + "Session '" + session.getSessionId() + "' already exists"); } - this.kurentoClient = kcProvider.getKurentoClient(kcSessionInfo); - session = new KurentoSession(sessionNotActive, kurentoClient, kurentoSessionEventsHandler, - kurentoEndpointConfig, kcProvider.destroyWhenUnused()); + session = new KurentoSession(sessionNotActive, kms, kurentoSessionEventsHandler, kurentoEndpointConfig, + kmsManager.destroyWhenUnused()); - KurentoSession oldSession = (KurentoSession) sessions.putIfAbsent(sessionId, session); + KurentoSession oldSession = (KurentoSession) sessions.putIfAbsent(session.getSessionId(), session); if (oldSession != null) { - log.warn("Session '{}' has just been created by another thread", sessionId); - return; - } - String kcName = "[NAME NOT AVAILABLE]"; - if (kurentoClient.getServerManager() != null) { - kcName = kurentoClient.getServerManager().getName(); + log.warn("Session '{}' has just been created by another thread", session.getSessionId()); + return oldSession; } - log.warn("No session '{}' exists yet. Created one using KurentoClient '{}'.", sessionId, kcName); + log.warn("No session '{}' exists yet. Created one on KMS '{}'", session.getSessionId(), kms.getUri()); sessionEventsHandler.onSessionCreated(session); + return session; } @Override public void evictParticipant(Participant evictedParticipant, Participant moderator, Integer transactionId, EndReason reason) throws OpenViduException { if (evictedParticipant != null) { KurentoParticipant kParticipant = (KurentoParticipant) evictedParticipant; Set participants = kParticipant.getSession().getParticipants(); this.leaveRoom(kParticipant, null, reason, false); this.sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant, participants, transactionId, null, reason); sessionEventsHandler.closeRpcSession(evictedParticipant.getParticipantPrivateId()); } else { if (moderator != null && transactionId != null) { this.sessionEventsHandler.onForceDisconnect(moderator, evictedParticipant, new HashSet<>(Arrays.asList(moderator)), transactionId, new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "Connection not found when calling 'forceDisconnect'"), null); } } } @Override public KurentoMediaOptions generateMediaOptions(Request request) throws OpenViduException { String sdpOffer = RpcHandler.getStringParam(request, ProtocolElements.PUBLISHVIDEO_SDPOFFER_PARAM); boolean hasAudio = RpcHandler.getBooleanParam(request, ProtocolElements.PUBLISHVIDEO_HASAUDIO_PARAM); boolean hasVideo = RpcHandler.getBooleanParam(request, ProtocolElements.PUBLISHVIDEO_HASVIDEO_PARAM); Boolean audioActive = null, videoActive = null; String typeOfVideo = null, videoDimensions = null; Integer frameRate = null; KurentoFilter kurentoFilter = null; try { audioActive = RpcHandler.getBooleanParam(request, ProtocolElements.PUBLISHVIDEO_AUDIOACTIVE_PARAM); } catch (RuntimeException noParameterFound) { } try { videoActive = RpcHandler.getBooleanParam(request, ProtocolElements.PUBLISHVIDEO_VIDEOACTIVE_PARAM); } catch (RuntimeException noParameterFound) { } try { typeOfVideo = RpcHandler.getStringParam(request, ProtocolElements.PUBLISHVIDEO_TYPEOFVIDEO_PARAM); } catch (RuntimeException noParameterFound) { } try { videoDimensions = RpcHandler.getStringParam(request, ProtocolElements.PUBLISHVIDEO_VIDEODIMENSIONS_PARAM); } catch (RuntimeException noParameterFound) { } try { frameRate = RpcHandler.getIntParam(request, ProtocolElements.PUBLISHVIDEO_FRAMERATE_PARAM); } catch (RuntimeException noParameterFound) { } try { JsonObject kurentoFilterJson = (JsonObject) RpcHandler.getParam(request, ProtocolElements.PUBLISHVIDEO_KURENTOFILTER_PARAM); if (kurentoFilterJson != null) { try { kurentoFilter = new KurentoFilter(kurentoFilterJson.get("type").getAsString(), kurentoFilterJson.get("options").getAsJsonObject()); } catch (Exception e) { throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, "'filter' parameter wrong:" + e.getMessage()); } } } catch (OpenViduException e) { throw e; } catch (RuntimeException noParameterFound) { } boolean doLoopback = RpcHandler.getBooleanParam(request, ProtocolElements.PUBLISHVIDEO_DOLOOPBACK_PARAM); return new KurentoMediaOptions(true, sdpOffer, null, null, hasAudio, hasVideo, audioActive, videoActive, typeOfVideo, frameRate, videoDimensions, kurentoFilter, doLoopback); } @Override public boolean unpublishStream(Session session, String streamId, Participant moderator, Integer transactionId, EndReason reason) { String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); if (participantPrivateId != null) { Participant participant = this.getParticipant(participantPrivateId); if (participant != null) { this.unpublishVideo(participant, moderator, transactionId, reason); return true; } else { return false; } } else { return false; } } @Override public void applyFilter(Session session, String streamId, String filterType, JsonObject filterOptions, Participant moderator, Integer transactionId, String filterReason) { String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); if (participantPrivateId != null) { Participant publisher = this.getParticipant(participantPrivateId); moderator = (moderator != null && publisher.getParticipantPublicId().equals(moderator.getParticipantPublicId())) ? null : moderator; log.debug("Request [APPLY_FILTER] over stream [{}] for reason [{}]", streamId, filterReason); KurentoParticipant kParticipantPublisher = (KurentoParticipant) publisher; if (!publisher.isStreaming()) { log.warn( "PARTICIPANT {}: Requesting to applyFilter to user {} " + "in session {} but user is not streaming media", moderator != null ? moderator.getParticipantPublicId() : publisher.getParticipantPublicId(), publisher.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "User '" + publisher.getParticipantPublicId() + " not streaming media in session '" + session.getSessionId() + "'"); } else if (kParticipantPublisher.getPublisher().getFilter() != null) { log.warn( "PARTICIPANT {}: Requesting to applyFilter to user {} " + "in session {} but user already has a filter", moderator != null ? moderator.getParticipantPublicId() : publisher.getParticipantPublicId(), publisher.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.EXISTING_FILTER_ALREADY_APPLIED_ERROR_CODE, "User '" + publisher.getParticipantPublicId() + " already has a filter applied in session '" + session.getSessionId() + "'"); } else { try { KurentoFilter filter = new KurentoFilter(filterType, filterOptions); this.applyFilterInPublisher(kParticipantPublisher, filter); Set participants = kParticipantPublisher.getSession().getParticipants(); sessionEventsHandler.onFilterChanged(publisher, moderator, transactionId, participants, streamId, filter, null, filterReason); } catch (OpenViduException e) { log.warn("PARTICIPANT {}: Error applying filter", publisher.getParticipantPublicId(), e); sessionEventsHandler.onFilterChanged(publisher, moderator, transactionId, new HashSet<>(), streamId, null, e, ""); } } log.info("State of filter for participant {}: {}", publisher.getParticipantPublicId(), ((KurentoParticipant) publisher).getPublisher().filterCollectionsToString()); } else { log.warn("PARTICIPANT {}: Requesting to applyFilter to stream {} " + "in session {} but the owner cannot be found", streamId, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "Owner of stream '" + streamId + "' not found in session '" + session.getSessionId() + "'"); } } @Override public void removeFilter(Session session, String streamId, Participant moderator, Integer transactionId, String filterReason) { String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); if (participantPrivateId != null) { Participant participant = this.getParticipant(participantPrivateId); log.debug("Request [REMOVE_FILTER] over stream [{}] for reason [{}]", streamId, filterReason); KurentoParticipant kParticipant = (KurentoParticipant) participant; if (!participant.isStreaming()) { log.warn( "PARTICIPANT {}: Requesting to removeFilter to user {} " + "in session {} but user is not streaming media", moderator != null ? moderator.getParticipantPublicId() : participant.getParticipantPublicId(), participant.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "User '" + participant.getParticipantPublicId() + " not streaming media in session '" + session.getSessionId() + "'"); } else if (kParticipant.getPublisher().getFilter() == null) { log.warn( "PARTICIPANT {}: Requesting to removeFilter to user {} " + "in session {} but user does NOT have a filter", moderator != null ? moderator.getParticipantPublicId() : participant.getParticipantPublicId(), participant.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, "User '" + participant.getParticipantPublicId() + " has no filter applied in session '" + session.getSessionId() + "'"); } else { this.removeFilterInPublisher(kParticipant); Set participants = kParticipant.getSession().getParticipants(); sessionEventsHandler.onFilterChanged(participant, moderator, transactionId, participants, streamId, null, null, filterReason); } log.info("State of filter for participant {}: {}", kParticipant.getParticipantPublicId(), kParticipant.getPublisher().filterCollectionsToString()); } else { log.warn("PARTICIPANT {}: Requesting to removeFilter to stream {} " + "in session {} but the owner cannot be found", streamId, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "Owner of stream '" + streamId + "' not found in session '" + session.getSessionId() + "'"); } } @Override public void execFilterMethod(Session session, String streamId, String filterMethod, JsonObject filterParams, Participant moderator, Integer transactionId, String filterReason) { String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); if (participantPrivateId != null) { Participant participant = this.getParticipant(participantPrivateId); log.debug("Request [EXEC_FILTER_MTEHOD] over stream [{}] for reason [{}]", streamId, filterReason); KurentoParticipant kParticipant = (KurentoParticipant) participant; if (!participant.isStreaming()) { log.warn( "PARTICIPANT {}: Requesting to execFilterMethod to user {} " + "in session {} but user is not streaming media", moderator != null ? moderator.getParticipantPublicId() : participant.getParticipantPublicId(), participant.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "User '" + participant.getParticipantPublicId() + " not streaming media in session '" + session.getSessionId() + "'"); } else if (kParticipant.getPublisher().getFilter() == null) { log.warn( "PARTICIPANT {}: Requesting to execFilterMethod to user {} " + "in session {} but user does NOT have a filter", moderator != null ? moderator.getParticipantPublicId() : participant.getParticipantPublicId(), participant.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, "User '" + participant.getParticipantPublicId() + " has no filter applied in session '" + session.getSessionId() + "'"); } else { KurentoFilter updatedFilter = this.execFilterMethodInPublisher(kParticipant, filterMethod, filterParams); Set participants = kParticipant.getSession().getParticipants(); sessionEventsHandler.onFilterChanged(participant, moderator, transactionId, participants, streamId, updatedFilter, null, filterReason); } } else { log.warn("PARTICIPANT {}: Requesting to removeFilter to stream {} " + "in session {} but the owner cannot be found", streamId, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "Owner of stream '" + streamId + "' not found in session '" + session.getSessionId() + "'"); } } @Override public void addFilterEventListener(Session session, Participant userSubscribing, String streamId, String eventType) throws OpenViduException { String publisherPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); if (publisherPrivateId != null) { log.debug("Request [ADD_FILTER_LISTENER] over stream [{}]", streamId); KurentoParticipant kParticipantPublishing = (KurentoParticipant) this.getParticipant(publisherPrivateId); KurentoParticipant kParticipantSubscribing = (KurentoParticipant) userSubscribing; if (!kParticipantPublishing.isStreaming()) { log.warn( "PARTICIPANT {}: Requesting to addFilterEventListener to stream {} " + "in session {} but the publisher is not streaming media", userSubscribing.getParticipantPublicId(), streamId, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "User '" + kParticipantPublishing.getParticipantPublicId() + " not streaming media in session '" + session.getSessionId() + "'"); } else if (kParticipantPublishing.getPublisher().getFilter() == null) { log.warn( "PARTICIPANT {}: Requesting to addFilterEventListener to user {} " + "in session {} but user does NOT have a filter", kParticipantSubscribing.getParticipantPublicId(), kParticipantPublishing.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, "User '" + kParticipantPublishing.getParticipantPublicId() + " has no filter applied in session '" + session.getSessionId() + "'"); } else { try { this.addFilterEventListenerInPublisher(kParticipantPublishing, eventType); kParticipantPublishing.getPublisher().addParticipantAsListenerOfFilterEvent(eventType, userSubscribing.getParticipantPublicId()); } catch (OpenViduException e) { throw e; } } log.info("State of filter for participant {}: {}", kParticipantPublishing.getParticipantPublicId(), kParticipantPublishing.getPublisher().filterCollectionsToString()); } else { throw new OpenViduException(Code.USER_NOT_FOUND_ERROR_CODE, "Not user found for streamId '" + streamId + "' in session '" + session.getSessionId() + "'"); } } @Override public void removeFilterEventListener(Session session, Participant subscriber, String streamId, String eventType) throws OpenViduException { String participantPrivateId = ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); if (participantPrivateId != null) { log.debug("Request [REMOVE_FILTER_LISTENER] over stream [{}]", streamId); Participant participantPublishing = this.getParticipant(participantPrivateId); KurentoParticipant kParticipantPublishing = (KurentoParticipant) participantPublishing; if (!participantPublishing.isStreaming()) { log.warn( "PARTICIPANT {}: Requesting to removeFilterEventListener to stream {} " + "in session {} but user is not streaming media", subscriber.getParticipantPublicId(), streamId, session.getSessionId()); throw new OpenViduException(Code.USER_NOT_STREAMING_ERROR_CODE, "User '" + participantPublishing.getParticipantPublicId() + " not streaming media in session '" + session.getSessionId() + "'"); } else if (kParticipantPublishing.getPublisher().getFilter() == null) { log.warn( "PARTICIPANT {}: Requesting to removeFilterEventListener to user {} " + "in session {} but user does NOT have a filter", subscriber.getParticipantPublicId(), participantPublishing.getParticipantPublicId(), session.getSessionId()); throw new OpenViduException(Code.FILTER_NOT_APPLIED_ERROR_CODE, "User '" + participantPublishing.getParticipantPublicId() + " has no filter applied in session '" + session.getSessionId() + "'"); } else { try { PublisherEndpoint pub = kParticipantPublishing.getPublisher(); if (pub.removeParticipantAsListenerOfFilterEvent(eventType, subscriber.getParticipantPublicId())) { // If there are no more participants listening to the event remove the event // from the GenericMediaElement this.removeFilterEventListenerInPublisher(kParticipantPublishing, eventType); } } catch (OpenViduException e) { throw e; } } log.info("State of filter for participant {}: {}", kParticipantPublishing.getParticipantPublicId(), kParticipantPublishing.getPublisher().filterCollectionsToString()); } } @Override public String getParticipantPrivateIdFromStreamId(String sessionId, String streamId) { Session session = this.getSession(sessionId); return ((KurentoSession) session).getParticipantPrivateIdFromStreamId(streamId); } + public KmsManager getKmsManager() { + return this.kmsManager; + } + private void applyFilterInPublisher(KurentoParticipant kParticipant, KurentoFilter filter) throws OpenViduException { GenericMediaElement.Builder builder = new GenericMediaElement.Builder(kParticipant.getPipeline(), filter.getType()); Props props = new JsonUtils().fromJsonObjectToProps(filter.getOptions()); props.forEach(prop -> { builder.withConstructorParam(prop.getName(), prop.getValue()); }); kParticipant.getPublisher().apply(builder.build()); kParticipant.getPublisher().getMediaOptions().setFilter(filter); } private void removeFilterInPublisher(KurentoParticipant kParticipant) { kParticipant.getPublisher().cleanAllFilterListeners(); kParticipant.getPublisher().revert(kParticipant.getPublisher().getFilter()); kParticipant.getPublisher().getMediaOptions().setFilter(null); } private KurentoFilter execFilterMethodInPublisher(KurentoParticipant kParticipant, String method, JsonObject params) { kParticipant.getPublisher().execMethod(method, params); KurentoFilter filter = kParticipant.getPublisher().getMediaOptions().getFilter(); KurentoFilter updatedFilter = new KurentoFilter(filter.getType(), filter.getOptions(), method, params); kParticipant.getPublisher().getMediaOptions().setFilter(updatedFilter); return updatedFilter; } private void addFilterEventListenerInPublisher(KurentoParticipant kParticipant, String eventType) throws OpenViduException { PublisherEndpoint pub = kParticipant.getPublisher(); if (!pub.isListenerAddedToFilterEvent(eventType)) { final String connectionId = kParticipant.getParticipantPublicId(); final String streamId = kParticipant.getPublisherStreamId(); final String filterType = kParticipant.getPublisherMediaOptions().getFilter().getType(); try { ListenerSubscription listener = pub.getFilter().addEventListener(eventType, event -> { sessionEventsHandler.onFilterEventDispatched(connectionId, streamId, filterType, event.getType(), event.getData(), kParticipant.getSession().getParticipants(), kParticipant.getPublisher().getPartipantsListentingToFilterEvent(eventType)); }); pub.storeListener(eventType, listener); } catch (Exception e) { log.error("Request to addFilterEventListener to stream {} gone wrong. Error: {}", streamId, e.getMessage()); throw new OpenViduException(Code.FILTER_EVENT_LISTENER_NOT_FOUND, "Request to addFilterEventListener to stream " + streamId + " gone wrong: " + e.getMessage()); } } } private void removeFilterEventListenerInPublisher(KurentoParticipant kParticipant, String eventType) { PublisherEndpoint pub = kParticipant.getPublisher(); if (pub.isListenerAddedToFilterEvent(eventType)) { GenericMediaElement filter = kParticipant.getPublisher().getFilter(); filter.removeEventListener(pub.removeListener(eventType)); } } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoFilter.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/KurentoFilter.java similarity index 97% rename from openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoFilter.java rename to openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/KurentoFilter.java index 6370bcc5..58623bd0 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoFilter.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/KurentoFilter.java @@ -1,84 +1,84 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ -package io.openvidu.server.kurento; +package io.openvidu.server.kurento.endpoint; import com.google.gson.JsonElement; import com.google.gson.JsonObject; public class KurentoFilter { public class KurentoFilterMethod { String method; JsonObject params; protected KurentoFilterMethod(String method, JsonObject params) { this.method = method; this.params = params; } public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("method", lastExecMethod.method); json.add("params", lastExecMethod.params); return json; } } String type; JsonObject options; KurentoFilterMethod lastExecMethod; public KurentoFilter(String type, JsonObject options) { this.type = type; this.options = options; } public KurentoFilter(String type, JsonObject options, String method, JsonObject params) { this.type = type; this.options = options; this.lastExecMethod = new KurentoFilterMethod(method, params); } public KurentoFilter(JsonElement json) { JsonObject jsonObject = json.getAsJsonObject(); this.type = jsonObject.get("type").getAsString(); this.options = jsonObject.get("options").getAsJsonObject(); } public String getType() { return type; } public JsonObject getOptions() { return options; } public KurentoFilterMethod getLastExecMethod() { return this.lastExecMethod; } public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("type", type); json.add("options", options); json.add("lastExecMethod", this.lastExecMethod != null ? this.lastExecMethod.toJson() : new JsonObject()); return json; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java index fd25534e..855e5898 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/MediaEndpoint.java @@ -1,516 +1,515 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.endpoint; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import org.kurento.client.Continuation; import org.kurento.client.ErrorEvent; import org.kurento.client.EventListener; import org.kurento.client.IceCandidate; import org.kurento.client.ListenerSubscription; import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; import org.kurento.client.OnIceCandidateEvent; import org.kurento.client.RtpEndpoint; import org.kurento.client.SdpEndpoint; import org.kurento.client.WebRtcEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.Participant; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoTokenOptions; /** * {@link WebRtcEndpoint} wrapper that supports buffering of * {@link IceCandidate}s until the {@link WebRtcEndpoint} is created. * Connections to other peers are opened using the corresponding method of the * internal endpoint. * * @author Pablo Fuente (pablofuenteperez@gmail.com) */ public abstract class MediaEndpoint { private static Logger log; private OpenviduConfig openviduConfig; private boolean web = false; private WebRtcEndpoint webEndpoint = null; private RtpEndpoint endpoint = null; private final int maxRecvKbps; private final int minRecvKbps; private final int maxSendKbps; private final int minSendKbps; private KurentoParticipant owner; protected String endpointName; // KMS media object identifier. Unique for every MediaEndpoint protected String streamId; // OpenVidu Stream identifier. Common property for a // PublisherEndpoint->SubscriberEndpoint flow. Equal to endpointName for // PublisherEndpoints, different for SubscriberEndpoints protected Long createdAt; // Timestamp when this [publisher / subscriber] started [publishing / receiving] private MediaPipeline pipeline = null; private ListenerSubscription endpointSubscription = null; private final List receivedCandidateList = new LinkedList(); private LinkedList candidates = new LinkedList(); public String selectedLocalIceCandidate; public String selectedRemoteIceCandidate; public Queue kmsEvents = new ConcurrentLinkedQueue<>(); public Future kmsWebrtcStatsThread; /** * Constructor to set the owner, the endpoint's name and the media pipeline. * * @param web * @param owner * @param endpointName * @param pipeline * @param log */ public MediaEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, OpenviduConfig openviduConfig, Logger log) { if (log == null) { MediaEndpoint.log = LoggerFactory.getLogger(MediaEndpoint.class); } else { MediaEndpoint.log = log; } this.web = web; this.owner = owner; this.setEndpointName(endpointName); this.setMediaPipeline(pipeline); this.openviduConfig = openviduConfig; KurentoTokenOptions kurentoTokenOptions = this.owner.getToken().getKurentoTokenOptions(); if (kurentoTokenOptions != null) { this.maxRecvKbps = kurentoTokenOptions.getVideoMaxRecvBandwidth() != null ? kurentoTokenOptions.getVideoMaxRecvBandwidth() : this.openviduConfig.getVideoMaxRecvBandwidth(); this.minRecvKbps = kurentoTokenOptions.getVideoMinRecvBandwidth() != null ? kurentoTokenOptions.getVideoMinRecvBandwidth() : this.openviduConfig.getVideoMinRecvBandwidth(); this.maxSendKbps = kurentoTokenOptions.getVideoMaxSendBandwidth() != null ? kurentoTokenOptions.getVideoMaxSendBandwidth() : this.openviduConfig.getVideoMaxSendBandwidth(); this.minSendKbps = kurentoTokenOptions.getVideoMinSendBandwidth() != null ? kurentoTokenOptions.getVideoMinSendBandwidth() : this.openviduConfig.getVideoMinSendBandwidth(); } else { this.maxRecvKbps = this.openviduConfig.getVideoMaxRecvBandwidth(); this.minRecvKbps = this.openviduConfig.getVideoMinRecvBandwidth(); this.maxSendKbps = this.openviduConfig.getVideoMaxSendBandwidth(); this.minSendKbps = this.openviduConfig.getVideoMinSendBandwidth(); } } public boolean isWeb() { return web; } /** * @return the user session that created this endpoint */ public Participant getOwner() { return owner; } /** * @return the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) */ public SdpEndpoint getEndpoint() { if (this.isWeb()) { return this.webEndpoint; } else { return this.endpoint; } } public long createdAt() { return this.createdAt; } public WebRtcEndpoint getWebEndpoint() { return webEndpoint; } protected RtpEndpoint getRtpEndpoint() { return endpoint; } /** * If this object doesn't have a {@link WebRtcEndpoint}, it is created in a * thread-safe way using the internal {@link MediaPipeline}. Otherwise no * actions are taken. It also registers an error listener for the endpoint and * for any additional media elements. * * @param endpointLatch latch whose countdown is performed when the asynchronous * call to build the {@link WebRtcEndpoint} returns * * @return the existing endpoint, if any */ public synchronized SdpEndpoint createEndpoint(CountDownLatch endpointLatch) { SdpEndpoint old = this.getEndpoint(); if (old == null) { internalEndpointInitialization(endpointLatch); } else { endpointLatch.countDown(); } if (this.isWeb()) { while (!candidates.isEmpty()) { internalAddIceCandidate(candidates.removeFirst()); } } return old; } /** * @return the pipeline */ public MediaPipeline getPipeline() { return this.pipeline; } /** * Sets the {@link MediaPipeline} used to create the internal * {@link WebRtcEndpoint}. * * @param pipeline the {@link MediaPipeline} */ public void setMediaPipeline(MediaPipeline pipeline) { this.pipeline = pipeline; } public String getEndpointName() { if (endpointName == null) { endpointName = this.getEndpoint().getName(); } return endpointName; } public void setEndpointName(String endpointName) { this.endpointName = endpointName; } public String getStreamId() { return streamId; } public void setStreamId(String streamId) { this.streamId = streamId; } /** * Unregisters all error listeners created for media elements owned by this * instance. */ public synchronized void unregisterErrorListeners() { unregisterElementErrListener(endpoint, endpointSubscription); } /** * Creates the endpoint (RTP or WebRTC) and any other additional elements (if * needed). * * @param endpointLatch */ protected void internalEndpointInitialization(final CountDownLatch endpointLatch) { if (this.isWeb()) { WebRtcEndpoint.Builder builder = new WebRtcEndpoint.Builder(pipeline); /* * if (this.dataChannels) { builder.useDataChannels(); } */ builder.buildAsync(new Continuation() { @Override public void onSuccess(WebRtcEndpoint result) throws Exception { webEndpoint = result; webEndpoint.setMaxVideoRecvBandwidth(maxRecvKbps); webEndpoint.setMinVideoRecvBandwidth(minRecvKbps); webEndpoint.setMaxVideoSendBandwidth(maxSendKbps); webEndpoint.setMinVideoSendBandwidth(minSendKbps); endpointLatch.countDown(); log.trace("EP {}: Created a new WebRtcEndpoint", endpointName); endpointSubscription = registerElemErrListener(webEndpoint); } @Override public void onError(Throwable cause) throws Exception { endpointLatch.countDown(); log.error("EP {}: Failed to create a new WebRtcEndpoint", endpointName, cause); } }); } else { new RtpEndpoint.Builder(pipeline).buildAsync(new Continuation() { @Override public void onSuccess(RtpEndpoint result) throws Exception { endpoint = result; endpointLatch.countDown(); log.trace("EP {}: Created a new RtpEndpoint", endpointName); endpointSubscription = registerElemErrListener(endpoint); } @Override public void onError(Throwable cause) throws Exception { endpointLatch.countDown(); log.error("EP {}: Failed to create a new RtpEndpoint", endpointName, cause); } }); } } /** * Add a new {@link IceCandidate} received gathered by the remote peer of this * {@link WebRtcEndpoint}. * * @param candidate the remote candidate */ public synchronized void addIceCandidate(IceCandidate candidate) throws OpenViduException { if (!this.isWeb()) { throw new OpenViduException(Code.MEDIA_NOT_A_WEB_ENDPOINT_ERROR_CODE, "Operation not supported"); } if (webEndpoint == null) { candidates.addLast(candidate); } else { internalAddIceCandidate(candidate); } } /** * Registers a listener for when the {@link MediaElement} triggers an * {@link ErrorEvent}. Notifies the owner with the error. * * @param element the {@link MediaElement} * @return {@link ListenerSubscription} that can be used to deregister the * listener */ protected ListenerSubscription registerElemErrListener(MediaElement element) { return element.addErrorListener(new EventListener() { @Override public void onEvent(ErrorEvent event) { owner.sendMediaError(event); } }); } /** * Unregisters the error listener from the media element using the provided * subscription. * * @param element the {@link MediaElement} * @param subscription the associated {@link ListenerSubscription} */ protected void unregisterElementErrListener(MediaElement element, final ListenerSubscription subscription) { if (element == null || subscription == null) { return; } element.removeErrorListener(subscription); } /** * Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) * to process the offer String. * * @see SdpEndpoint#processOffer(String) * @param offer String with the Sdp offer * @return the Sdp answer */ protected String processOffer(String offer) throws OpenViduException { if (this.isWeb()) { if (webEndpoint == null) { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "Can't process offer when WebRtcEndpoint is null (ep: " + endpointName + ")"); } return webEndpoint.processOffer(offer); } else { if (endpoint == null) { throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, "Can't process offer when RtpEndpoint is null (ep: " + endpointName + ")"); } return endpoint.processOffer(offer); } } /** * Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) * to generate the offer String that can be used to initiate a connection. * * @see SdpEndpoint#generateOffer() * @return the Sdp offer */ protected String generateOffer() throws OpenViduException { if (this.isWeb()) { if (webEndpoint == null) { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "Can't generate offer when WebRtcEndpoint is null (ep: " + endpointName + ")"); } return webEndpoint.generateOffer(); } else { if (endpoint == null) { throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, "Can't generate offer when RtpEndpoint is null (ep: " + endpointName + ")"); } return endpoint.generateOffer(); } } /** * Orders the internal endpoint ({@link RtpEndpoint} or {@link WebRtcEndpoint}) * to process the answer String. * * @see SdpEndpoint#processAnswer(String) * @param answer String with the Sdp answer from remote * @return the updated Sdp offer, based on the received answer */ protected String processAnswer(String answer) throws OpenViduException { if (this.isWeb()) { if (webEndpoint == null) { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "Can't process answer when WebRtcEndpoint is null (ep: " + endpointName + ")"); } return webEndpoint.processAnswer(answer); } else { if (endpoint == null) { throw new OpenViduException(Code.MEDIA_RTP_ENDPOINT_ERROR_CODE, "Can't process answer when RtpEndpoint is null (ep: " + endpointName + ")"); } return endpoint.processAnswer(answer); } } /** * If supported, it registers a listener for when a new {@link IceCandidate} is * gathered by the internal endpoint ({@link WebRtcEndpoint}) and sends it to * the remote User Agent as a notification using the messaging capabilities of * the {@link Participant}. * * @see WebRtcEndpoint#addOnIceCandidateListener(org.kurento.client.EventListener) * @see Participant#sendIceCandidate(String, IceCandidate) * @throws OpenViduException if thrown, unable to register the listener */ protected void registerOnIceCandidateEventListener(String senderPublicId) throws OpenViduException { if (!this.isWeb()) { return; } if (webEndpoint == null) { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "Can't register event listener for null WebRtcEndpoint (ep: " + endpointName + ")"); } webEndpoint.addOnIceCandidateListener(new EventListener() { @Override public void onEvent(OnIceCandidateEvent event) { owner.sendIceCandidate(senderPublicId, endpointName, event.getCandidate()); } }); } /** * If supported, it instructs the internal endpoint to start gathering * {@link IceCandidate}s. */ protected void gatherCandidates() throws OpenViduException { if (!this.isWeb()) { return; } if (webEndpoint == null) { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "Can't start gathering ICE candidates on null WebRtcEndpoint (ep: " + endpointName + ")"); } webEndpoint.gatherCandidates(new Continuation() { @Override public void onSuccess(Void result) throws Exception { log.trace("EP {}: Internal endpoint started to gather candidates", endpointName); } @Override public void onError(Throwable cause) throws Exception { log.warn("EP {}: Internal endpoint failed to start gathering candidates", endpointName, cause); } }); } private void internalAddIceCandidate(IceCandidate candidate) throws OpenViduException { if (webEndpoint == null) { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "Can't add existing ICE candidates to null WebRtcEndpoint (ep: " + endpointName + ")"); } this.receivedCandidateList.add(candidate); this.webEndpoint.addIceCandidate(candidate, new Continuation() { @Override public void onSuccess(Void result) throws Exception { log.trace("Ice candidate added to the internal endpoint"); } @Override public void onError(Throwable cause) throws Exception { log.warn("EP {}: Failed to add ice candidate to the internal endpoint", endpointName, cause); } }); } public abstract PublisherEndpoint getPublisher(); public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("createdAt", this.createdAt); return json; } public JsonObject withStatsToJson() { JsonObject json = new JsonObject(); json.addProperty("createdAt", this.createdAt); json.addProperty("webrtcEndpointName", this.getEndpointName()); json.addProperty("remoteSdp", this.getEndpoint().getRemoteSessionDescriptor()); json.addProperty("localSdp", this.getEndpoint().getLocalSessionDescriptor()); json.add("receivedCandidates", new GsonBuilder().create().toJsonTree(this.receivedCandidateList)); json.addProperty("localCandidate", this.selectedLocalIceCandidate); json.addProperty("remoteCandidate", this.selectedRemoteIceCandidate); JsonArray jsonArray = new JsonArray(); this.kmsEvents.forEach(ev -> { // Remove unwanted properties JsonObject j = ev.toJson(); j.remove("session"); j.remove("user"); j.remove("connection"); j.remove("endpoint"); j.remove("timestampMillis"); jsonArray.add(j); }); json.add("events", jsonArray); return json; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java index 9ea7d0c9..b1c8f7c2 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/PublisherEndpoint.java @@ -1,577 +1,576 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.endpoint; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import org.kurento.client.Continuation; import org.kurento.client.GenericMediaElement; import org.kurento.client.ListenerSubscription; import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; import org.kurento.client.MediaType; import org.kurento.client.PassThrough; import org.kurento.client.WebRtcEndpoint; import org.kurento.jsonrpc.Props; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.MediaOptions; -import io.openvidu.server.kurento.TrackType; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.utils.JsonUtils; /** * Publisher aspect of the {@link MediaEndpoint}. * * @author Radu Tom Vlad */ public class PublisherEndpoint extends MediaEndpoint { private final static Logger log = LoggerFactory.getLogger(PublisherEndpoint.class); protected MediaOptions mediaOptions; private PassThrough passThru = null; private ListenerSubscription passThruSubscription = null; private GenericMediaElement filter; private Map> subscribersToFilterEvents = new ConcurrentHashMap<>(); private Map filterListeners = new ConcurrentHashMap<>(); private Map elements = new HashMap(); private LinkedList elementIds = new LinkedList(); private boolean connected = false; private Map elementsErrorSubscriptions = new HashMap(); public PublisherEndpoint(boolean web, KurentoParticipant owner, String endpointName, MediaPipeline pipeline, OpenviduConfig openviduConfig) { super(web, owner, endpointName, pipeline, openviduConfig, log); } @Override protected void internalEndpointInitialization(final CountDownLatch endpointLatch) { super.internalEndpointInitialization(endpointLatch); passThru = new PassThrough.Builder(getPipeline()).build(); passThruSubscription = registerElemErrListener(passThru); } @Override public synchronized void unregisterErrorListeners() { super.unregisterErrorListeners(); unregisterElementErrListener(passThru, passThruSubscription); for (String elemId : elementIds) { unregisterElementErrListener(elements.get(elemId), elementsErrorSubscriptions.remove(elemId)); } } /** * @return all media elements created for this publisher, except for the main * element ( {@link WebRtcEndpoint}) */ public synchronized Collection getMediaElements() { if (passThru != null) { elements.put(passThru.getId(), passThru); } return elements.values(); } public GenericMediaElement getFilter() { return this.filter; } public boolean isListenerAddedToFilterEvent(String eventType) { return (this.subscribersToFilterEvents.containsKey(eventType) && this.filterListeners.containsKey(eventType)); } public Set getPartipantsListentingToFilterEvent(String eventType) { return this.subscribersToFilterEvents.get(eventType); } public boolean storeListener(String eventType, ListenerSubscription listener) { return (this.filterListeners.putIfAbsent(eventType, listener) == null); } public ListenerSubscription removeListener(String eventType) { // Clean all participant subscriptions to this event this.subscribersToFilterEvents.remove(eventType); // Clean ListenerSubscription object for this event return this.filterListeners.remove(eventType); } public void addParticipantAsListenerOfFilterEvent(String eventType, String participantPublicId) { this.subscribersToFilterEvents.putIfAbsent(eventType, new HashSet<>()); this.subscribersToFilterEvents.get(eventType).add(participantPublicId); } public boolean removeParticipantAsListenerOfFilterEvent(String eventType, String participantPublicId) { if (!this.subscribersToFilterEvents.containsKey(eventType)) { String streamId = this.getStreamId(); log.error("Request to removeFilterEventListener to stream {} gone wrong: Filter {} has no listener added", streamId, eventType); throw new OpenViduException(Code.FILTER_EVENT_LISTENER_NOT_FOUND, "Request to removeFilterEventListener to stream " + streamId + " gone wrong: Filter " + eventType + " has no listener added"); } this.subscribersToFilterEvents.computeIfPresent(eventType, (type, subs) -> { subs.remove(participantPublicId); return subs; }); return this.subscribersToFilterEvents.get(eventType).isEmpty(); } public void cleanAllFilterListeners() { for (String eventType : this.subscribersToFilterEvents.keySet()) { this.removeListener(eventType); } } /** * Initializes this media endpoint for publishing media and processes the SDP * offer or answer. If the internal endpoint is an {@link WebRtcEndpoint}, it * first registers an event listener for the ICE candidates and instructs the * endpoint to start gathering the candidates. If required, it connects to * itself (after applying the intermediate media elements and the * {@link PassThrough}) to allow loopback of the media stream. * * @param sdpType indicates the type of the sdpString (offer or * answer) * @param sdpString offer or answer from the remote peer * @param doLoopback loopback flag * @param loopbackAlternativeSrc alternative loopback source * @param loopbackConnectionType how to connect the loopback source * @return the SDP response (the answer if processing an offer SDP, otherwise is * the updated offer generated previously by this endpoint) */ public synchronized String publish(SdpType sdpType, String sdpString, boolean doLoopback, MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) { registerOnIceCandidateEventListener(this.getOwner().getParticipantPublicId()); if (doLoopback) { if (loopbackAlternativeSrc == null) { connect(this.getEndpoint(), loopbackConnectionType); } else { connectAltLoopbackSrc(loopbackAlternativeSrc, loopbackConnectionType); } } else { innerConnect(); } String sdpResponse = null; switch (sdpType) { case ANSWER: sdpResponse = processAnswer(sdpString); break; case OFFER: sdpResponse = processOffer(sdpString); break; default: throw new OpenViduException(Code.MEDIA_SDP_ERROR_CODE, "Sdp type not supported: " + sdpType); } gatherCandidates(); this.createdAt = System.currentTimeMillis(); return sdpResponse; } public synchronized String preparePublishConnection() { return generateOffer(); } public synchronized void connect(MediaElement sink) { if (!connected) { innerConnect(); } internalSinkConnect(passThru, sink); } public synchronized void connect(MediaElement sink, MediaType type) { if (!connected) { innerConnect(); } internalSinkConnect(passThru, sink, type); } public synchronized void disconnectFrom(MediaElement sink) { internalSinkDisconnect(passThru, sink); } public synchronized void disconnectFrom(MediaElement sink, MediaType type) { internalSinkDisconnect(passThru, sink, type); } /** * Changes the media passing through a chain of media elements by applying the * specified element/shaper. The element is plugged into the stream only if the * chain has been initialized (a.k.a. media streaming has started), otherwise it * is left ready for when the connections between elements will materialize and * the streaming begins. * * @param shaper {@link MediaElement} that will be linked to the end of the * chain (e.g. a filter) * @return the element's id * @throws OpenViduException if thrown, the media element was not added */ public String apply(GenericMediaElement shaper) throws OpenViduException { return apply(shaper, null); } /** * Same as {@link #apply(MediaElement)}, can specify the media type that will be * streamed through the shaper element. * * @param shaper {@link MediaElement} that will be linked to the end of the * chain (e.g. a filter) * @param type indicates which type of media will be connected to the shaper * ({@link MediaType}), if null then the connection is mixed * @return the element's id * @throws OpenViduException if thrown, the media element was not added */ public synchronized String apply(GenericMediaElement shaper, MediaType type) throws OpenViduException { String id = shaper.getId(); if (id == null) { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "Unable to connect media element with null id"); } if (elements.containsKey(id)) { throw new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "This endpoint already has a media element with id " + id); } MediaElement first = null; if (!elementIds.isEmpty()) { first = elements.get(elementIds.getFirst()); } if (connected) { if (first != null) { internalSinkConnect(first, shaper, type); } else { internalSinkConnect(this.getEndpoint(), shaper, type); } internalSinkConnect(shaper, passThru, type); } elementIds.addFirst(id); elements.put(id, shaper); this.filter = shaper; elementsErrorSubscriptions.put(id, registerElemErrListener(shaper)); return id; } /** * Removes the media element object found from the media chain structure. The * object is released. If the chain is connected, both adjacent remaining * elements will be interconnected. * * @param shaper {@link MediaElement} that will be removed from the chain * @throws OpenViduException if thrown, the media element was not removed */ public synchronized void revert(MediaElement shaper) throws OpenViduException { revert(shaper, true); } public synchronized void revert(MediaElement shaper, boolean releaseElement) throws OpenViduException { final String elementId = shaper.getId(); if (!elements.containsKey(elementId)) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "This endpoint (" + getEndpointName() + ") has no media element with id " + elementId); } MediaElement element = elements.remove(elementId); unregisterElementErrListener(element, elementsErrorSubscriptions.remove(elementId)); // careful, the order in the elems list is reverted if (connected) { String nextId = getNext(elementId); String prevId = getPrevious(elementId); // next connects to prev MediaElement prev = null; MediaElement next = null; if (nextId != null) { next = elements.get(nextId); } else { next = this.getEndpoint(); } if (prevId != null) { prev = elements.get(prevId); } else { prev = passThru; } internalSinkConnect(next, prev); } elementIds.remove(elementId); if (releaseElement) { element.release(new Continuation() { @Override public void onSuccess(Void result) throws Exception { log.trace("EP {}: Released media element {}", getEndpointName(), elementId); } @Override public void onError(Throwable cause) throws Exception { log.error("EP {}: Failed to release media element {}", getEndpointName(), elementId, cause); } }); } this.filter = null; } public JsonElement execMethod(String method, JsonObject params) throws OpenViduException { Props props = new JsonUtils().fromJsonObjectToProps(params); return (JsonElement) ((GenericMediaElement) this.filter).invoke(method, props); } public synchronized void mute(TrackType muteType) { MediaElement sink = passThru; if (!elements.isEmpty()) { String sinkId = elementIds.peekLast(); if (!elements.containsKey(sinkId)) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "This endpoint (" + getEndpointName() + ") has no media element with id " + sinkId + " (should've been connected to the internal ep)"); } sink = elements.get(sinkId); } else { log.debug("Will mute connection of WebRTC and PassThrough (no other elems)"); } switch (muteType) { case ALL: internalSinkDisconnect(this.getEndpoint(), sink); break; case AUDIO: internalSinkDisconnect(this.getEndpoint(), sink, MediaType.AUDIO); break; case VIDEO: internalSinkDisconnect(this.getEndpoint(), sink, MediaType.VIDEO); break; } } public synchronized void unmute(TrackType muteType) { MediaElement sink = passThru; if (!elements.isEmpty()) { String sinkId = elementIds.peekLast(); if (!elements.containsKey(sinkId)) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "This endpoint (" + getEndpointName() + ") has no media element with id " + sinkId + " (should've been connected to the internal ep)"); } sink = elements.get(sinkId); } else { log.debug("Will unmute connection of WebRTC and PassThrough (no other elems)"); } switch (muteType) { case ALL: internalSinkConnect(this.getEndpoint(), sink); break; case AUDIO: internalSinkConnect(this.getEndpoint(), sink, MediaType.AUDIO); break; case VIDEO: internalSinkConnect(this.getEndpoint(), sink, MediaType.VIDEO); break; } } private String getNext(String uid) { int idx = elementIds.indexOf(uid); if (idx < 0 || idx + 1 == elementIds.size()) { return null; } return elementIds.get(idx + 1); } private String getPrevious(String uid) { int idx = elementIds.indexOf(uid); if (idx <= 0) { return null; } return elementIds.get(idx - 1); } private void connectAltLoopbackSrc(MediaElement loopbackAlternativeSrc, MediaType loopbackConnectionType) { if (!connected) { innerConnect(); } internalSinkConnect(loopbackAlternativeSrc, this.getEndpoint(), loopbackConnectionType); } private void innerConnect() { if (this.getEndpoint() == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "Can't connect null endpoint (ep: " + getEndpointName() + ")"); } MediaElement current = this.getEndpoint(); String prevId = elementIds.peekLast(); while (prevId != null) { MediaElement prev = elements.get(prevId); if (prev == null) { throw new OpenViduException(Code.MEDIA_ENDPOINT_ERROR_CODE, "No media element with id " + prevId + " (ep: " + getEndpointName() + ")"); } internalSinkConnect(current, prev); current = prev; prevId = getPrevious(prevId); } internalSinkConnect(current, passThru); connected = true; } private void internalSinkConnect(final MediaElement source, final MediaElement sink) { source.connect(sink, new Continuation() { @Override public void onSuccess(Void result) throws Exception { log.debug("EP {}: Elements have been connected (source {} -> sink {})", getEndpointName(), source.getId(), sink.getId()); } @Override public void onError(Throwable cause) throws Exception { log.warn("EP {}: Failed to connect media elements (source {} -> sink {})", getEndpointName(), source.getId(), sink.getId(), cause); } }); } /** * Same as {@link #internalSinkConnect(MediaElement, MediaElement)}, but can * specify the type of the media that will be streamed. * * @param source * @param sink * @param type if null, * {@link #internalSinkConnect(MediaElement, MediaElement)} will * be used instead * @see #internalSinkConnect(MediaElement, MediaElement) */ private void internalSinkConnect(final MediaElement source, final MediaElement sink, final MediaType type) { if (type == null) { internalSinkConnect(source, sink); } else { source.connect(sink, type, new Continuation() { @Override public void onSuccess(Void result) throws Exception { log.debug("EP {}: {} media elements have been connected (source {} -> sink {})", getEndpointName(), type, source.getId(), sink.getId()); } @Override public void onError(Throwable cause) throws Exception { log.warn("EP {}: Failed to connect {} media elements (source {} -> sink {})", getEndpointName(), type, source.getId(), sink.getId(), cause); } }); } } private void internalSinkDisconnect(final MediaElement source, final MediaElement sink) { source.disconnect(sink, new Continuation() { @Override public void onSuccess(Void result) throws Exception { log.debug("EP {}: Elements have been disconnected (source {} -> sink {})", getEndpointName(), source.getId(), sink.getId()); } @Override public void onError(Throwable cause) throws Exception { log.warn("EP {}: Failed to disconnect media elements (source {} -> sink {})", getEndpointName(), source.getId(), sink.getId(), cause); } }); } /** * Same as {@link #internalSinkDisconnect(MediaElement, MediaElement)}, but can * specify the type of the media that will be disconnected. * * @param source * @param sink * @param type if null, * {@link #internalSinkConnect(MediaElement, MediaElement)} will * be used instead * @see #internalSinkConnect(MediaElement, MediaElement) */ private void internalSinkDisconnect(final MediaElement source, final MediaElement sink, final MediaType type) { if (type == null) { internalSinkDisconnect(source, sink); } else { source.disconnect(sink, type, new Continuation() { @Override public void onSuccess(Void result) throws Exception { log.debug("EP {}: {} media elements have been disconnected (source {} -> sink {})", getEndpointName(), type, source.getId(), sink.getId()); } @Override public void onError(Throwable cause) throws Exception { log.warn("EP {}: Failed to disconnect {} media elements (source {} -> sink {})", getEndpointName(), type, source.getId(), sink.getId(), cause); } }); } } @Override public PublisherEndpoint getPublisher() { return this; } public MediaOptions getMediaOptions() { return mediaOptions; } public void setMediaOptions(MediaOptions mediaOptions) { this.mediaOptions = mediaOptions; } @Override public JsonObject toJson() { JsonObject json = super.toJson(); json.addProperty("streamId", this.getStreamId()); json.add("mediaOptions", this.mediaOptions.toJson()); return json; } @Override public JsonObject withStatsToJson() { JsonObject json = super.withStatsToJson(); JsonObject toJson = this.toJson(); for (Entry entry : toJson.entrySet()) { json.add(entry.getKey(), entry.getValue()); } return json; } public String filterCollectionsToString() { return "{filter: " + ((this.filter != null) ? this.filter.getName() : "null") + ", listener: " + this.filterListeners.toString() + ", subscribers: " + this.subscribersToFilterEvents.toString() + "}"; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/TrackType.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/TrackType.java similarity index 90% rename from openvidu-server/src/main/java/io/openvidu/server/kurento/TrackType.java rename to openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/TrackType.java index 38be72bd..eec03479 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/TrackType.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/endpoint/TrackType.java @@ -1,22 +1,22 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ -package io.openvidu.server.kurento; +package io.openvidu.server.kurento.endpoint; public enum TrackType { - ALL, VIDEO, AUDIO; + ALL, VIDEO, AUDIO; } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientSessionInfo.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java similarity index 60% rename from openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientSessionInfo.java rename to openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java index d818041c..fcb8e3ea 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/KurentoClientSessionInfo.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java @@ -1,33 +1,32 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ -package io.openvidu.server.kurento; +package io.openvidu.server.kurento.kms; -import org.kurento.client.KurentoClient; +public class DummyLoadManager implements LoadManager { + + @Override + public double calculateLoad(Kms kms) { + return 1; + } + + @Override + public boolean allowMoreElements(Kms kms) { + return true; + } -/** - * Interface for beans holding information required to obtain a {@link KurentoClient}. - * - * @author Radu Tom Vlad - * - */ -public interface KurentoClientSessionInfo { - /** - * @return the room's name (or id) for whom a {@link KurentoClient} will be needed - */ - public String getRoomName(); } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java index a467bca7..fd708aee 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedOneKmsManager.java @@ -1,88 +1,78 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.kms; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import org.kurento.client.KurentoClient; import org.kurento.client.KurentoConnectionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import io.openvidu.server.core.SessionManager; import io.openvidu.server.kurento.core.KurentoSession; +import io.openvidu.server.kurento.core.KurentoSessionManager; public class FixedOneKmsManager extends KmsManager { private static final Logger log = LoggerFactory.getLogger(FixedOneKmsManager.class); - @Autowired - SessionManager sessionManager; - - public static final AtomicBoolean CONNECTED_TO_KMS = new AtomicBoolean(false); - public static final AtomicLong TIME_OF_DISCONNECTION = new AtomicLong(0); - public FixedOneKmsManager(String kmsWsUri) { - this(kmsWsUri, 1); - } - - public FixedOneKmsManager(String kmsWsUri, int numKmss) { - for (int i = 0; i < numKmss; i++) { - this.addKms(new Kms(KurentoClient.create(kmsWsUri, new KurentoConnectionListener() { + KurentoClient kClient = KurentoClient.create(kmsWsUri, new KurentoConnectionListener() { - @Override - public void reconnected(boolean isReconnected) { - CONNECTED_TO_KMS.compareAndSet(false, true); - if (!isReconnected) { - // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) - log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri); - log.warn("Updating all webrtc endpoints for active sessions"); - sessionManager.getSessions().forEach(s -> { - ((KurentoSession) s).restartStatusInKurento(); - }); - } else { - // Same KMS. We can infer that openvidu-server/KMS connection has been lost, but - // not the clients/KMS connections - log.warn("Kurento Client reconnected to same KMS with uri {}", kmsWsUri); - } + @Override + public void reconnected(boolean isReconnected) { + ((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, true); + if (!isReconnected) { + // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) + log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kmsWsUri); + log.warn("Updating all webrtc endpoints for active sessions"); + sessionManager.getSessions().forEach(s -> { + ((KurentoSession) s).restartStatusInKurento(); + }); + } else { + // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but + // not the clients/KMS connections + log.warn("Kurento Client reconnected to same KMS with uri {}", kmsWsUri); } + } - @Override - public void disconnected() { - CONNECTED_TO_KMS.compareAndSet(true, false); - TIME_OF_DISCONNECTION.set(System.currentTimeMillis()); - log.warn("Kurento Client disconnected from KMS with uri {}", kmsWsUri); - } + @Override + public void disconnected() { + ((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, + false); + ((KurentoSessionManager) sessionManager).getKmsManager().setTimeOfKurentoClientDisconnection(kmsWsUri, + System.currentTimeMillis()); + log.warn("Kurento Client disconnected from KMS with uri {}", kmsWsUri); + } - @Override - public void connectionFailed() { - CONNECTED_TO_KMS.set(false); - log.warn("Kurento Client failed connecting to KMS with uri {}", kmsWsUri); - } + @Override + public void connectionFailed() { + ((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, + false); + log.warn("Kurento Client failed connecting to KMS with uri {}", kmsWsUri); + } - @Override - public void connected() { - CONNECTED_TO_KMS.compareAndSet(false, true); - log.warn("Kurento Client is now connected to KMS with uri {}", kmsWsUri); - } - }), kmsWsUri)); - } + @Override + public void connected() { + ((KurentoSessionManager) sessionManager).getKmsManager().setKurentoClientConnectedToKms(kmsWsUri, true); + log.warn("Kurento Client is now connected to KMS with uri {}", kmsWsUri); + } + }); + + this.addKms(new Kms(kmsWsUri, kClient, loadManager)); } + } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index 77363477..1c33f282 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -1,52 +1,83 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.kms; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.kurento.client.KurentoClient; +/** + * Abstraction of a KMS instance: an object of this class corresponds to a KMS + * process running somewhere. + * + * It is uniquely identified by the KMS ws URI endpoint. It encapsulates the + * WebSocket client to communicate openvidu-server Java process with it and has + * a specific LoadManager service to calculate the load of this KMS based on + * different measures + * + * @author Pablo Fuente (pablofuenteperez@gmail.com) + */ public class Kms { - private LoadManager loadManager = new MaxWebRtcLoadManager(10000); - private KurentoClient client; - private String kmsUri; + private String kmsUri; + private KurentoClient client; + private LoadManager loadManager; + + private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); + private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0); + + public Kms(String kmsUri, KurentoClient client, LoadManager loadManager) { + this.kmsUri = kmsUri; + this.client = client; + this.loadManager = loadManager; + } + + public String getUri() { + return kmsUri; + } + + public KurentoClient getKurentoClient() { + return this.client; + } + + public double getLoad() { + return loadManager.calculateLoad(this); + } - public Kms(KurentoClient client, String kmsUri) { - this.client = client; - this.kmsUri = kmsUri; - } + public boolean allowMoreElements() { + return loadManager.allowMoreElements(this); + } - public void setLoadManager(LoadManager loadManager) { - this.loadManager = loadManager; - } + public boolean isKurentoClientConnected() { + return this.isKurentoClientConnected.get(); + } - public double getLoad() { - return loadManager.calculateLoad(this); - } + public void setKurentoClientConnected(boolean isConnected) { + this.isKurentoClientConnected.set(isConnected); + } - public boolean allowMoreElements() { - return loadManager.allowMoreElements(this); - } + public long getTimeOfKurentoClientDisconnection() { + return this.timeOfKurentoClientDisconnection.get(); + } - public String getUri() { - return kmsUri; - } + public void setTimeOfKurentoClientDisconnection(long time) { + this.timeOfKurentoClientDisconnection.set(time); + } - public KurentoClient getKurentoClient() { - return this.client; - } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index f2a79dbf..b7407074 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -1,125 +1,149 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.kms; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -import org.kurento.client.KurentoClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import io.openvidu.client.OpenViduException; -import io.openvidu.client.OpenViduException.Code; -import io.openvidu.server.kurento.KurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientSessionInfo; -import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo; - -public abstract class KmsManager implements KurentoClientProvider { - - public static class KmsLoad implements Comparable { - - private Kms kms; - private double load; - - public KmsLoad(Kms kms, double load) { - this.kms = kms; - this.load = load; - } - - public Kms getKms() { - return kms; - } - - public double getLoad() { - return load; - } - - @Override - public int compareTo(KmsLoad o) { - return Double.compare(this.load, o.load); - } - } - - private final Logger log = LoggerFactory.getLogger(KmsManager.class); - - private List kmss = new ArrayList(); - private Iterator usageIterator = null; - - @Override - public KurentoClient getKurentoClient(KurentoClientSessionInfo sessionInfo) throws OpenViduException { - if (!(sessionInfo instanceof OpenViduKurentoClientSessionInfo)) { - throw new OpenViduException(Code.GENERIC_ERROR_CODE, "Unkown session info bean type (expected " - + OpenViduKurentoClientSessionInfo.class.getName() + ")"); - } - return getKms((OpenViduKurentoClientSessionInfo) sessionInfo).getKurentoClient(); - } - - /** - * Returns a {@link Kms} using a round-robin strategy. - * - * @param sessionInfo - * session's id - */ - public synchronized Kms getKms(OpenViduKurentoClientSessionInfo sessionInfo) { - if (usageIterator == null || !usageIterator.hasNext()) { - usageIterator = kmss.iterator(); - } - return usageIterator.next(); - } - - public synchronized void addKms(Kms kms) { - this.kmss.add(kms); - } - - public synchronized Kms getLessLoadedKms() { - return Collections.min(getKmsLoads()).kms; - } - - public synchronized Kms getNextLessLoadedKms() { - List sortedLoads = getKmssSortedByLoad(); - if (sortedLoads.size() > 1) { - return sortedLoads.get(1).kms; - } else { - return sortedLoads.get(0).kms; - } - } - - public synchronized List getKmssSortedByLoad() { - List kmsLoads = getKmsLoads(); - Collections.sort(kmsLoads); - return kmsLoads; - } - - private List getKmsLoads() { - ArrayList kmsLoads = new ArrayList<>(); - for (Kms kms : kmss) { - double load = kms.getLoad(); - kmsLoads.add(new KmsLoad(kms, load)); - log.trace("Calc load {} for kms: {}", load, kms.getUri()); - } - return kmsLoads; - } - - @Override - public boolean destroyWhenUnused() { - return false; - } +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import io.openvidu.server.core.SessionManager; + +@Service +public abstract class KmsManager { + + public class KmsLoad implements Comparable { + + private Kms kms; + private double load; + + public KmsLoad(Kms kms, double load) { + this.kms = kms; + this.load = load; + } + + public Kms getKms() { + return kms; + } + + public double getLoad() { + return load; + } + + @Override + public int compareTo(KmsLoad o) { + return Double.compare(this.load, o.load); + } + } + + @Autowired + protected SessionManager sessionManager; + + @Autowired + protected LoadManager loadManager; + + private final Logger log = LoggerFactory.getLogger(KmsManager.class); + + // Using KMS websocket uris as unique identifiers + private Map kmss = new ConcurrentHashMap<>(); + + private Iterator usageIterator = null; + + public synchronized void addKms(Kms kms) { + this.kmss.put(kms.getUri(), kms); + } + + public synchronized void removeKms(Kms kms) { + this.kmss.remove(kms.getUri()); + } + + public synchronized Kms getKms(String sessionId) { + if (usageIterator == null || !usageIterator.hasNext()) { + usageIterator = kmss.values().iterator(); + } + return usageIterator.next(); + } + + /** + * Returns a {@link Kms} using a round-robin strategy. + * + * @param sessionInfo session's id + */ + public synchronized Kms getKmsRoundRobin() { + if (usageIterator == null || !usageIterator.hasNext()) { + usageIterator = kmss.values().iterator(); + } + return usageIterator.next(); + } + + public synchronized Kms getLessLoadedKms() { + return Collections.min(getKmsLoads()).kms; + } + + public synchronized Kms getNextLessLoadedKms() { + List sortedLoads = getKmssSortedByLoad(); + if (sortedLoads.size() > 1) { + return sortedLoads.get(1).kms; + } else { + return sortedLoads.get(0).kms; + } + } + + public synchronized List getKmssSortedByLoad() { + List kmsLoads = getKmsLoads(); + Collections.sort(kmsLoads); + return kmsLoads; + } + + public boolean isKurentoClientConnectedToKms(Kms kms) { + return this.kmss.get(kms.getUri()).isKurentoClientConnected(); + } + + public long getTimeOfKurentoClientDisconnection(Kms kms) { + return this.kmss.get(kms.getUri()).getTimeOfKurentoClientDisconnection(); + } + + public void setKurentoClientConnectedToKms(String kmsUri, boolean isConnected) { + this.kmss.get(kmsUri).setKurentoClientConnected(isConnected); + } + + public void setTimeOfKurentoClientDisconnection(String kmsUri, long time) { + this.kmss.get(kmsUri).setTimeOfKurentoClientDisconnection(time); + } + + private List getKmsLoads() { + ArrayList kmsLoads = new ArrayList<>(); + for (Kms kms : kmss.values()) { + double load = kms.getLoad(); + kmsLoads.add(new KmsLoad(kms, load)); + log.trace("Calc load {} for kms: {}", load, kms.getUri()); + } + return kmsLoads; + } + + public boolean destroyWhenUnused() { + return false; + } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java index 570499cd..a7754ba3 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java @@ -1,26 +1,29 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.kms; +import org.springframework.stereotype.Service; + +@Service public interface LoadManager { double calculateLoad(Kms kms); boolean allowMoreElements(Kms kms); } \ No newline at end of file diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java deleted file mode 100644 index b6623f28..00000000 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.openvidu.server.kurento.kms; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MaxWebRtcLoadManager implements LoadManager { - private static final Logger log = LoggerFactory.getLogger(MaxWebRtcLoadManager.class); - - private int maxWebRtcPerKms; - - public MaxWebRtcLoadManager(int maxWebRtcPerKms) { - this.maxWebRtcPerKms = maxWebRtcPerKms; - } - - @Override - public double calculateLoad(Kms kms) { - int numWebRtcs = countWebRtcEndpoints(kms); - if (numWebRtcs > maxWebRtcPerKms) { - return 1; - } else { - return numWebRtcs / (double) maxWebRtcPerKms; - } - } - - @Override - public boolean allowMoreElements(Kms kms) { - return countWebRtcEndpoints(kms) < maxWebRtcPerKms; - } - - private synchronized int countWebRtcEndpoints(Kms kms) { - try { - return kms.getKurentoClient().getServerManager().getPipelines().size(); - } catch (Throwable e) { - log.warn("Error counting KurentoClient pipelines", e); - return 0; - } - } -} diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java index 2109cccb..b55462a4 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/CompositeWrapper.java @@ -1,170 +1,169 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.recording; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.kurento.client.Composite; import org.kurento.client.ErrorEvent; import org.kurento.client.EventListener; import org.kurento.client.HubPort; import org.kurento.client.MediaProfileSpecType; import org.kurento.client.RecorderEndpoint; import org.kurento.client.RecordingEvent; import org.kurento.client.StoppedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; -import io.openvidu.server.kurento.kms.FixedOneKmsManager; public class CompositeWrapper { private static final Logger log = LoggerFactory.getLogger(CompositeWrapper.class); KurentoSession session; Composite composite; RecorderEndpoint recorderEndpoint; HubPort compositeToRecorderHubPort; Map hubPorts = new ConcurrentHashMap<>(); Map publisherEndpoints = new ConcurrentHashMap<>(); AtomicBoolean isRecording = new AtomicBoolean(false); long startTime; long endTime; long size; public CompositeWrapper(KurentoSession session, String path) { this.session = session; this.composite = new Composite.Builder(session.getPipeline()).build(); this.recorderEndpoint = new RecorderEndpoint.Builder(composite.getMediaPipeline(), path) .withMediaProfile(MediaProfileSpecType.WEBM_AUDIO_ONLY).build(); this.compositeToRecorderHubPort = new HubPort.Builder(composite).build(); this.compositeToRecorderHubPort.connect(recorderEndpoint); } public synchronized void startCompositeRecording(CountDownLatch startLatch) { this.recorderEndpoint.addRecordingListener(new EventListener() { @Override public void onEvent(RecordingEvent event) { startTime = Long.parseLong(event.getTimestampMillis()); log.info("Recording started event for audio-only RecorderEndpoint of Composite in session {}", session.getSessionId()); startLatch.countDown(); } }); this.recorderEndpoint.addErrorListener(new EventListener() { @Override public void onEvent(ErrorEvent event) { log.error(event.getErrorCode() + " " + event.getDescription()); } }); this.recorderEndpoint.record(); } - public synchronized void stopCompositeRecording(CountDownLatch stopLatch, boolean forceAfterKmsRestart) { - if (!forceAfterKmsRestart) { + public synchronized void stopCompositeRecording(CountDownLatch stopLatch, Long timeOfKmsDisconnection) { + if (timeOfKmsDisconnection == 0) { this.recorderEndpoint.addStoppedListener(new EventListener() { @Override public void onEvent(StoppedEvent event) { endTime = Long.parseLong(event.getTimestampMillis()); log.info("Recording stopped event for audio-only RecorderEndpoint of Composite in session {}", session.getSessionId()); recorderEndpoint.release(); compositeToRecorderHubPort.release(); stopLatch.countDown(); } }); this.recorderEndpoint.stop(); } else { - endTime = FixedOneKmsManager.TIME_OF_DISCONNECTION.get(); + endTime = timeOfKmsDisconnection; stopLatch.countDown(); log.warn("Forcing composed audio-only recording stop after KMS restart in session {}", this.session.getSessionId()); } } public void connectPublisherEndpoint(PublisherEndpoint endpoint) throws OpenViduException { HubPort hubPort = new HubPort.Builder(composite).build(); endpoint.connect(hubPort); String streamId = endpoint.getOwner().getPublisherStreamId(); this.hubPorts.put(streamId, hubPort); this.publisherEndpoints.put(streamId, endpoint); if (isRecording.compareAndSet(false, true)) { // First user publishing. Starting RecorderEndpoint log.info("First stream ({}) joined to Composite in session {}. Starting RecorderEndpoint for Composite", streamId, session.getSessionId()); final CountDownLatch startLatch = new CountDownLatch(1); this.startCompositeRecording(startLatch); try { if (!startLatch.await(5, TimeUnit.SECONDS)) { log.error("Error waiting for RecorderEndpoint of Composite to start in session {}", session.getSessionId()); throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, "Couldn't initialize RecorderEndpoint of Composite"); } log.info("RecorderEnpoint of Composite is now recording for session {}", session.getSessionId()); } catch (InterruptedException e) { log.error("Exception while waiting for state change", e); } } log.info("Composite for session {} has now {} connected publishers", this.session.getSessionId(), this.composite.getChildren().size() - 1); } public void disconnectPublisherEndpoint(String streamId) { HubPort hubPort = this.hubPorts.remove(streamId); PublisherEndpoint publisherEndpoint = this.publisherEndpoints.remove(streamId); publisherEndpoint.disconnectFrom(hubPort); hubPort.release(); log.info("Composite for session {} has now {} connected publishers", this.session.getSessionId(), this.composite.getChildren().size() - 1); } public void disconnectAllPublisherEndpoints() { this.publisherEndpoints.keySet().forEach(streamId -> { PublisherEndpoint endpoint = this.publisherEndpoints.get(streamId); HubPort hubPort = this.hubPorts.get(streamId); endpoint.disconnectFrom(hubPort); hubPort.release(); }); this.hubPorts.clear(); this.publisherEndpoints.clear(); this.composite.release(); } public long getDuration() { return this.endTime - this.startTime; } } \ No newline at end of file diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java index a64ae656..337adbe9 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/ComposedRecordingService.java @@ -1,463 +1,456 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.recording.service; import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; import com.github.dockerjava.api.model.Bind; import com.github.dockerjava.api.model.Volume; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.java.client.RecordingLayout; import io.openvidu.java.client.RecordingProperties; import io.openvidu.server.OpenViduServer; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.core.KurentoParticipant; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.recording.CompositeWrapper; import io.openvidu.server.recording.Recording; import io.openvidu.server.recording.RecordingInfoUtils; import io.openvidu.server.utils.DockerManager; public class ComposedRecordingService extends RecordingService { private static final Logger log = LoggerFactory.getLogger(ComposedRecordingService.class); private Map containers = new ConcurrentHashMap<>(); private Map sessionsContainers = new ConcurrentHashMap<>(); private Map composites = new ConcurrentHashMap<>(); private DockerManager dockerManager; public ComposedRecordingService(RecordingManager recordingManager, OpenviduConfig openviduConfig) { super(recordingManager, openviduConfig); this.dockerManager = new DockerManager(); } @Override public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException { PropertiesRecordingId updatePropertiesAndRecordingId = this.setFinalRecordingNameAndGetFreeRecordingId(session, properties); properties = updatePropertiesAndRecordingId.properties; String recordingId = updatePropertiesAndRecordingId.recordingId; // Instantiate and store recording object Recording recording = new Recording(session.getSessionId(), recordingId, properties); this.recordingManager.startingRecordings.put(recording.getId(), recording); if (properties.hasVideo()) { // Docker container used recording = this.startRecordingWithVideo(session, recording, properties); } else { // Kurento composite used recording = this.startRecordingAudioOnly(session, recording, properties); } this.updateRecordingManagerCollections(session, recording); return recording; } @Override public Recording stopRecording(Session session, Recording recording, EndReason reason) { - return this.stopRecording(session, recording, reason, false); - } - - public Recording stopRecording(Session session, Recording recording, EndReason reason, - boolean forceAfterKmsRestart) { if (recording.hasVideo()) { return this.stopRecordingWithVideo(session, recording, reason); } else { - return this.stopRecordingAudioOnly(session, recording, reason, forceAfterKmsRestart); + return this.stopRecordingAudioOnly(session, recording, reason); } } public void joinPublisherEndpointToComposite(Session session, String recordingId, Participant participant) throws OpenViduException { log.info("Joining single stream {} to Composite in session {}", participant.getPublisherStreamId(), session.getSessionId()); KurentoParticipant kurentoParticipant = (KurentoParticipant) participant; CompositeWrapper compositeWrapper = this.composites.get(session.getSessionId()); try { compositeWrapper.connectPublisherEndpoint(kurentoParticipant.getPublisher()); } catch (OpenViduException e) { if (Code.RECORDING_START_ERROR_CODE.getValue() == e.getCodeValue()) { // First user publishing triggered RecorderEnpoint start, but it failed throw e; } } } public void removePublisherEndpointFromComposite(String sessionId, String streamId) { CompositeWrapper compositeWrapper = this.composites.get(sessionId); compositeWrapper.disconnectPublisherEndpoint(streamId); } private Recording startRecordingWithVideo(Session session, Recording recording, RecordingProperties properties) throws OpenViduException { log.info("Starting composed ({}) recording {} of session {}", properties.hasAudio() ? "video + audio" : "audio-only", recording.getId(), recording.getSessionId()); List envs = new ArrayList<>(); String layoutUrl = this.getLayoutUrl(recording, this.getShortSessionId(session)); envs.add("URL=" + layoutUrl); envs.add("ONLY_VIDEO=" + !properties.hasAudio()); envs.add("RESOLUTION=" + properties.resolution()); envs.add("FRAMERATE=30"); envs.add("VIDEO_ID=" + recording.getId()); envs.add("VIDEO_NAME=" + properties.name()); envs.add("VIDEO_FORMAT=mp4"); envs.add("RECORDING_JSON=" + recording.toJson().toString()); log.info(recording.toJson().toString()); log.info("Recorder connecting to url {}", layoutUrl); String containerId; try { final String container = RecordingManager.IMAGE_NAME + ":" + RecordingManager.IMAGE_TAG; final String containerName = "recording_" + recording.getId(); Volume volume1 = new Volume("/recordings"); Volume volume2 = new Volume("/dev/shm"); List volumes = new ArrayList<>(); volumes.add(volume1); volumes.add(volume2); Bind bind1 = new Bind(openviduConfig.getOpenViduRecordingPath(), volume1); Bind bind2 = new Bind("/dev/shm", volume2); List binds = new ArrayList<>(); binds.add(bind1); binds.add(bind2); containerId = dockerManager.runContainer(container, containerName, volumes, binds, null, "host", envs); containers.put(containerId, containerName); } catch (Exception e) { this.cleanRecordingMaps(recording); throw this.failStartRecording(session, recording, "Couldn't initialize recording container. Error: " + e.getMessage()); } this.sessionsContainers.put(session.getSessionId(), containerId); try { this.waitForVideoFileNotEmpty(recording); } catch (OpenViduException e) { this.cleanRecordingMaps(recording); throw this.failStartRecording(session, recording, "Couldn't initialize recording container. Error: " + e.getMessage()); } return recording; } private Recording startRecordingAudioOnly(Session session, Recording recording, RecordingProperties properties) throws OpenViduException { log.info("Starting composed (audio-only) recording {} of session {}", recording.getId(), recording.getSessionId()); CompositeWrapper compositeWrapper = new CompositeWrapper((KurentoSession) session, "file://" + this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" + properties.name() + ".webm"); this.composites.put(session.getSessionId(), compositeWrapper); for (Participant p : session.getParticipants()) { if (p.isStreaming()) { try { this.joinPublisherEndpointToComposite(session, recording.getId(), p); } catch (OpenViduException e) { log.error("Error waiting for RecorderEndpooint of Composite to start in session {}", session.getSessionId()); throw this.failStartRecording(session, recording, e.getMessage()); } } } this.generateRecordingMetadataFile(recording); this.sendRecordingStartedNotification(session, recording); return recording; } private Recording stopRecordingWithVideo(Session session, Recording recording, EndReason reason) { log.info("Stopping composed ({}) recording {} of session {}. Reason: {}", recording.hasAudio() ? "video + audio" : "audio-only", recording.getId(), recording.getSessionId(), RecordingManager.finalReason(reason)); String containerId = this.sessionsContainers.remove(recording.getSessionId()); this.cleanRecordingMaps(recording); final String recordingId = recording.getId(); if (session == null) { log.warn( "Existing recording {} does not have an active session associated. This usually means a custom recording" + " layout did not join a recorded participant or the recording has been automatically" + " stopped after last user left and timeout passed", recording.getId()); } if (containerId == null) { // Session was closed while recording container was initializing // Wait until containerId is available and force its stop and removal new Thread(() -> { log.warn("Session closed while starting recording container"); boolean containerClosed = false; String containerIdAux; int i = 0; final int timeout = 30; while (!containerClosed && (i < timeout)) { containerIdAux = this.sessionsContainers.remove(session.getSessionId()); if (containerIdAux == null) { try { log.warn("Waiting for container to be launched..."); i++; Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } else { log.warn("Removing container {} for closed session {}...", containerIdAux, session.getSessionId()); dockerManager.stopDockerContainer(containerIdAux); dockerManager.removeDockerContainer(containerIdAux, false); containers.remove(containerId); containerClosed = true; log.warn("Container {} for closed session {} succesfully stopped and removed", containerIdAux, session.getSessionId()); log.warn("Deleting unusable files for recording {}", recordingId); if (HttpStatus.NO_CONTENT .equals(this.recordingManager.deleteRecordingFromHost(recordingId, true))) { log.warn("Files properly deleted"); } } } if (i == timeout) { log.error("Container did not launched in {} seconds", timeout / 2); return; } }).start(); } else { // Gracefully stop ffmpeg process try { dockerManager.runCommandInContainer(containerId, "echo 'q' > stop", 0); } catch (InterruptedException e1) { e1.printStackTrace(); } // Wait for the container to be gracefully self-stopped final int timeOfWait = 30; try { dockerManager.waitForContainerStopped(containerId, timeOfWait); } catch (Exception e) { failRecordingCompletion(recording, containerId, new OpenViduException(Code.RECORDING_COMPLETION_ERROR_CODE, "The recording completion process couldn't finish in " + timeOfWait + " seconds")); } // Remove container dockerManager.removeDockerContainer(containerId, false); containers.remove(containerId); // Update recording attributes reading from video report file try { RecordingInfoUtils infoUtils = new RecordingInfoUtils( this.openviduConfig.getOpenViduRecordingPath() + recordingId + "/" + recordingId + ".info"); if (!infoUtils.hasVideo()) { log.error("COMPOSED recording {} with hasVideo=true has not video track", recordingId); recording.setStatus(io.openvidu.java.client.Recording.Status.failed); } else { recording.setStatus(io.openvidu.java.client.Recording.Status.stopped); recording.setDuration(infoUtils.getDurationInSeconds()); recording.setSize(infoUtils.getSizeInBytes()); recording.setResolution(infoUtils.videoWidth() + "x" + infoUtils.videoHeight()); recording.setHasAudio(infoUtils.hasAudio()); recording.setHasVideo(infoUtils.hasVideo()); } infoUtils.deleteFilePath(); recording = this.recordingManager.updateRecordingUrl(recording); } catch (IOException e) { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); throw new OpenViduException(Code.RECORDING_REPORT_ERROR_CODE, "There was an error generating the metadata report file for the recording"); } if (session != null && reason != null) { this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); } } return recording; } - private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason, - boolean forceAfterKmsRestart) { + private Recording stopRecordingAudioOnly(Session session, Recording recording, EndReason reason) { log.info("Stopping composed (audio-only) recording {} of session {}. Reason: {}", recording.getId(), recording.getSessionId(), reason); String sessionId; if (session == null) { log.warn( "Existing recording {} does not have an active session associated. This means the recording " + "has been automatically stopped after last user left and {} seconds timeout passed", recording.getId(), this.openviduConfig.getOpenviduRecordingAutostopTimeout()); sessionId = recording.getSessionId(); } else { sessionId = session.getSessionId(); } CompositeWrapper compositeWrapper = this.composites.remove(sessionId); - final CountDownLatch stoppedCountDown = new CountDownLatch(1); + compositeWrapper.stopCompositeRecording(stoppedCountDown, ((KurentoSession)session).getKms().getTimeOfKurentoClientDisconnection()); - compositeWrapper.stopCompositeRecording(stoppedCountDown, forceAfterKmsRestart); try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); log.error("Error waiting for RecorderEndpoint of Composite to stop in session {}", recording.getSessionId()); } } catch (InterruptedException e) { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); log.error("Exception while waiting for state change", e); } compositeWrapper.disconnectAllPublisherEndpoints(); this.cleanRecordingMaps(recording); String filesPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; File videoFile = new File(filesPath + recording.getName() + ".webm"); long finalSize = videoFile.length(); double finalDuration = (double) compositeWrapper.getDuration() / 1000; this.updateFilePermissions(filesPath); this.sealRecordingMetadataFile(recording, finalSize, finalDuration, filesPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId()); if (reason != null && session != null) { this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); } return recording; } private void waitForVideoFileNotEmpty(Recording recording) throws OpenViduException { boolean isPresent = false; int i = 1; int timeout = 150; // Wait for 150*150 = 22500 = 22.5 seconds while (!isPresent && timeout <= 150) { try { Thread.sleep(150); timeout++; File f = new File(this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/" + recording.getName() + ".mp4"); isPresent = ((f.isFile()) && (f.length() > 0)); } catch (InterruptedException e) { e.printStackTrace(); } } if (i == timeout) { log.error("Recorder container failed generating video file (is empty) for session {}", recording.getSessionId()); throw new OpenViduException(Code.RECORDING_START_ERROR_CODE, "Recorder container failed generating video file (is empty)"); } } private void failRecordingCompletion(Recording recording, String containerId, OpenViduException e) throws OpenViduException { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); dockerManager.stopDockerContainer(containerId); dockerManager.removeDockerContainer(containerId, true); containers.remove(containerId); throw e; } private String getLayoutUrl(Recording recording, String shortSessionId) { String secret = openviduConfig.getOpenViduSecret(); boolean recordingUrlDefined = openviduConfig.getOpenViduRecordingComposedUrl() != null && !openviduConfig.getOpenViduRecordingComposedUrl().isEmpty(); String recordingUrl = recordingUrlDefined ? openviduConfig.getOpenViduRecordingComposedUrl() : OpenViduServer.wsUrl; recordingUrl = recordingUrl.replaceFirst("wss://", "").replaceFirst("https://", ""); boolean startsWithHttp = recordingUrl.startsWith("http://") || recordingUrl.startsWith("ws://"); if (startsWithHttp) { recordingUrl = recordingUrl.replaceFirst("http://", "").replaceFirst("ws://", ""); } if (recordingUrl.endsWith("/")) { recordingUrl = recordingUrl.substring(0, recordingUrl.length() - 1); } String layout, finalUrl; if (RecordingLayout.CUSTOM.equals(recording.getRecordingLayout())) { layout = recording.getCustomLayout(); if (!layout.isEmpty()) { layout = layout.startsWith("/") ? layout : ("/" + layout); layout = layout.endsWith("/") ? layout.substring(0, layout.length() - 1) : layout; } layout += "/index.html"; finalUrl = (startsWithHttp ? "http" : "https") + "://OPENVIDUAPP:" + secret + "@" + recordingUrl + "/layouts/custom" + layout + "?sessionId=" + shortSessionId + "&secret=" + secret; } else { layout = recording.getRecordingLayout().name().toLowerCase().replaceAll("_", "-"); int port = startsWithHttp ? 80 : 443; try { port = new URL(openviduConfig.getFinalUrl()).getPort(); } catch (MalformedURLException e) { log.error(e.getMessage()); } finalUrl = (startsWithHttp ? "http" : "https") + "://OPENVIDUAPP:" + secret + "@" + recordingUrl + "/#/layout-" + layout + "/" + shortSessionId + "/" + secret + "/" + port + "/" + !recording.hasAudio(); } return finalUrl; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java index 123c1cc5..61394602 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/RecordingManager.java @@ -1,643 +1,641 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.recording.service; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.kurento.client.ErrorEvent; import org.kurento.client.EventListener; import org.kurento.client.MediaPipeline; import org.kurento.client.MediaProfileSpecType; import org.kurento.client.RecorderEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.client.internal.ProtocolElements; import io.openvidu.java.client.RecordingProperties; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionManager; -import io.openvidu.server.kurento.KurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientSessionInfo; -import io.openvidu.server.kurento.OpenViduKurentoClientSessionInfo; +import io.openvidu.server.kurento.core.KurentoSession; +import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.recording.Recording; import io.openvidu.server.utils.CustomFileManager; import io.openvidu.server.utils.DockerManager; @Service public class RecordingManager { private static final Logger log = LoggerFactory.getLogger(RecordingManager.class); RecordingService recordingService; private ComposedRecordingService composedRecordingService; private SingleStreamRecordingService singleStreamRecordingService; private DockerManager dockerManager; @Autowired protected SessionEventsHandler sessionHandler; @Autowired private SessionManager sessionManager; @Autowired protected OpenviduConfig openviduConfig; @Autowired - private KurentoClientProvider kcProvider; + private KmsManager kmsManager; protected Map startingRecordings = new ConcurrentHashMap<>(); protected Map startedRecordings = new ConcurrentHashMap<>(); protected Map sessionsRecordings = new ConcurrentHashMap<>(); private final Map> automaticRecordingStopThreads = new ConcurrentHashMap<>(); private ScheduledThreadPoolExecutor automaticRecordingStopExecutor = new ScheduledThreadPoolExecutor( Runtime.getRuntime().availableProcessors()); static final String RECORDING_ENTITY_FILE = ".recording."; public static final String IMAGE_NAME = "openvidu/openvidu-recording"; static String IMAGE_TAG; private static final List LAST_PARTICIPANT_LEFT_REASONS = Arrays .asList(new EndReason[] { EndReason.disconnect, EndReason.forceDisconnectByUser, EndReason.forceDisconnectByServer, EndReason.networkDisconnect }); public SessionEventsHandler getSessionEventsHandler() { return this.sessionHandler; } public SessionManager getSessionManager() { return this.sessionManager; } public void initializeRecordingManager() throws OpenViduException { RecordingManager.IMAGE_TAG = openviduConfig.getOpenViduRecordingVersion(); this.dockerManager = new DockerManager(); this.composedRecordingService = new ComposedRecordingService(this, openviduConfig); this.singleStreamRecordingService = new SingleStreamRecordingService(this, openviduConfig); log.info("Recording module required: Downloading openvidu/openvidu-recording:" + openviduConfig.getOpenViduRecordingVersion() + " Docker image (350MB aprox)"); this.checkRecordingRequirements(this.openviduConfig.getOpenViduRecordingPath(), this.openviduConfig.getOpenviduRecordingCustomLayout()); if (dockerManager.dockerImageExistsLocally(IMAGE_NAME + ":" + IMAGE_TAG)) { log.info("Docker image already exists locally"); } else { Thread t = new Thread(() -> { boolean keep = true; log.info("Downloading "); while (keep) { System.out.print("."); try { Thread.sleep(1000); } catch (InterruptedException e) { keep = false; log.info("\nDownload complete"); } } }); t.start(); try { dockerManager.downloadDockerImage(IMAGE_NAME + ":" + IMAGE_TAG, 600); } catch (Exception e) { log.error("Error downloading docker image {}:{}", IMAGE_NAME, IMAGE_TAG); } t.interrupt(); try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("Docker image available"); } // Clean any stranded openvidu/openvidu-recording container on startup dockerManager.cleanStrandedContainers(RecordingManager.IMAGE_NAME); } public void checkRecordingRequirements(String openviduRecordingPath, String openviduRecordingCustomLayout) throws OpenViduException { if (dockerManager == null) { this.dockerManager = new DockerManager(); } dockerManager.checkDockerEnabled(openviduConfig.getSpringProfile()); this.checkRecordingPaths(openviduRecordingPath, openviduRecordingCustomLayout); } public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException { Recording recording = null; try { switch (properties.outputMode()) { case COMPOSED: recording = this.composedRecordingService.startRecording(session, properties); break; case INDIVIDUAL: recording = this.singleStreamRecordingService.startRecording(session, properties); break; } } catch (OpenViduException e) { throw e; } if (session.getActivePublishers() == 0) { // Init automatic recording stop if there are now publishers when starting // recording log.info("No publisher in session {}. Starting {} seconds countdown for stopping recording", session.getSessionId(), this.openviduConfig.getOpenviduRecordingAutostopTimeout()); this.initAutomaticRecordingStopThread(session); } return recording; } public Recording stopRecording(Session session, String recordingId, EndReason reason) { Recording recording; if (session == null) { recording = this.startedRecordings.get(recordingId); } else { recording = this.sessionsRecordings.get(session.getSessionId()); } switch (recording.getOutputMode()) { case COMPOSED: recording = this.composedRecordingService.stopRecording(session, recording, reason); break; case INDIVIDUAL: recording = this.singleStreamRecordingService.stopRecording(session, recording, reason); break; } this.abortAutomaticRecordingStopThread(session); return recording; } public Recording forceStopRecording(Session session, EndReason reason) { Recording recording; recording = this.sessionsRecordings.get(session.getSessionId()); switch (recording.getOutputMode()) { case COMPOSED: - recording = this.composedRecordingService.stopRecording(session, recording, reason, true); + recording = this.composedRecordingService.stopRecording(session, recording, reason); break; case INDIVIDUAL: - recording = this.singleStreamRecordingService.stopRecording(session, recording, reason, true); + recording = this.singleStreamRecordingService.stopRecording(session, recording, reason); break; } this.abortAutomaticRecordingStopThread(session); return recording; } public void startOneIndividualStreamRecording(Session session, String recordingId, MediaProfileSpecType profile, Participant participant) { Recording recording = this.sessionsRecordings.get(session.getSessionId()); if (recording == null) { log.error("Cannot start recording of new stream {}. Session {} is not being recorded", participant.getPublisherStreamId(), session.getSessionId()); } if (io.openvidu.java.client.Recording.OutputMode.INDIVIDUAL.equals(recording.getOutputMode())) { // Start new RecorderEndpoint for this stream log.info("Starting new RecorderEndpoint in session {} for new stream of participant {}", session.getSessionId(), participant.getParticipantPublicId()); final CountDownLatch startedCountDown = new CountDownLatch(1); this.singleStreamRecordingService.startRecorderEndpointForPublisherEndpoint(session, recordingId, profile, participant, startedCountDown); } else if (io.openvidu.java.client.Recording.OutputMode.COMPOSED.equals(recording.getOutputMode()) && !recording.hasVideo()) { // Connect this stream to existing Composite recorder log.info("Joining PublisherEndpoint to existing Composite in session {} for new stream of participant {}", session.getSessionId(), participant.getParticipantPublicId()); this.composedRecordingService.joinPublisherEndpointToComposite(session, recordingId, participant); } } - public void stopOneIndividualStreamRecording(String sessionId, String streamId, boolean forceAfterKmsRestart) { - Recording recording = this.sessionsRecordings.get(sessionId); + public void stopOneIndividualStreamRecording(KurentoSession session, String streamId) { + Recording recording = this.sessionsRecordings.get(session.getSessionId()); if (recording == null) { log.error("Cannot stop recording of existing stream {}. Session {} is not being recorded", streamId, - sessionId); + session.getSessionId()); } if (io.openvidu.java.client.Recording.OutputMode.INDIVIDUAL.equals(recording.getOutputMode())) { // Stop specific RecorderEndpoint for this stream - log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", sessionId, streamId); + log.info("Stopping RecorderEndpoint in session {} for stream of participant {}", session.getSessionId(), + streamId); final CountDownLatch stoppedCountDown = new CountDownLatch(1); - this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(sessionId, streamId, - stoppedCountDown, forceAfterKmsRestart); + this.singleStreamRecordingService.stopRecorderEndpointOfPublisherEndpoint(session.getSessionId(), streamId, + stoppedCountDown, session.getKms().getTimeOfKurentoClientDisconnection()); try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { log.error("Error waiting for recorder endpoint of stream {} to stop in session {}", streamId, - sessionId); + session.getSessionId()); } } catch (InterruptedException e) { log.error("Exception while waiting for state change", e); } } else if (io.openvidu.java.client.Recording.OutputMode.COMPOSED.equals(recording.getOutputMode()) && !recording.hasVideo()) { // Disconnect this stream from existing Composite recorder - log.info("Removing PublisherEndpoint from Composite in session {} for stream of participant {}", sessionId, - streamId); - this.composedRecordingService.removePublisherEndpointFromComposite(sessionId, streamId); + log.info("Removing PublisherEndpoint from Composite in session {} for stream of participant {}", + session.getSessionId(), streamId); + this.composedRecordingService.removePublisherEndpointFromComposite(session.getSessionId(), streamId); } } public boolean sessionIsBeingRecorded(String sessionId) { return (this.sessionsRecordings.get(sessionId) != null); } public Recording getStartedRecording(String recordingId) { return this.startedRecordings.get(recordingId); } public Recording getStartingRecording(String recordingId) { return this.startingRecordings.get(recordingId); } public Collection getFinishedRecordings() { return this.getAllRecordingsFromHost().stream() .filter(recording -> (recording.getStatus().equals(io.openvidu.java.client.Recording.Status.stopped) || recording.getStatus().equals(io.openvidu.java.client.Recording.Status.available))) .collect(Collectors.toSet()); } public Recording getRecording(String recordingId) { return this.getRecordingFromHost(recordingId); } public Collection getAllRecordings() { return this.getAllRecordingsFromHost(); } public String getFreeRecordingId(String sessionId, String shortSessionId) { Set recordingIds = this.getRecordingIdsFromHost(); String recordingId = shortSessionId; boolean isPresent = recordingIds.contains(recordingId); int i = 1; while (isPresent) { recordingId = shortSessionId + "-" + i; i++; isPresent = recordingIds.contains(recordingId); } return recordingId; } public HttpStatus deleteRecordingFromHost(String recordingId, boolean force) { if (!force && (this.startedRecordings.containsKey(recordingId) || this.startingRecordings.containsKey(recordingId))) { // Cannot delete an active recording return HttpStatus.CONFLICT; } Recording recording = getRecordingFromHost(recordingId); if (recording == null) { return HttpStatus.NOT_FOUND; } File folder = new File(this.openviduConfig.getOpenViduRecordingPath()); File[] files = folder.listFiles(); for (int i = 0; i < files.length; i++) { if (files[i].isDirectory() && files[i].getName().equals(recordingId)) { // Correct folder. Delete it try { FileUtils.deleteDirectory(files[i]); } catch (IOException e) { log.error("Couldn't delete folder {}", files[i].getAbsolutePath()); } break; } } return HttpStatus.NO_CONTENT; } public Recording getRecordingFromEntityFile(File file) { if (file.isFile() && file.getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { JsonObject json = null; FileReader fr = null; try { fr = new FileReader(file); json = new JsonParser().parse(fr).getAsJsonObject(); } catch (IOException e) { return null; } finally { try { fr.close(); } catch (Exception e) { log.error("Exception while closing FileReader: {}", e.getMessage()); } } return new Recording(json); } return null; } public void initAutomaticRecordingStopThread(final Session session) { final String recordingId = this.sessionsRecordings.get(session.getSessionId()).getId(); ScheduledFuture future = this.automaticRecordingStopExecutor.schedule(() -> { log.info("Stopping recording {} after {} seconds wait (no publisher published before timeout)", recordingId, this.openviduConfig.getOpenviduRecordingAutostopTimeout()); if (this.automaticRecordingStopThreads.remove(session.getSessionId()) != null) { if (session.getParticipants().size() == 0 || (session.getParticipants().size() == 1 && session.getParticipantByPublicId(ProtocolElements.RECORDER_PARTICIPANT_PUBLICID) != null)) { // Close session if there are no participants connected (except for RECORDER). // This code won't be executed only when some user reconnects to the session // but never publishing (publishers automatically abort this thread) log.info("Closing session {} after automatic stop of recording {}", session.getSessionId(), recordingId); sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop); sessionManager.showTokens(); } else { this.stopRecording(session, recordingId, EndReason.automaticStop); } } else { // This code is reachable if there already was an automatic stop of a recording // caused by not user publishing within timeout after recording started, and a // new automatic stop thread was started by last user leaving the session log.warn("Recording {} was already automatically stopped by a previous thread", recordingId); } }, this.openviduConfig.getOpenviduRecordingAutostopTimeout(), TimeUnit.SECONDS); this.automaticRecordingStopThreads.putIfAbsent(session.getSessionId(), future); } public boolean abortAutomaticRecordingStopThread(Session session) { ScheduledFuture future = this.automaticRecordingStopThreads.remove(session.getSessionId()); if (future != null) { boolean cancelled = future.cancel(false); if (session.getParticipants().size() == 0 || (session.getParticipants().size() == 1 && session.getParticipantByPublicId(ProtocolElements.RECORDER_PARTICIPANT_PUBLICID) != null)) { // Close session if there are no participants connected (except for RECORDER). // This code will only be executed if recording is manually stopped during the // automatic stop timeout, so the session must be also closed log.info( "Ongoing recording of session {} was explicetly stopped within timeout for automatic recording stop. Closing session", session.getSessionId()); sessionManager.closeSessionAndEmptyCollections(session, EndReason.automaticStop); sessionManager.showTokens(); } return cancelled; } else { return true; } } public Recording updateRecordingUrl(Recording recording) { if (openviduConfig.getOpenViduRecordingPublicAccess()) { if (io.openvidu.java.client.Recording.Status.stopped.equals(recording.getStatus())) { String extension; switch (recording.getOutputMode()) { case COMPOSED: extension = recording.hasVideo() ? "mp4" : "webm"; break; case INDIVIDUAL: extension = "zip"; break; default: extension = "mp4"; } recording.setUrl(this.openviduConfig.getFinalUrl() + "recordings/" + recording.getId() + "/" + recording.getName() + "." + extension); recording.setStatus(io.openvidu.java.client.Recording.Status.available); } } return recording; } private Recording getRecordingFromHost(String recordingId) { log.info(this.openviduConfig.getOpenViduRecordingPath() + recordingId + "/" + RecordingManager.RECORDING_ENTITY_FILE + recordingId); File file = new File(this.openviduConfig.getOpenViduRecordingPath() + recordingId + "/" + RecordingManager.RECORDING_ENTITY_FILE + recordingId); log.info("File exists: " + file.exists()); Recording recording = this.getRecordingFromEntityFile(file); if (recording != null) { this.updateRecordingUrl(recording); } return recording; } private Set getAllRecordingsFromHost() { File folder = new File(this.openviduConfig.getOpenViduRecordingPath()); File[] files = folder.listFiles(); Set recordingEntities = new HashSet<>(); for (int i = 0; i < files.length; i++) { if (files[i].isDirectory()) { File[] innerFiles = files[i].listFiles(); for (int j = 0; j < innerFiles.length; j++) { Recording recording = this.getRecordingFromEntityFile(innerFiles[j]); if (recording != null) { this.updateRecordingUrl(recording); recordingEntities.add(recording); } } } } return recordingEntities; } private Set getRecordingIdsFromHost() { File folder = new File(this.openviduConfig.getOpenViduRecordingPath()); File[] files = folder.listFiles(); Set fileNamesNoExtension = new HashSet<>(); for (int i = 0; i < files.length; i++) { if (files[i].isDirectory()) { File[] innerFiles = files[i].listFiles(); for (int j = 0; j < innerFiles.length; j++) { if (innerFiles[j].isFile() && innerFiles[j].getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { fileNamesNoExtension .add(innerFiles[j].getName().replaceFirst(RecordingManager.RECORDING_ENTITY_FILE, "")); break; } } } } return fileNamesNoExtension; } private void checkRecordingPaths(String openviduRecordingPath, String openviduRecordingCustomLayout) throws OpenViduException { log.info("Initializing recording paths"); Path recordingPath = null; try { recordingPath = Files.createDirectories(Paths.get(openviduRecordingPath)); } catch (IOException e) { String errorMessage = "The recording path \"" + openviduRecordingPath + "\" is not valid. Reason: OpenVidu Server cannot find path \"" + openviduRecordingPath + "\" and doesn't have permissions to create it"; log.error(errorMessage); throw new OpenViduException(Code.RECORDING_PATH_NOT_VALID, errorMessage); } // Check OpenVidu Server write permissions in recording path if (!Files.isWritable(recordingPath)) { String errorMessage = "The recording path \"" + openviduRecordingPath + "\" is not valid. Reason: OpenVidu Server needs write permissions. Try running command \"sudo chmod 777 " + openviduRecordingPath + "\""; log.error(errorMessage); throw new OpenViduException(Code.RECORDING_PATH_NOT_VALID, errorMessage); } else { log.info("OpenVidu Server has write permissions on recording path: {}", openviduRecordingPath); } final String testFolderPath = openviduRecordingPath + "/TEST_RECORDING_PATH_" + System.currentTimeMillis(); final String testFilePath = testFolderPath + "/TEST_RECORDING_PATH.webm"; // Check Kurento Media Server write permissions in recording path - KurentoClientSessionInfo kcSessionInfo = new OpenViduKurentoClientSessionInfo("TEST_RECORDING_PATH", - "TEST_RECORDING_PATH"); - MediaPipeline pipeline = this.kcProvider.getKurentoClient(kcSessionInfo).createMediaPipeline(); + MediaPipeline pipeline = this.kmsManager.getLessLoadedKms().getKurentoClient().createMediaPipeline(); RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, "file://" + testFilePath).build(); final AtomicBoolean kurentoRecorderError = new AtomicBoolean(false); recorder.addErrorListener(new EventListener() { @Override public void onEvent(ErrorEvent event) { if (event.getErrorCode() == 6) { // KMS write permissions error kurentoRecorderError.compareAndSet(false, true); } } }); recorder.record(); try { // Give the error event some time to trigger if necessary Thread.sleep(500); } catch (InterruptedException e1) { e1.printStackTrace(); } if (kurentoRecorderError.get()) { String errorMessage = "The recording path \"" + openviduRecordingPath + "\" is not valid. Reason: Kurento Media Server needs write permissions. Try running command \"sudo chmod 777 " + openviduRecordingPath + "\""; log.error(errorMessage); throw new OpenViduException(Code.RECORDING_PATH_NOT_VALID, errorMessage); } recorder.stop(); recorder.release(); pipeline.release(); log.info("Kurento Media Server has write permissions on recording path: {}", openviduRecordingPath); try { new CustomFileManager().deleteFolder(testFolderPath); log.info("OpenVidu Server has write permissions over files created by Kurento Media Server"); } catch (IOException e) { String errorMessage = "The recording path \"" + openviduRecordingPath + "\" is not valid. Reason: OpenVidu Server does not have write permissions over files created by Kurento Media Server. " + "Try running Kurento Media Server as user \"" + System.getProperty("user.name") + "\" or run OpenVidu Server as superuser"; log.error(errorMessage); log.error("Be aware that a folder \"{}\" was created and should be manually deleted (\"sudo rm -rf {}\")", testFolderPath, testFolderPath); throw new OpenViduException(Code.RECORDING_PATH_NOT_VALID, errorMessage); } if (openviduConfig.openviduRecordingCustomLayoutChanged(openviduRecordingCustomLayout)) { // Property openvidu.recording.custom-layout changed File dir = new File(openviduRecordingCustomLayout); if (dir.exists()) { if (!dir.isDirectory()) { String errorMessage = "The custom layouts path \"" + openviduRecordingCustomLayout + "\" is not valid. Reason: path already exists but it is not a directory"; log.error(errorMessage); throw new OpenViduException(Code.RECORDING_FILE_EMPTY_ERROR, errorMessage); } else { if (dir.listFiles() == null) { String errorMessage = "The custom layouts path \"" + openviduRecordingCustomLayout + "\" is not valid. Reason: OpenVidu Server needs read permissions. Try running command \"sudo chmod 755 " + openviduRecordingCustomLayout + "\""; log.error(errorMessage); throw new OpenViduException(Code.RECORDING_FILE_EMPTY_ERROR, errorMessage); } else { log.info("OpenVidu Server has read permissions on custom layout path: {}", openviduRecordingCustomLayout); log.info("Custom layouts path successfully initialized at {}", openviduRecordingCustomLayout); } } } else { try { Files.createDirectories(dir.toPath()); log.warn( "OpenVidu custom layouts path (system property 'openvidu.recording.custom-layout') has been created, being folder {}. " + "It is an empty folder, so no custom layout is currently present", dir.getAbsolutePath()); } catch (IOException e) { String errorMessage = "The custom layouts path \"" + openviduRecordingCustomLayout + "\" is not valid. Reason: OpenVidu Server cannot find path \"" + openviduRecordingCustomLayout + "\" and doesn't have permissions to create it"; log.error(errorMessage); throw new OpenViduException(Code.RECORDING_FILE_EMPTY_ERROR, errorMessage); } } } log.info("Recording path successfully initialized at {}", openviduRecordingPath); } public static EndReason finalReason(EndReason reason) { if (RecordingManager.LAST_PARTICIPANT_LEFT_REASONS.contains(reason)) { return EndReason.lastParticipantLeft; } else { return reason; } } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java index 3b7a6887..b6f14958 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java +++ b/openvidu-server/src/main/java/io/openvidu/server/recording/service/SingleStreamRecordingService.java @@ -1,454 +1,451 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.recording.service; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; import java.io.Reader; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import org.apache.commons.io.FilenameUtils; import org.kurento.client.ErrorEvent; import org.kurento.client.EventListener; import org.kurento.client.MediaPipeline; import org.kurento.client.MediaProfileSpecType; import org.kurento.client.MediaType; import org.kurento.client.RecorderEndpoint; import org.kurento.client.RecordingEvent; import org.kurento.client.StoppedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.openvidu.client.OpenViduException; import io.openvidu.client.OpenViduException.Code; import io.openvidu.java.client.RecordingProperties; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.EndReason; import io.openvidu.server.core.Participant; import io.openvidu.server.core.Session; import io.openvidu.server.kurento.core.KurentoParticipant; +import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.kurento.endpoint.PublisherEndpoint; -import io.openvidu.server.kurento.kms.FixedOneKmsManager; import io.openvidu.server.recording.RecorderEndpointWrapper; import io.openvidu.server.recording.Recording; public class SingleStreamRecordingService extends RecordingService { private static final Logger log = LoggerFactory.getLogger(SingleStreamRecordingService.class); private Map> recorders = new ConcurrentHashMap<>(); private final String INDIVIDUAL_STREAM_METADATA_FILE = ".stream."; public SingleStreamRecordingService(RecordingManager recordingManager, OpenviduConfig openviduConfig) { super(recordingManager, openviduConfig); } @Override public Recording startRecording(Session session, RecordingProperties properties) throws OpenViduException { PropertiesRecordingId updatePropertiesAndRecordingId = this.setFinalRecordingNameAndGetFreeRecordingId(session, properties); properties = updatePropertiesAndRecordingId.properties; String recordingId = updatePropertiesAndRecordingId.recordingId; log.info("Starting individual ({}) recording {} of session {}", properties.hasVideo() ? (properties.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recordingId, session.getSessionId()); Recording recording = new Recording(session.getSessionId(), recordingId, properties); this.recordingManager.startingRecordings.put(recording.getId(), recording); recorders.put(session.getSessionId(), new ConcurrentHashMap()); final int activePublishers = session.getActivePublishers(); final CountDownLatch recordingStartedCountdown = new CountDownLatch(activePublishers); for (Participant p : session.getParticipants()) { if (p.isStreaming()) { MediaProfileSpecType profile = null; try { profile = generateMediaProfile(properties, p); } catch (OpenViduException e) { log.error( "Cannot start single stream recorder for stream {} in session {}: {}. Skipping to next stream being published", p.getPublisherStreamId(), session.getSessionId(), e.getMessage()); recordingStartedCountdown.countDown(); continue; } this.startRecorderEndpointForPublisherEndpoint(session, recordingId, profile, p, recordingStartedCountdown); } } try { if (!recordingStartedCountdown.await(5, TimeUnit.SECONDS)) { log.error("Error waiting for some recorder endpoint to start in session {}", session.getSessionId()); throw this.failStartRecording(session, recording, "Couldn't initialize some RecorderEndpoint"); } } catch (InterruptedException e) { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); log.error("Exception while waiting for state change", e); } this.generateRecordingMetadataFile(recording); this.updateRecordingManagerCollections(session, recording); this.sendRecordingStartedNotification(session, recording); return recording; } @Override public Recording stopRecording(Session session, Recording recording, EndReason reason) { - return this.stopRecording(session, recording, reason, false); - } - - public Recording stopRecording(Session session, Recording recording, EndReason reason, - boolean forceAfterKmsRestart) { log.info("Stopping individual ({}) recording {} of session {}. Reason: {}", recording.hasVideo() ? (recording.hasAudio() ? "video+audio" : "video-only") : "audioOnly", recording.getId(), recording.getSessionId(), reason); final int numberOfActiveRecorders = recorders.get(recording.getSessionId()).size(); final CountDownLatch stoppedCountDown = new CountDownLatch(numberOfActiveRecorders); + final long timeOfKurentoClientDisconnection = ((KurentoSession) session).getKms() + .getTimeOfKurentoClientDisconnection(); for (RecorderEndpointWrapper wrapper : recorders.get(recording.getSessionId()).values()) { this.stopRecorderEndpointOfPublisherEndpoint(recording.getSessionId(), wrapper.getStreamId(), - stoppedCountDown, forceAfterKmsRestart); + stoppedCountDown, timeOfKurentoClientDisconnection); } try { if (!stoppedCountDown.await(5, TimeUnit.SECONDS)) { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); log.error("Error waiting for some recorder endpoint to stop in session {}", recording.getSessionId()); } } catch (InterruptedException e) { recording.setStatus(io.openvidu.java.client.Recording.Status.failed); log.error("Exception while waiting for state change", e); } this.cleanRecordingMaps(recording); this.recorders.remove(recording.getSessionId()); recording = this.sealMetadataFiles(recording); if (reason != null && session != null) { this.recordingManager.sessionHandler.sendRecordingStoppedNotification(session, recording, reason); } return recording; } public void startRecorderEndpointForPublisherEndpoint(Session session, String recordingId, MediaProfileSpecType profile, Participant participant, CountDownLatch globalStartLatch) { log.info("Starting single stream recorder for stream {} in session {}", participant.getPublisherStreamId(), session.getSessionId()); if (recordingId == null) { // Stream is being recorded because is a new publisher in an ongoing recorded // session. If recordingId is defined is because Stream is being recorded from // "startRecording" method Recording recording = this.recordingManager.sessionsRecordings.get(session.getSessionId()); recordingId = recording.getId(); try { profile = generateMediaProfile(recording.getRecordingProperties(), participant); } catch (OpenViduException e) { log.error("Cannot start single stream recorder for stream {} in session {}: {}", participant.getPublisherStreamId(), session.getSessionId(), e.getMessage()); return; } } KurentoParticipant kurentoParticipant = (KurentoParticipant) participant; MediaPipeline pipeline = kurentoParticipant.getPublisher().getPipeline(); RecorderEndpoint recorder = new RecorderEndpoint.Builder(pipeline, "file://" + this.openviduConfig.getOpenViduRecordingPath() + recordingId + "/" + participant.getPublisherStreamId() + ".webm").withMediaProfile(profile).build(); recorder.addRecordingListener(new EventListener() { @Override public void onEvent(RecordingEvent event) { recorders.get(session.getSessionId()).get(participant.getPublisherStreamId()) .setStartTime(Long.parseLong(event.getTimestampMillis())); log.info("Recording started event for stream {}", participant.getPublisherStreamId()); globalStartLatch.countDown(); } }); recorder.addErrorListener(new EventListener() { @Override public void onEvent(ErrorEvent event) { log.error(event.getErrorCode() + " " + event.getDescription()); } }); connectAccordingToProfile(kurentoParticipant.getPublisher(), recorder, profile); RecorderEndpointWrapper wrapper = new RecorderEndpointWrapper(recorder, participant.getParticipantPublicId(), recordingId, participant.getPublisherStreamId(), participant.getClientMetadata(), participant.getServerMetadata(), kurentoParticipant.getPublisher().getMediaOptions().hasAudio(), kurentoParticipant.getPublisher().getMediaOptions().hasVideo(), kurentoParticipant.getPublisher().getMediaOptions().getTypeOfVideo()); recorders.get(session.getSessionId()).put(participant.getPublisherStreamId(), wrapper); wrapper.getRecorder().record(); } public void stopRecorderEndpointOfPublisherEndpoint(String sessionId, String streamId, - CountDownLatch globalStopLatch, boolean forceAfterKmsRestart) { + CountDownLatch globalStopLatch, Long kmsDisconnectionTime) { log.info("Stopping single stream recorder for stream {} in session {}", streamId, sessionId); final RecorderEndpointWrapper finalWrapper = this.recorders.get(sessionId).remove(streamId); - if (finalWrapper != null && !forceAfterKmsRestart) { + if (finalWrapper != null && kmsDisconnectionTime == 0) { finalWrapper.getRecorder().addStoppedListener(new EventListener() { @Override public void onEvent(StoppedEvent event) { finalWrapper.setEndTime(Long.parseLong(event.getTimestampMillis())); generateIndividualMetadataFile(finalWrapper); log.info("Recording stopped event for stream {}", streamId); finalWrapper.getRecorder().release(); globalStopLatch.countDown(); } }); finalWrapper.getRecorder().stop(); } else { - if (forceAfterKmsRestart) { - finalWrapper.setEndTime(FixedOneKmsManager.TIME_OF_DISCONNECTION.get()); + if (kmsDisconnectionTime != 0) { + finalWrapper.setEndTime(kmsDisconnectionTime); generateIndividualMetadataFile(finalWrapper); log.warn("Forcing individual recording stop after KMS restart for stream {} in session {}", streamId, sessionId); } else { log.error("Stream {} wasn't being recorded in session {}", streamId, sessionId); } globalStopLatch.countDown(); } } private MediaProfileSpecType generateMediaProfile(RecordingProperties properties, Participant participant) throws OpenViduException { KurentoParticipant kParticipant = (KurentoParticipant) participant; MediaProfileSpecType profile = null; boolean streamHasAudio = kParticipant.getPublisher().getMediaOptions().hasAudio(); boolean streamHasVideo = kParticipant.getPublisher().getMediaOptions().hasVideo(); boolean propertiesHasAudio = properties.hasAudio(); boolean propertiesHasVideo = properties.hasVideo(); if (streamHasAudio) { if (streamHasVideo) { // Stream has both audio and video tracks if (propertiesHasAudio) { if (propertiesHasVideo) { profile = MediaProfileSpecType.WEBM; } else { profile = MediaProfileSpecType.WEBM_AUDIO_ONLY; } } else { profile = MediaProfileSpecType.WEBM_VIDEO_ONLY; } } else { // Stream has audio track only if (propertiesHasAudio) { profile = MediaProfileSpecType.WEBM_AUDIO_ONLY; } else { // ERROR: RecordingProperties set to video only but there's no video track throw new OpenViduException( Code.MEDIA_TYPE_STREAM_INCOMPATIBLE_WITH_RECORDING_PROPERTIES_ERROR_CODE, "RecordingProperties set to \"hasAudio(false)\" but stream is audio-only"); } } } else if (streamHasVideo) { // Stream has video track only if (propertiesHasVideo) { profile = MediaProfileSpecType.WEBM_VIDEO_ONLY; } else { // ERROR: RecordingProperties set to audio only but there's no audio track throw new OpenViduException(Code.MEDIA_TYPE_STREAM_INCOMPATIBLE_WITH_RECORDING_PROPERTIES_ERROR_CODE, "RecordingProperties set to \"hasVideo(false)\" but stream is video-only"); } } else { // ERROR: Stream has no track at all. This branch should never be reachable throw new OpenViduException(Code.MEDIA_TYPE_STREAM_INCOMPATIBLE_WITH_RECORDING_PROPERTIES_ERROR_CODE, "Stream has no track at all. Cannot be recorded"); } return profile; } private void connectAccordingToProfile(PublisherEndpoint publisherEndpoint, RecorderEndpoint recorder, MediaProfileSpecType profile) { switch (profile) { case WEBM: publisherEndpoint.connect(recorder, MediaType.AUDIO); publisherEndpoint.connect(recorder, MediaType.VIDEO); break; case WEBM_AUDIO_ONLY: publisherEndpoint.connect(recorder, MediaType.AUDIO); break; case WEBM_VIDEO_ONLY: publisherEndpoint.connect(recorder, MediaType.VIDEO); break; default: throw new UnsupportedOperationException("Unsupported profile when single stream recording: " + profile); } } private void generateIndividualMetadataFile(RecorderEndpointWrapper wrapper) { String filesPath = this.openviduConfig.getOpenViduRecordingPath() + wrapper.getRecordingId() + "/"; File videoFile = new File(filesPath + wrapper.getStreamId() + ".webm"); wrapper.setSize(videoFile.length()); String metadataFilePath = filesPath + INDIVIDUAL_STREAM_METADATA_FILE + wrapper.getStreamId(); String metadataFileContent = wrapper.toJson().toString(); this.fileWriter.createAndWriteFile(metadataFilePath, metadataFileContent); } private Recording sealMetadataFiles(Recording recording) { // Must update recording "status" (to stopped), "duration" (min startTime of all // individual recordings) and "size" (sum of all individual recordings size) String folderPath = this.openviduConfig.getOpenViduRecordingPath() + recording.getId() + "/"; String metadataFilePath = folderPath + RecordingManager.RECORDING_ENTITY_FILE + recording.getId(); String syncFilePath = folderPath + recording.getName() + ".json"; recording = this.recordingManager.getRecordingFromEntityFile(new File(metadataFilePath)); long minStartTime = Long.MAX_VALUE; long maxEndTime = 0; long accumulatedSize = 0; File folder = new File(folderPath); File[] files = folder.listFiles(); Reader reader = null; Gson gson = new Gson(); // Sync metadata json object to store in "RECORDING_NAME.json" JsonObject json = new JsonObject(); json.addProperty("createdAt", recording.getCreatedAt()); json.addProperty("id", recording.getId()); json.addProperty("name", recording.getName()); json.addProperty("sessionId", recording.getSessionId()); JsonArray jsonArrayFiles = new JsonArray(); for (int i = 0; i < files.length; i++) { if (files[i].isFile() && files[i].getName().startsWith(INDIVIDUAL_STREAM_METADATA_FILE)) { try { reader = new FileReader(files[i].getAbsolutePath()); } catch (FileNotFoundException e) { log.error("Error reading file {}. Error: {}", files[i].getAbsolutePath(), e.getMessage()); } RecorderEndpointWrapper wr = gson.fromJson(reader, RecorderEndpointWrapper.class); minStartTime = Math.min(minStartTime, wr.getStartTime()); maxEndTime = Math.max(maxEndTime, wr.getEndTime()); accumulatedSize += wr.getSize(); JsonObject jsonFile = new JsonObject(); jsonFile.addProperty("connectionId", wr.getConnectionId()); jsonFile.addProperty("streamId", wr.getStreamId()); jsonFile.addProperty("size", wr.getSize()); jsonFile.addProperty("clientData", wr.getClientData()); jsonFile.addProperty("serverData", wr.getServerData()); jsonFile.addProperty("hasAudio", wr.hasAudio() && recording.hasAudio()); jsonFile.addProperty("hasVideo", wr.hasVideo() && recording.hasVideo()); if (wr.hasVideo()) { jsonFile.addProperty("typeOfVideo", wr.getTypeOfVideo()); } jsonFile.addProperty("startTimeOffset", wr.getStartTime() - recording.getCreatedAt()); jsonFile.addProperty("endTimeOffset", wr.getEndTime() - recording.getCreatedAt()); jsonArrayFiles.add(jsonFile); } } json.add("files", jsonArrayFiles); this.fileWriter.createAndWriteFile(syncFilePath, new GsonBuilder().setPrettyPrinting().create().toJson(json)); this.generateZipFileAndCleanFolder(folderPath, recording.getName() + ".zip"); double duration = (double) (maxEndTime - minStartTime) / 1000; duration = duration > 0 ? duration : 0; recording = this.sealRecordingMetadataFile(recording, accumulatedSize, duration, metadataFilePath); return recording; } private void generateZipFileAndCleanFolder(String folder, String fileName) { FileOutputStream fos = null; ZipOutputStream zipOut = null; final File[] files = new File(folder).listFiles(); try { fos = new FileOutputStream(folder + fileName); zipOut = new ZipOutputStream(fos); for (int i = 0; i < files.length; i++) { String fileExtension = FilenameUtils.getExtension(files[i].getName()); if (files[i].isFile() && (fileExtension.equals("json") || fileExtension.equals("webm"))) { // Zip video files and json sync metadata file FileInputStream fis = new FileInputStream(files[i]); ZipEntry zipEntry = new ZipEntry(files[i].getName()); zipOut.putNextEntry(zipEntry); byte[] bytes = new byte[1024]; int length; while ((length = fis.read(bytes)) >= 0) { zipOut.write(bytes, 0, length); } fis.close(); } if (!files[i].getName().startsWith(RecordingManager.RECORDING_ENTITY_FILE)) { // Clean inspected file if it is not files[i].delete(); } } } catch (IOException e) { log.error("Error generating ZIP file {}. Error: {}", folder + fileName, e.getMessage()); } finally { try { zipOut.close(); fos.close(); this.updateFilePermissions(folder); } catch (IOException e) { log.error("Error closing FileOutputStream or ZipOutputStream. Error: {}", e.getMessage()); e.printStackTrace(); } } } } diff --git a/openvidu-server/src/test/java/io/openvidu/server/test/core/RoomManagerTest.java b/openvidu-server/src/test/java/io/openvidu/server/test/core/RoomManagerTest.java index f9644622..63c2fdea 100644 --- a/openvidu-server/src/test/java/io/openvidu/server/test/core/RoomManagerTest.java +++ b/openvidu-server/src/test/java/io/openvidu/server/test/core/RoomManagerTest.java @@ -1,1380 +1,1336 @@ /* * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.openvidu.server.test.core; -import static org.junit.matchers.JUnitMatchers.containsString; -import static org.junit.matchers.JUnitMatchers.hasItem; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; - -import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.hamcrest.CoreMatchers; + import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.kurento.client.Continuation; import org.kurento.client.ErrorEvent; import org.kurento.client.EventListener; import org.kurento.client.FaceOverlayFilter; import org.kurento.client.HubPort; -import org.kurento.client.IceCandidate; import org.kurento.client.KurentoClient; -import org.kurento.client.MediaElement; import org.kurento.client.MediaPipeline; import org.kurento.client.MediaType; import org.kurento.client.Mixer; import org.kurento.client.OnIceCandidateEvent; import org.kurento.client.PassThrough; import org.kurento.client.RtpEndpoint; import org.kurento.client.ServerManager; import org.kurento.client.WebRtcEndpoint; import org.mockito.ArgumentCaptor; import org.mockito.Captor; -import org.mockito.Matchers; import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import org.springframework.context.ConfigurableApplicationContext; -import io.openvidu.client.OpenViduException; -import io.openvidu.client.OpenViduException.Code; -import io.openvidu.server.OpenViduServer; import io.openvidu.server.core.Participant; import io.openvidu.server.core.SessionManager; -import io.openvidu.server.core.Token; -import io.openvidu.server.kurento.KurentoClientProvider; -import io.openvidu.server.kurento.KurentoClientSessionInfo; import io.openvidu.server.kurento.core.KurentoSessionEventsHandler; -import io.openvidu.server.kurento.core.KurentoSessionManager; +import io.openvidu.server.kurento.kms.KmsManager; /** * Tests for {@link RoomManager} when using mocked {@link KurentoClient} resources. * * @author Radu Tom Vlad */ @RunWith(PowerMockRunner.class) @PrepareForTest(fullyQualifiedNames = "org.kurento.*") @PowerMockIgnore( {"javax.management.*"}) public class RoomManagerTest { private static final String SDP_WEB_OFFER = "peer sdp web offer"; private static final String SDP_WEB_ANSWER = "endpoint sdp web answer"; private static final String SDP_WEB_SERVER_OFFER = "server sdp web offer"; private static final String SDP_WEB_PEER_ANSWER = "peer sdp web answer"; private static final String SDP_WEB_SERVER_UPDATED_OFFER = "server sdp updated web offer"; private static final String SDP_RTP_OFFER = "peer sdp rtp offer"; private static final String SDP_RTP_ANSWER = "endpoint sdp rtp answer"; // private static final String SDP_WEB_SERVER_OFFER = "server sdp offer"; // private static final String SDP_WEB_PEER_ANSWER = "peer sdp answer"; // private static final String SDP_WEB_SERVER_UPDATED_OFFER = // "server sdp updated offer"; private static final int USERS = 10; private static final int ROOMS = 3; private SessionManager manager; @Mock - private KurentoClientProvider kcProvider; + private KmsManager kcProvider; @Mock private KurentoSessionEventsHandler roomHandler; @Mock private KurentoClient kurentoClient; @Mock private ServerManager serverManager; @Captor private ArgumentCaptor> kurentoClientCaptor; @Mock private MediaPipeline pipeline; @Mock private WebRtcEndpoint endpoint; @Mock private PassThrough passThru; @Mock private RtpEndpoint rtpEndpoint; @Mock private WebRtcEndpoint.Builder webRtcBuilder; @Captor private ArgumentCaptor> webRtcCaptor; @Captor private ArgumentCaptor> webRtcConnectCaptor; @Captor private ArgumentCaptor> webRtcDisconnectCaptor; @Mock private PassThrough.Builder passThruBuilder; @Captor private ArgumentCaptor> passThruConnectCaptor; @Captor private ArgumentCaptor> passThruDisconnectCaptor; @Mock private RtpEndpoint.Builder rtpBuilder; @Captor private ArgumentCaptor> rtpCaptor; @Captor private ArgumentCaptor> rtpConnectCaptor; @Captor private ArgumentCaptor> rtpDisconnectCaptor; @Mock private Mixer mixer; @Mock private Mixer.Builder mixerBuilder; @Mock private HubPort hubPort; @Mock private HubPort.Builder hubPortBuilder; @Captor private ArgumentCaptor> hubPortConnectCaptor; @Captor private ArgumentCaptor hubPortConnectTypeCaptor; @Mock private FaceOverlayFilter.Builder faceFilterBuilder; @Mock private FaceOverlayFilter faceFilter; @Captor private ArgumentCaptor> faceFilterConnectCaptor; @Captor private ArgumentCaptor> iceEventCaptor; @Captor private ArgumentCaptor> mediaErrorEventCaptor; @Captor private ArgumentCaptor> pipelineErrorEventCaptor; @Rule public final ExpectedException exception = ExpectedException.none(); private String userx = "userx"; private String pidx = "pidx"; private String roomx = "roomx"; // usernames will be used as participantIds private String[] users = new String[USERS]; private String[] rooms = new String[ROOMS]; private Map usersParticipantIds = new HashMap(); private Map usersParticipants = new HashMap(); @Before public void setup() { /* ConfigurableApplicationContext app = OpenViduServer .start(new String[] { "--server.port=7777" }); manager = app.getBean(KurentoSessionManager.class); when(kcProvider.getKurentoClient(any(KurentoClientSessionInfo.class))) .thenReturn(kurentoClient); when(kurentoClient.getServerManager()).thenReturn(serverManager); when(serverManager.getName()).thenReturn("mocked-kurento-client"); // call onSuccess when creating the pipeline to use the mocked instance doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { kurentoClientCaptor.getValue().onSuccess(pipeline); return null; } }).when(kurentoClient).createMediaPipeline(kurentoClientCaptor.capture()); // call onSuccess when building the endpoint to use the mocked instance doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { webRtcCaptor.getValue().onSuccess(endpoint); return null; } }).when(webRtcBuilder).buildAsync(webRtcCaptor.capture()); // call onSuccess when building the RTP endpoint to use the mocked // instance doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { rtpCaptor.getValue().onSuccess(rtpEndpoint); return null; } }).when(rtpBuilder).buildAsync(rtpCaptor.capture()); // still using the sync version when(passThruBuilder.build()).thenReturn(passThru); try { // mock the constructor for the endpoint builder whenNew(WebRtcEndpoint.Builder.class).withArguments(pipeline).thenReturn(webRtcBuilder); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } try { // mock the constructor for the RTP endpoint builder whenNew(RtpEndpoint.Builder.class).withArguments(pipeline).thenReturn(rtpBuilder); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } try { // mock the constructor for the passThru builder whenNew(PassThrough.Builder.class).withArguments(pipeline).thenReturn(passThruBuilder); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } // mock the SDP answer when processing the offer on the endpoint when(endpoint.processOffer(SDP_WEB_OFFER)).thenReturn(SDP_WEB_ANSWER); // mock the SDP offer when generating it from the server endpoint when(endpoint.generateOffer()).thenReturn(SDP_WEB_SERVER_OFFER); // mock the SDP offer when generating it from the server endpoint when(endpoint.processAnswer(SDP_WEB_PEER_ANSWER)).thenReturn(SDP_WEB_SERVER_UPDATED_OFFER); // mock the SDP answer when processing the offer on the RTP endpoint when(rtpEndpoint.processOffer(SDP_RTP_OFFER)).thenReturn(SDP_RTP_ANSWER); // call onSuccess when connecting the WebRtc endpoint to any media // element doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { webRtcConnectCaptor.getValue().onSuccess(null); return null; } }).when(endpoint).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); // call onSuccess when disconnecting the WebRtc endpoint from any media // element doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { webRtcDisconnectCaptor.getValue().onSuccess(null); return null; } }).when(endpoint).disconnect(any(MediaElement.class), webRtcDisconnectCaptor.capture()); // call onSuccess when connecting the RTP endpoint to any media // element doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { rtpConnectCaptor.getValue().onSuccess(null); return null; } }).when(rtpEndpoint).connect(any(MediaElement.class), rtpConnectCaptor.capture()); // call onSuccess when disconnecting the RTP endpoint from any media // element doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { rtpDisconnectCaptor.getValue().onSuccess(null); return null; } }).when(rtpEndpoint).disconnect(any(MediaElement.class), rtpDisconnectCaptor.capture()); // call onSuccess when connecting the PassThrough element to any media // element doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { passThruConnectCaptor.getValue().onSuccess(null); return null; } }).when(passThru).connect(any(MediaElement.class), passThruConnectCaptor.capture()); // call onSuccess when disconnecting the PassThrough element from any // media // element doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { passThruDisconnectCaptor.getValue().onSuccess(null); return null; } }).when(passThru).disconnect(any(MediaElement.class), passThruDisconnectCaptor.capture()); try { // mock the constructor for the mixer builder whenNew(Mixer.Builder.class).withArguments(pipeline).thenReturn(mixerBuilder); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } // using the sync version to build the mixer when(mixerBuilder.build()).thenReturn(mixer); try { // mock the constructor for the hubPort builder whenNew(HubPort.Builder.class).withArguments(mixer).thenReturn(hubPortBuilder); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } // using the sync version to build the hubPort when(hubPortBuilder.build()).thenReturn(hubPort); // call onSuccess when connecting the hubPort to any media element doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { hubPortConnectCaptor.getValue().onSuccess(null); return null; } }).when(hubPort).connect(any(MediaElement.class), hubPortConnectCaptor.capture()); // call onSuccess when connecting the hubPort to any media element and // with a given media type doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { hubPortConnectCaptor.getValue().onSuccess(null); return null; } }).when(hubPort).connect(any(MediaElement.class), hubPortConnectTypeCaptor.capture(), hubPortConnectCaptor.capture()); try { // mock the constructor for the face filter builder whenNew(FaceOverlayFilter.Builder.class).withArguments(pipeline) .thenReturn(faceFilterBuilder); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } // using the sync version to build the face filter when(faceFilterBuilder.build()).thenReturn(faceFilter); // call onSuccess when connecting the face filter to any media element doAnswer(new Answer>() { @Override public Continuation answer(InvocationOnMock invocation) throws Throwable { faceFilterConnectCaptor.getValue().onSuccess(null); return null; } }).when(faceFilter).connect(any(MediaElement.class), faceFilterConnectCaptor.capture()); when(pipeline.getId()).thenReturn("mocked-pipeline"); when(endpoint.getId()).thenReturn("mocked-webrtc-endpoint"); when(rtpEndpoint.getId()).thenReturn("mocked-rtp-endpoint"); when(passThru.getId()).thenReturn("mocked-pass-through"); when(hubPort.getId()).thenReturn("mocked-hub-port"); when(faceFilter.getId()).thenReturn("mocked-faceoverlay-filter"); for (int i = 0; i < USERS; i++) { users[i] = "user" + i; usersParticipantIds.put(users[i], "pid" + i); usersParticipants.put(users[i], new Participant(users[i], users[i], new Token("token"), "clientMetadata")); } for (int i = 0; i < ROOMS; i++) { rooms[i] = "room" + i; }*/ } @After public void tearDown() { /* manager.close(); */ } @Test public void joinNewRoom() { /*assertThat(manager.getRooms(), not(hasItem(roomx))); assertTrue(userJoinRoom(roomx, userx, pidx, true).isEmpty()); assertThat(manager.getRooms(), hasItem(roomx)); assertThat(manager.getParticipants(roomx), hasItem(new UserParticipant(pidx, userx)));*/ } /*@Test public void rtpJoinNewRoom() { assertThat(manager.getRooms(), not(hasItem(roomx))); assertTrue(userJoinRoom(roomx, userx, pidx, true).isEmpty()); assertThat(manager.getRooms(), hasItem(roomx)); assertThat(manager.getParticipants(roomx), hasItem(new UserParticipant(pidx, userx))); } @Test public void joinRoomFail() { assertThat(manager.getSessions(), not(hasItem(roomx))); //exception.expect(OpenViduException.class); //exception.expectMessage(containsString("must be created before")); userJoinRoom(roomx, userx, pidx, false); assertThat(manager.getSessions(), (hasItem(roomx))); } @Test public void joinManyUsersOneRoom() { int count = 0; for (Entry userPid : usersParticipantIds.entrySet()) { String user = userPid.getKey(); String pid = userPid.getValue(); if (count == 0) { assertThat(manager.getRooms(), not(hasItem(roomx))); } else { assertThat(manager.getParticipants(roomx), not(hasItem(usersParticipants.get(user)))); } Set peers = userJoinRoom(roomx, user, pid, count == 0); if (count == 0) { assertTrue(peers.isEmpty()); assertThat(manager.getRooms(), hasItem(roomx)); } else { assertTrue(!peers.isEmpty()); } assertThat(manager.getParticipants(roomx), hasItem(usersParticipants.get(user))); count++; } } @Test public void joinManyWebUsersAndOneRTP() { joinManyUsersOneRoom(); assertFalse(userJoinRoom(roomx, userx, pidx, false, false).isEmpty()); assertThat(manager.getRooms(), hasItem(roomx)); assertThat(manager.getParticipants(roomx), hasItem(new UserParticipant(pidx, userx))); } @Test public void joinManyUsersManyRooms() { final Map usersRooms = new HashMap(); final Map> roomsUsers = new HashMap>(); for (int i = 0; i < users.length; i++) { String room = rooms[i % rooms.length]; usersRooms.put(users[i], room); if (!roomsUsers.containsKey(room)) { roomsUsers.put(room, new ArrayList()); } roomsUsers.get(room).add(users[i]); } for (final String room : roomsUsers.keySet()) { manager.createRoom(new KurentoClientSessionInfo() { @Override public String getRoomName() { return room; } }); } for (Entry userRoom : usersRooms.entrySet()) { String user = userRoom.getKey(); final String room = userRoom.getValue(); Set peers = manager.joinRoom(user, room, false, true, new KurentoClientSessionInfo() { @Override public String getRoomName() { return room; } }, usersParticipantIds.get(user)).existingParticipants; if (peers.isEmpty()) { assertEquals("Expected one peer in room " + room + ": " + user, 1, manager.getParticipants(room).size()); } } // verifies create media pipeline was called once for each new room verify(kurentoClient, times(roomsUsers.size())).createMediaPipeline( kurentoClientCaptor.capture()); } @Test public void leaveRoom() { joinManyUsersOneRoom(); assertTrue(!userJoinRoom(roomx, userx, pidx, false).isEmpty()); UserParticipant userxParticipant = new UserParticipant(pidx, userx); assertThat(manager.getParticipants(roomx), hasItem(userxParticipant)); Set remainingUsers = manager.leaveRoom(pidx); assertEquals(new HashSet(usersParticipants.values()), remainingUsers); assertEquals(manager.getParticipants(roomx), remainingUsers); assertThat(manager.getParticipants(roomx), not(hasItem(userxParticipant))); } @Test public void rtpLeaveRoom() { joinManyWebUsersAndOneRTP(); UserParticipant userxParticipant = new UserParticipant(pidx, userx); assertThat(manager.getParticipants(roomx), hasItem(userxParticipant)); Set remainingUsers = manager.leaveRoom(pidx); assertEquals(new HashSet(usersParticipants.values()), remainingUsers); assertEquals(manager.getParticipants(roomx), remainingUsers); assertThat(manager.getParticipants(roomx), not(hasItem(userxParticipant))); } @Test public void publisherLifecycle() { joinManyUsersOneRoom(); String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); manager.unpublishMedia(participantId0); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void rtpPublisherLifecycle() { joinManyWebUsersAndOneRTP(); assertEquals("SDP RTP answer doesn't match", SDP_RTP_ANSWER, manager.publishMedia(pidx, true, SDP_RTP_OFFER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); for (String pid : usersParticipantIds.values()) { assertEquals("SDP WEB answer (for the web peer) doesn't match", SDP_WEB_ANSWER, manager.subscribe(userx, SDP_WEB_OFFER, pid)); } assertThat(manager.getSubscribers(roomx).size(), is(users.length)); manager.unpublishMedia(pidx); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void invertedPublisherLifecycle() { joinManyUsersOneRoom(); String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP server offer doesn't match", SDP_WEB_SERVER_OFFER, manager.generatePublishOffer(participantId0)); assertThat(manager.getPublishers(roomx).size(), is(0)); assertEquals("SDP updated offer doesn't match", SDP_WEB_SERVER_UPDATED_OFFER, manager.publishMedia(participantId0, false, SDP_WEB_PEER_ANSWER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); manager.unpublishMedia(participantId0); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void publishAndLeave() { joinManyUsersOneRoom(); String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); // connected without loopback, publisher's internal connection verify(endpoint, times(1)).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); // no external connection until someone subscribes verify(passThru, never()).connect(any(MediaElement.class), passThruConnectCaptor.capture()); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // connected without loopback, verify(endpoint, times(1)).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); // using same endpoint, subscribers connections verify(passThru, times(users.length - 1)).connect(any(MediaElement.class), passThruConnectCaptor.capture()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); }*/ /** * Tests publishing (w/o loopback) when the SDP offer is generated on the server-side. * * @throws AdminException */ /*@Test public void invertedPublishAndLeave() { joinManyUsersOneRoom(); String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP server offer doesn't match", SDP_WEB_SERVER_OFFER, manager.generatePublishOffer(participantId0)); assertThat(manager.getPublishers(roomx).size(), is(0)); assertEquals("SDP updated offer doesn't match", SDP_WEB_SERVER_UPDATED_OFFER, manager.publishMedia(participantId0, false, SDP_WEB_PEER_ANSWER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); // connected without loopback, no external connection until someone // subscribes verify(endpoint, times(1)).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); verify(passThru, never()).connect(any(MediaElement.class), passThruConnectCaptor.capture()); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // connected without loopback, publisher's internal connection verify(endpoint, times(1)).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); // using same endpoint, subscribers connections verify(passThru, times(users.length - 1)).connect(any(MediaElement.class), passThruConnectCaptor.capture()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void publishWithLoopbackError() { joinManyUsersOneRoom(); String participantId0 = usersParticipantIds.get(users[0]); doThrow( new OpenViduException(Code.MEDIA_WEBRTC_ENDPOINT_ERROR_CODE, "Loopback connection error test")) .when(passThru).connect(any(WebRtcEndpoint.class), Matchers.> any()); exception.expect(OpenViduException.class); exception.expectMessage(containsString("Loopback connection error test")); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, true)); assertThat(manager.getPublishers(roomx).size(), is(0)); assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void publishWithLoopback() { joinManyUsersOneRoom(); String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, true)); assertThat(manager.getPublishers(roomx).size(), is(1)); // connected with loopback, so the internal connection is performed // right away verify(endpoint).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); verify(passThru).connect(any(MediaElement.class), passThruConnectCaptor.capture()); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // using same endpoint, subscribers connections + the internal one verify(passThru, times(users.length)).connect(any(MediaElement.class), passThruConnectCaptor.capture()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); }*/ /** * Tests publishing (w/ loopback) when the SDP offer is generated on the server-side. * * @throws AdminException */ /*@Test public void invertedPublishWithLoopback() { joinManyUsersOneRoom(); String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP server offer doesn't match", SDP_WEB_SERVER_OFFER, manager.generatePublishOffer(participantId0)); assertThat(manager.getPublishers(roomx).size(), is(0)); assertEquals("SDP updated offer doesn't match", SDP_WEB_SERVER_UPDATED_OFFER, manager.publishMedia(participantId0, false, SDP_WEB_PEER_ANSWER, true)); assertThat(manager.getPublishers(roomx).size(), is(1)); // connected with loopback, so the internal connection is performed // right away verify(endpoint).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); verify(passThru).connect(any(MediaElement.class), passThruConnectCaptor.capture()); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // using same endpoint, subscribers connections + the internal one verify(passThru, times(users.length)).connect(any(MediaElement.class), passThruConnectCaptor.capture()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void publishWithAlternativeLoopbackSrc() { joinManyUsersOneRoom(); Mixer m = new Mixer.Builder(pipeline).build(); assertThat("Mixer returned by the builder is not the same as the mocked one", m, is(mixer)); HubPort hb = new HubPort.Builder(m).build(); assertThat("HubPort returned by the builder is not the same as the mocked one", hb, is(hubPort)); String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, hb, null, true)); assertThat(manager.getPublishers(roomx).size(), is(1)); // connected with loopback, so the internal connection is performed // right away verify(endpoint).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); // the loopback is not done using the passThru elem verify(passThru, never()).connect(any(MediaElement.class), passThruConnectCaptor.capture()); // the hubPort is connected to the webrtc endpoint verify(hubPort).connect(any(MediaElement.class), hubPortConnectCaptor.capture()); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // using same endpoint, subscribers connections only verify(passThru, times(users.length - 1)).connect(any(MediaElement.class), passThruConnectCaptor.capture()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void publishWithAlternativeLoopbackSrcAudioType() { joinManyUsersOneRoom(); Mixer m = new Mixer.Builder(pipeline).build(); assertThat("Mixer returned by the builder is not the same as the mocked one", m, is(mixer)); HubPort hb = new HubPort.Builder(m).build(); assertThat("HubPort returned by the builder is not the same as the mocked one", hb, is(hubPort)); String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, hb, MediaType.AUDIO, true)); assertThat(manager.getPublishers(roomx).size(), is(1)); // connected with loopback, so the internal connection is performed // right away verify(endpoint).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); // the loopback is not done using the passThru elem verify(passThru, never()).connect(any(MediaElement.class), passThruConnectCaptor.capture()); // the hubPort is connected to the webrtc endpoint verify(hubPort).connect(any(MediaElement.class), hubPortConnectTypeCaptor.capture(), hubPortConnectCaptor.capture()); assertThat("Connection type is not audio", hubPortConnectTypeCaptor.getValue(), is(MediaType.AUDIO)); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // using same endpoint, subscribers connections only verify(passThru, times(users.length - 1)).connect(any(MediaElement.class), passThruConnectCaptor.capture()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void muteUnmutePublished() { joinManyUsersOneRoom(); String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); // connected without loopback, publisher's internal connection verify(endpoint).connect(passThru, webRtcConnectCaptor.getValue()); // no external connection until someone subscribes verify(passThru, never()).connect(any(MediaElement.class), passThruConnectCaptor.capture()); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // connected without loopback, verify(endpoint, times(1)).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); // using same endpoint, subscribers connections verify(passThru, times(users.length - 1)).connect(any(MediaElement.class), passThruConnectCaptor.capture()); manager.mutePublishedMedia(MutedMediaType.ALL, participantId0); // disconnects once from the PassThrough verify(endpoint).disconnect(passThru, webRtcDisconnectCaptor.getValue()); manager.unmutePublishedMedia(participantId0); // reconnects once to the PassThrough verify(endpoint).connect(passThru, webRtcConnectCaptor.getValue()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void muteUnmuteSubscribed() { joinManyUsersOneRoom(); String participantId0 = usersParticipantIds.get(users[0]); String participantId1 = usersParticipantIds.get(users[1]); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); // connected without loopback, publisher's internal connection verify(endpoint).connect(passThru, webRtcConnectCaptor.getValue()); // no external connection until someone subscribes verify(passThru, never()).connect(any(MediaElement.class), passThruConnectCaptor.capture()); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // connected without loopback, verify(endpoint, times(1)).connect(any(MediaElement.class), webRtcConnectCaptor.capture()); // using same endpoint, subscribers connections verify(passThru, times(users.length - 1)).connect(any(MediaElement.class), passThruConnectCaptor.capture()); manager.muteSubscribedMedia(users[0], MutedMediaType.ALL, participantId1); // disconnects the PassThrough once from the subscriber's endpoint verify(passThru).disconnect(endpoint, passThruDisconnectCaptor.getValue()); manager.unmuteSubscribedMedia(users[0], participantId1); // reconnects once to the subscriber's endpoint verify(passThru).connect(endpoint, passThruConnectCaptor.getValue()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void addMediaFilterInParallel() throws InterruptedException, ExecutionException { joinManyUsersOneRoom(); final FaceOverlayFilter filter = new FaceOverlayFilter.Builder(pipeline).build(); assertNotNull("FaceOverlayFiler is null", filter); assertThat("Filter returned by the builder is not the same as the mocked one", filter, is(faceFilter)); final String participantId0 = usersParticipantIds.get(users[0]); ExecutorService threadPool = Executors.newFixedThreadPool(1); ExecutorCompletionService exec = new ExecutorCompletionService<>(threadPool); exec.submit(new Callable() { @Override public Void call() throws Exception { System.out.println("Starting execution of addMediaElement"); manager.addMediaElement(participantId0, filter); return null; } }); Thread.sleep(10); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); boolean firstSubscriber = true; for (String pid : usersParticipantIds.values()) { if (pid.equals(participantId0)) { continue; } assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); if (firstSubscriber) { firstSubscriber = false; try { exec.take().get(); System.out .println("Execution of addMediaElement ended (just after first peer subscribed)"); } finally { threadPool.shutdownNow(); } } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); verify(faceFilter, times(1)).connect(passThru, faceFilterConnectCaptor.getValue()); verify(endpoint, times(1)).connect(faceFilter, webRtcConnectCaptor.getValue()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void addMediaFilterBeforePublishing() throws InterruptedException, ExecutionException { joinManyUsersOneRoom(); final FaceOverlayFilter filter = new FaceOverlayFilter.Builder(pipeline).build(); assertNotNull("FaceOverlayFiler is null", filter); assertThat("Filter returned by the builder is not the same as the mocked one", filter, is(faceFilter)); final String participantId0 = usersParticipantIds.get(users[0]); System.out.println("Starting execution of addMediaElement"); manager.addMediaElement(participantId0, filter); System.out.println("Execution of addMediaElement ended"); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); verify(faceFilter, times(1)).connect(passThru, faceFilterConnectCaptor.getValue()); verify(endpoint, times(1)).connect(faceFilter, webRtcConnectCaptor.getValue()); Set remainingUsers = manager.leaveRoom(participantId0); Set roomParticipants = manager.getParticipants(roomx); assertEquals(roomParticipants, remainingUsers); assertThat(roomParticipants, not(hasItem(usersParticipants.get(users[0])))); assertThat(manager.getPublishers(roomx).size(), is(0)); // peers are automatically unsubscribed assertThat(manager.getSubscribers(roomx).size(), is(0)); } @Test public void iceCandidate() { joinManyUsersOneRoom(); final String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); // verifies listener is added to publisher verify(endpoint, times(1)).addOnIceCandidateListener(iceEventCaptor.capture()); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // verifies listener is added to each subscriber verify(endpoint, times(usersParticipantIds.size())).addOnIceCandidateListener( iceEventCaptor.capture()); final IceCandidate ic = new IceCandidate("1 candidate test", "audio", 1); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); assertThat(args.length, is(4)); // first arg : roomName assertThat(args[0], instanceOf(String.class)); assertEquals(roomx, args[0]); // second arg : participantId assertThat(args[1], instanceOf(String.class)); String participantId = (String) args[1]; assertThat(usersParticipantIds.values(), hasItem(participantId)); // not the publisher cus the captored event // is for one of the subscribers assertThat(participantId, is(not(participantId0))); // third arg : endpointName == publisher's userName assertThat(args[2], instanceOf(String.class)); String epn = (String) args[2]; assertEquals(users[0], epn); // fourth arg : iceCandidate assertThat(args[3], instanceOf(IceCandidate.class)); IceCandidate icParam = (IceCandidate) args[3]; assertEquals(ic, icParam); return null; } }).when(roomHandler).onIceCandidate(anyString(), anyString(), anyString(), Matchers.any(IceCandidate.class)); // triggers the last captured listener iceEventCaptor.getValue().onEvent( new OnIceCandidateEvent(endpoint, "12345", null, "candidate", ic)); // verifies the handler's method was called once (we only triggered the // event once) verify(roomHandler, times(1)).onIceCandidate(anyString(), anyString(), anyString(), Matchers.any(IceCandidate.class)); } @Test public void mediaError() { joinManyUsersOneRoom(); final String participantId0 = usersParticipantIds.get(users[0]); assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.publishMedia(participantId0, true, SDP_WEB_OFFER, false)); assertThat(manager.getPublishers(roomx).size(), is(1)); // verifies error listener is added to publisher verify(endpoint, times(1)).addErrorListener(mediaErrorEventCaptor.capture()); final String expectedErrorMessage = "TEST_ERR: Fake media error(errCode=101)"; doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); assertThat(args.length, is(3)); // first arg : roomName assertThat(args[0], instanceOf(String.class)); assertEquals(roomx, args[0]); // second arg : participantId assertThat(args[1], instanceOf(String.class)); String participantId = (String) args[1]; assertThat(usersParticipantIds.values(), hasItem(participantId)); // error on the publisher's endpoint assertThat(participantId, is(participantId0)); // third arg : error description assertThat(args[2], instanceOf(String.class)); assertEquals(expectedErrorMessage, args[2]); return null; } }).when(roomHandler).onMediaElementError(anyString(), anyString(), anyString()); // triggers the last captured listener mediaErrorEventCaptor.getValue().onEvent( new ErrorEvent(endpoint, "12345", null, "Fake media error", 101, "TEST_ERR")); for (String pid : usersParticipantIds.values()) { if (!pid.equals(participantId0)) { assertEquals("SDP answer doesn't match", SDP_WEB_ANSWER, manager.subscribe(users[0], SDP_WEB_OFFER, pid)); } } assertThat(manager.getSubscribers(roomx).size(), is(users.length - 1)); // verifies listener is added to each subscriber verify(endpoint, times(usersParticipantIds.size())).addErrorListener( mediaErrorEventCaptor.capture()); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); assertThat(args.length, is(3)); // first arg : roomName assertThat(args[0], instanceOf(String.class)); assertEquals(roomx, args[0]); // second arg : participantId assertThat(args[1], instanceOf(String.class)); String participantId = (String) args[1]; assertThat(usersParticipantIds.values(), hasItem(participantId)); // error on a subscriber's endpoint assertThat(participantId, is(not(participantId0))); // third arg : error description assertThat(args[2], instanceOf(String.class)); assertEquals(expectedErrorMessage, args[2]); return null; } }).when(roomHandler).onMediaElementError(anyString(), anyString(), anyString()); // triggers the last captured listener (once again) mediaErrorEventCaptor.getValue().onEvent( new ErrorEvent(endpoint, "12345", null, "Fake media error", 101, "TEST_ERR")); // verifies the handler's method was called twice verify(roomHandler, times(2)).onMediaElementError(anyString(), anyString(), anyString());; } @Test public void pipelineError() { joinManyUsersOneRoom(); // verifies pipeline error listener is added to room verify(pipeline, times(1)).addErrorListener(pipelineErrorEventCaptor.capture()); final String expectedErrorMessage = "TEST_PP_ERR: Fake pipeline error(errCode=505)"; doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { Object[] args = invocation.getArguments(); assertThat(args.length, is(3)); // first arg : roomName assertThat(args[0], instanceOf(String.class)); assertEquals(roomx, args[0]); // second arg : participantIds assertThat(args[1], instanceOf(Set.class)); Set pids = new HashSet(); for (Object o : (Set) args[1]) { assertThat(o, instanceOf(String.class)); pids.add((String) o); } assertThat( pids, CoreMatchers.hasItems(usersParticipantIds.values().toArray( new String[usersParticipantIds.size()]))); // third arg : error description assertThat(args[2], instanceOf(String.class)); assertEquals(expectedErrorMessage, args[2]); return null; } }).when(roomHandler).onPipelineError(anyString(), Matchers.> any(), anyString()); // triggers the last captured listener pipelineErrorEventCaptor.getValue().onEvent( new ErrorEvent(pipeline, "12345", null, "Fake pipeline error", 505, "TEST_PP_ERR")); // verifies the handler's method was called only once (one captor event) verify(roomHandler, times(1)).onPipelineError(anyString(), Matchers.> any(), anyString());; } private Set userJoinRoom(final String room, String user, String pid, boolean joinMustSucceed) { return userJoinRoom(room, user, pid, joinMustSucceed, true); } private Set userJoinRoom(final String room, String user, String pid, boolean joinMustSucceed) { KurentoClientSessionInfo kcsi = null; if (joinMustSucceed) { kcsi = new KurentoClientSessionInfo() { @Override public String getRoomName() { return room; } }; } Participant p = new Participant(user, user, new Token(user), user); manager.joinRoom(p, room, 1); Set existingPeers = this.manager.getParticipants(room); // verifies create media pipeline was called once verify(kurentoClient, times(0)).createMediaPipeline(kurentoClientCaptor.capture()); return existingPeers; }*/ }