diff --git a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java index 92964b48..a6f6bfc4 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java +++ b/openvidu-server/src/main/java/io/openvidu/server/OpenViduServer.java @@ -1,362 +1,373 @@ /* * (C) Copyright 2017-2020 OpenVidu (https://openvidu.io) + * (C) Copyright 2021 Christian Mollekopf (mollekopf@kolabsystems.com) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import org.bouncycastle.util.Arrays; import org.kurento.jsonrpc.internal.server.config.JsonRpcConfiguration; import org.kurento.jsonrpc.server.JsonRpcConfigurer; import org.kurento.jsonrpc.server.JsonRpcHandlerRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Import; import org.springframework.context.event.EventListener; import io.openvidu.server.cdr.CDRLogger; import io.openvidu.server.cdr.CDRLoggerFile; import io.openvidu.server.cdr.CallDetailRecord; import io.openvidu.server.config.HttpHandshakeInterceptor; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.config.OpenviduConfig.Error; import io.openvidu.server.core.SessionEventsHandler; import io.openvidu.server.core.SessionManager; import io.openvidu.server.core.TokenGenerator; import io.openvidu.server.coturn.CoturnCredentialsService; import io.openvidu.server.coturn.CoturnCredentialsServiceFactory; import io.openvidu.server.kurento.core.KurentoParticipantEndpointConfig; import io.openvidu.server.kurento.core.KurentoSessionEventsHandler; import io.openvidu.server.kurento.core.KurentoSessionManager; import io.openvidu.server.kurento.kms.DummyLoadManager; +import io.openvidu.server.kurento.kms.MaxWebRtcLoadManager; import io.openvidu.server.kurento.kms.FixedOneKmsManager; +import io.openvidu.server.kurento.kms.FixedNKmsManager; import io.openvidu.server.kurento.kms.KmsManager; import io.openvidu.server.kurento.kms.LoadManager; import io.openvidu.server.recording.DummyRecordingDownloader; import io.openvidu.server.recording.DummyRecordingUploader; import io.openvidu.server.recording.RecordingDownloader; import io.openvidu.server.recording.RecordingUploader; import io.openvidu.server.recording.service.RecordingManager; import io.openvidu.server.recording.service.RecordingManagerUtils; import io.openvidu.server.recording.service.RecordingManagerUtilsLocalStorage; import io.openvidu.server.rest.ApiRestPathRewriteFilter; import io.openvidu.server.rest.RequestMappings; import io.openvidu.server.rpc.RpcHandler; import io.openvidu.server.rpc.RpcNotificationService; import io.openvidu.server.utils.CommandExecutor; import io.openvidu.server.utils.GeoLocationByIp; import io.openvidu.server.utils.GeoLocationByIpDummy; import io.openvidu.server.utils.MediaNodeStatusManager; import io.openvidu.server.utils.MediaNodeStatusManagerDummy; import io.openvidu.server.utils.QuarantineKiller; import io.openvidu.server.utils.QuarantineKillerDummy; import io.openvidu.server.webhook.CDRLoggerWebhook; /** * OpenVidu Server application * * @author Pablo Fuente (pablofuenteperez@gmail.com) */ @Import({ JsonRpcConfiguration.class }) @SpringBootApplication public class OpenViduServer implements JsonRpcConfigurer { private static final Logger log = LoggerFactory.getLogger(OpenViduServer.class); public static String wsUrl; public static String httpUrl; @Autowired OpenviduConfig config; @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") public KmsManager kmsManager(OpenviduConfig openviduConfig) { if (openviduConfig.getKmsUris().isEmpty()) { throw new IllegalArgumentException("'KMS_URIS' should contain at least one KMS url"); } - String firstKmsWsUri = openviduConfig.getKmsUris().get(0); - log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri); - return new FixedOneKmsManager(); + + List kmsUris = openviduConfig.getKmsUris(); + + if (kmsUris.size() == 1) { + String firstKmsWsUri = kmsUris.get(0); + log.info("OpenVidu Server using one KMS: {}", firstKmsWsUri); + return new FixedOneKmsManager(); + } + + log.info("OpenVidu Server using KMS: {}", kmsUris); + return new FixedNKmsManager(); } @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") public CallDetailRecord cdr(OpenviduConfig openviduConfig) { List loggers = new ArrayList<>(); if (openviduConfig.isCdrEnabled()) { log.info("OpenVidu CDR service is enabled"); loggers.add(new CDRLoggerFile()); } else { log.info("OpenVidu CDR service is disabled (may be enable with 'OPENVIDU_CDR=true')"); } if (openviduConfig.isWebhookEnabled()) { log.info("OpenVidu Webhook service is enabled"); loggers.add(new CDRLoggerWebhook(openviduConfig)); } else { log.info("OpenVidu Webhook service is disabled (may be enabled with 'OPENVIDU_WEBHOOK=true')"); } return new CallDetailRecord(loggers); } @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") public CoturnCredentialsService coturnCredentialsService(OpenviduConfig openviduConfig) { return new CoturnCredentialsServiceFactory().getCoturnCredentialsService(openviduConfig.getSpringProfile()); } @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") public SessionManager sessionManager() { return new KurentoSessionManager(); } @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") public RpcHandler rpcHandler() { return new RpcHandler(); } @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") public SessionEventsHandler sessionEventsHandler() { return new KurentoSessionEventsHandler(); } @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") public TokenGenerator tokenGenerator() { return new TokenGenerator(); } @Bean @ConditionalOnMissingBean @DependsOn("openviduConfig") public RecordingManager recordingManager() { return new RecordingManager(); } @Bean @ConditionalOnMissingBean public LoadManager loadManager() { - return new DummyLoadManager(); + return new MaxWebRtcLoadManager(10000); } @Bean @ConditionalOnMissingBean public RpcNotificationService notificationService() { return new RpcNotificationService(); } @Bean @ConditionalOnMissingBean public KurentoParticipantEndpointConfig kurentoEndpointConfig() { return new KurentoParticipantEndpointConfig(); } @Bean @ConditionalOnMissingBean @DependsOn({ "openviduConfig", "recordingManager" }) public RecordingManagerUtils recordingManagerUtils(OpenviduConfig openviduConfig, RecordingManager recordingManager) { return new RecordingManagerUtilsLocalStorage(openviduConfig, recordingManager); } @Bean @ConditionalOnMissingBean public RecordingUploader recordingUpload() { return new DummyRecordingUploader(); } @Bean @ConditionalOnMissingBean public RecordingDownloader recordingDownload() { return new DummyRecordingDownloader(); } @Bean @ConditionalOnMissingBean public GeoLocationByIp geoLocationByIp() { return new GeoLocationByIpDummy(); } @Bean @ConditionalOnMissingBean public QuarantineKiller quarantineKiller() { return new QuarantineKillerDummy(); } @Bean @ConditionalOnMissingBean public MediaNodeStatusManager mediaNodeStatusManager() { return new MediaNodeStatusManagerDummy(); } @Bean @ConditionalOnMissingBean @ConditionalOnProperty(name = "SUPPORT_DEPRECATED_API", havingValue = "true") public FilterRegistrationBean filterRegistrationBean() { FilterRegistrationBean registrationBean = new FilterRegistrationBean(); ApiRestPathRewriteFilter apiRestPathRewriteFilter = new ApiRestPathRewriteFilter(); registrationBean.setFilter(apiRestPathRewriteFilter); return registrationBean; } @Override public void registerJsonRpcHandlers(JsonRpcHandlerRegistry registry) { registry.addHandler(rpcHandler().withPingWatchdog(true).withInterceptors(new HttpHandshakeInterceptor()), RequestMappings.WS_RPC); } public static String getContainerIp() throws IOException, InterruptedException { return CommandExecutor.execCommand(5000, "/bin/sh", "-c", "hostname -i | awk '{print $1}'"); } public static void main(String[] args) throws Exception { Map CONFIG_PROPS = checkConfigProperties(OpenviduConfig.class); if (CONFIG_PROPS.get("SERVER_PORT") != null) { // Configuration property SERVER_PORT has been explicitly defined. // Must initialize the application in that port on the host regardless of what // HTTPS_PORT says. HTTPS_PORT does get used in the public URL. System.setProperty("server.port", CONFIG_PROPS.get("SERVER_PORT")); log.warn( "You have set property server.port (or SERVER_PORT). This will serve OpenVidu Server on your host at port " + CONFIG_PROPS.get("SERVER_PORT") + ". But property HTTPS_PORT (" + CONFIG_PROPS.get("HTTPS_PORT") + ") still configures the port that should be used to connect to OpenVidu Server from outside. " + "Bear this in mind when configuring a proxy in front of OpenVidu Server"); } else if (CONFIG_PROPS.get("HTTPS_PORT") != null) { // Configuration property SERVER_PORT has NOT been explicitly defined. // Must initialize the application in port HTTPS_PORT on the host. HTTPS_PORT // does get used in the public URL as well. System.setProperty("server.port", CONFIG_PROPS.get("HTTPS_PORT")); } log.info("Using /dev/urandom for secure random generation"); System.setProperty("java.security.egd", "file:/dev/./urandom"); SpringApplication.run(OpenViduServer.class, Arrays.append(args, "--spring.main.banner-mode=off")); } public static Map checkConfigProperties(Class configClass) throws InterruptedException { ConfigurableApplicationContext app = SpringApplication.run(configClass, new String[] { "--spring.main.web-application-type=none" }); OpenviduConfig config = app.getBean(OpenviduConfig.class); List errors = config.getConfigErrors(); if (!errors.isEmpty()) { // @formatter:off String msg = "\n\n\n" + " Configuration errors\n" + " --------------------\n" + "\n"; for (Error error : config.getConfigErrors()) { msg += " * "; if (error.getProperty() != null) { msg += "Property " + config.getPropertyName(error.getProperty()); if (error.getValue() == null || error.getValue().equals("")) { msg += " is not set. "; } else { msg += "=" + error.getValue() + ". "; } } msg += error.getMessage() + "\n"; } msg += "\n" + "\n" + " Fix config errors\n" + " ---------------\n" + "\n" + " 1) Return to shell pressing Ctrl+C\n" + " 2) Set correct values in '.env' configuration file\n" + " 3) Restart OpenVidu with:\n" + "\n" + " $ ./openvidu restart\n" + "\n"; // @formatter:on log.info(msg); // Wait forever new Semaphore(0).acquire(); } else { String msg = "\n\n\n" + " Configuration properties\n" + " ------------------------\n" + "\n"; final Map CONFIG_PROPS = config.getConfigProps(); List configPropNames = new ArrayList<>(config.getUserProperties()); Collections.sort(configPropNames); for (String property : configPropNames) { String value = CONFIG_PROPS.get(property); msg += " * " + config.getPropertyName(property) + "=" + (value == null ? "" : value) + "\n"; } msg += "\n\n"; log.info(msg); // Close the auxiliary ApplicationContext app.close(); return CONFIG_PROPS; } return null; } @EventListener(ApplicationReadyEvent.class) public void whenReady() { String dashboardUrl = httpUrl + config.getOpenViduFrontendDefaultPath().replaceAll("^/", ""); // @formatter:off String msg = "\n\n----------------------------------------------------\n" + "\n" + " OpenVidu is ready!\n" + " ---------------------------\n" + "\n" + " * OpenVidu Server URL: " + httpUrl + "\n" + "\n" + " * OpenVidu Dashboard: " + dashboardUrl + "\n" + "\n" + "----------------------------------------------------\n"; // @formatter:on log.info(msg); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java index 77a8f67a..6653cddb 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/DummyLoadManager.java @@ -1,27 +1,31 @@ /* * (C) Copyright 2017-2020 OpenVidu (https://openvidu.io) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.kms; public class DummyLoadManager implements LoadManager { @Override public double calculateLoad(Kms kms) { return 1; } + @Override + public boolean allowMoreElements(Kms kms) { + return true; + } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedNKmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedNKmsManager.java new file mode 100644 index 00000000..f2cf3ce9 --- /dev/null +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/FixedNKmsManager.java @@ -0,0 +1,76 @@ +/* + * (C) Copyright 2015 Kurento (http://kurento.org/) + * (C) Copyright 2021 Christian Mollekopf (mollekopf@kolabsystems.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openvidu.server.kurento.kms; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.annotation.PostConstruct; + +import org.apache.commons.lang3.RandomStringUtils; +import org.kurento.client.KurentoClient; +import org.kurento.commons.exception.KurentoException; + +import io.openvidu.server.core.IdentifierPrefixes; + +public class FixedNKmsManager extends KmsManager { + + @Override + public List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) throws Exception { + List kmss = new ArrayList<>(); + for (KmsProperties kmsProps : kmsProperties) { + KurentoClient kClient = null; + Kms kms = new Kms(kmsProps, loadManager); + try { + kClient = KurentoClient.create(kmsProps.getUri(), this.generateKurentoConnectionListener(kms.getId())); + this.addKms(kms); + kms.setKurentoClient(kClient); + + // TODO: This should be done in KurentoClient connected event + kms.setKurentoClientConnected(true); + kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); + + kmss.add(kms); + } catch (KurentoException e) { + log.error("KMS in {} is not reachable by OpenVidu Server", kmsProps.getUri()); + if (kClient != null) { + kClient.destroy(); + } + } + } + return kmss; + } + + @Override + @PostConstruct + protected void postConstructInitKurentoClients() { + try { + List kmsProps = new ArrayList<>(); + for (String kmsUri : this.openviduConfig.getKmsUris()) { + String kmsId = KmsManager.generateKmsId(); + kmsProps.add(new KmsProperties(kmsId, kmsUri)); + } + this.initializeKurentoClients(kmsProps, true); + } catch (Exception e) { + log.error("Shutting down OpenVidu Server"); + System.exit(1); + } + } + +} + diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java index d63ec9d9..21613d10 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/Kms.java @@ -1,222 +1,222 @@ /* * (C) Copyright 2017-2020 OpenVidu (https://openvidu.io) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.kms; import java.net.MalformedURLException; import java.net.URL; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.kurento.client.KurentoClient; import org.kurento.client.ModuleInfo; import org.kurento.client.ServerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.openvidu.server.kurento.core.KurentoSession; /** * Abstraction of a KMS instance: an object of this class corresponds to a KMS * process running somewhere. * * It is uniquely identified by the KMS ws URI endpoint. It encapsulates the * WebSocket client to communicate openvidu-server Java process with it and has * a specific LoadManager service to calculate the load of this KMS based on * different measures * * @author Pablo Fuente (pablofuenteperez@gmail.com) */ public class Kms { private static final Logger log = LoggerFactory.getLogger(Kms.class); private String id; // Dynamic ID private String uri; private String ip; private KurentoClient client; private LoadManager loadManager; private AtomicBoolean isKurentoClientConnected = new AtomicBoolean(false); private AtomicLong timeOfKurentoClientConnection = new AtomicLong(0); private AtomicLong timeOfKurentoClientDisconnection = new AtomicLong(0); private Map kurentoSessions = new ConcurrentHashMap<>(); private AtomicInteger activeRecordings = new AtomicInteger(0); public Kms(KmsProperties props, LoadManager loadManager) { this.id = props.getId(); this.uri = props.getUri(); String parsedUri = uri.replaceAll("^ws://", "http://").replaceAll("^wss://", "https://"); URL url = null; try { url = new URL(parsedUri); } catch (MalformedURLException e) { log.error(e.getMessage()); } this.ip = url.getHost(); this.loadManager = loadManager; } public void setKurentoClient(KurentoClient client) { this.client = client; } public String getId() { return id; } public String getUri() { return uri; } public String getIp() { return ip; } public KurentoClient getKurentoClient() { return this.client; } public double getLoad() { return loadManager.calculateLoad(this); } public boolean allowMoreElements() { - return true; // loadManager.allowMoreElements(this); + return loadManager.allowMoreElements(this); } public boolean isKurentoClientConnected() { return this.isKurentoClientConnected.get(); } public void setKurentoClientConnected(boolean isConnected) { this.isKurentoClientConnected.set(isConnected); } public long getTimeOfKurentoClientConnection() { return this.timeOfKurentoClientConnection.get(); } public void setTimeOfKurentoClientConnection(long time) { this.timeOfKurentoClientConnection.set(time); } public long getTimeOfKurentoClientDisconnection() { return this.timeOfKurentoClientDisconnection.get(); } public void setTimeOfKurentoClientDisconnection(long time) { this.timeOfKurentoClientDisconnection.set(time); } public Collection getKurentoSessions() { return this.kurentoSessions.values(); } public void addKurentoSession(KurentoSession session) { this.kurentoSessions.put(session.getSessionId(), session); } public void removeKurentoSession(String sessionId) { this.kurentoSessions.remove(sessionId); } public AtomicInteger getActiveRecordings() { return this.activeRecordings; } public JsonObject toJson() { JsonObject json = new JsonObject(); json.addProperty("id", this.id); json.addProperty("object", "mediaNode"); json.addProperty("ip", this.ip); json.addProperty("uri", this.uri); final boolean connected = this.isKurentoClientConnected(); json.addProperty("connected", connected); json.addProperty("connectionTime", this.getTimeOfKurentoClientConnection()); if (!connected) { json.addProperty("disconnectionTime", this.getTimeOfKurentoClientDisconnection()); } return json; } public JsonObject toJsonExtended(boolean withSessions, boolean withExtraInfo) { JsonObject json = this.toJson(); if (withSessions) { JsonArray sessions = new JsonArray(); for (KurentoSession session : this.kurentoSessions.values()) { sessions.add(session.toJson(false, false)); } json.add("sessions", sessions); } if (withExtraInfo) { if (json.get("connected").getAsBoolean()) { JsonObject kurentoExtraInfo = new JsonObject(); try { kurentoExtraInfo.addProperty("memory", this.client.getServerManager().getUsedMemory() / 1024); ServerInfo info = this.client.getServerManager().getInfo(); kurentoExtraInfo.addProperty("version", info.getVersion()); kurentoExtraInfo.addProperty("capabilities", info.getCapabilities().toString()); JsonArray modules = new JsonArray(); for (ModuleInfo moduleInfo : info.getModules()) { JsonObject moduleJson = new JsonObject(); moduleJson.addProperty("name", moduleInfo.getName()); moduleJson.addProperty("version", moduleInfo.getVersion()); moduleJson.addProperty("generationTime", moduleInfo.getGenerationTime()); JsonArray factories = new JsonArray(); moduleInfo.getFactories().forEach(fact -> factories.add(fact)); moduleJson.add("factories", factories); modules.add(moduleJson); } kurentoExtraInfo.add("modules", modules); json.add("kurentoInfo", kurentoExtraInfo); } catch (Exception e) { log.warn("KMS {} extra info was requested but there's no connection to it", this.id); } } } return json; } @Override public String toString() { return this.uri; } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java index c63c3ebf..cb8f7ab8 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/KmsManager.java @@ -1,379 +1,379 @@ /* * (C) Copyright 2017-2020 OpenVidu (https://openvidu.io) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.kms; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.commons.lang3.RandomStringUtils; import org.kurento.client.KurentoConnectionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import com.google.gson.JsonObject; import io.openvidu.server.config.OpenviduConfig; import io.openvidu.server.core.IdentifierPrefixes; import io.openvidu.server.kurento.core.KurentoSession; import io.openvidu.server.utils.MediaNodeStatusManager; import io.openvidu.server.utils.UpdatableTimerTask; public abstract class KmsManager { protected static final Logger log = LoggerFactory.getLogger(KmsManager.class); public static final Lock selectAndRemoveKmsLock = new ReentrantLock(true); public static final int MAX_SECONDS_LOCK_WAIT = 15; private Map kmsReconnectionLocks = new ConcurrentHashMap<>(); private UpdatableTimerTask kurentoReconnectTimer; public class KmsLoad implements Comparable { private Kms kms; private double load; public KmsLoad(Kms kms, double load) { this.kms = kms; this.load = load; } public Kms getKms() { return kms; } public double getLoad() { return load; } @Override public int compareTo(KmsLoad o) { return Double.compare(this.load, o.load); } public JsonObject toJson() { JsonObject json = this.kms.toJson(); json.addProperty("load", this.load); return json; } public JsonObject toJsonExtended(boolean withSessions, boolean withExtraInfo) { JsonObject json = this.kms.toJsonExtended(withSessions, withExtraInfo); json.addProperty("load", this.load); return json; } } @Autowired protected OpenviduConfig openviduConfig; @Autowired protected LoadManager loadManager; @Autowired protected MediaNodeStatusManager mediaNodeStatusManager; final protected Map kmss = new ConcurrentHashMap<>(); public synchronized void addKms(Kms kms) { this.kmss.put(kms.getId(), kms); } public synchronized Kms removeKms(String kmsId) { return this.kmss.remove(kmsId); } public synchronized Kms getLessLoadedConnectedAndRunningKms() throws NoSuchElementException { List kmsLoads = getKmsLoads().stream().filter(kmsLoad -> kmsLoad.kms.isKurentoClientConnected() - && mediaNodeStatusManager.isRunning(kmsLoad.kms.getId())).collect(Collectors.toList()); + && mediaNodeStatusManager.isRunning(kmsLoad.kms.getId()) && kmsLoad.kms.allowMoreElements()).collect(Collectors.toList()); if (kmsLoads.isEmpty()) { throw new NoSuchElementException(); } else { return Collections.min(kmsLoads).kms; } } public synchronized boolean atLeastOneConnectedAndRunningKms() { Optional optional = this.kmss.values().stream() .filter(kms -> kms.isKurentoClientConnected() && mediaNodeStatusManager.isRunning(kms.getId())) .findFirst(); return optional.isPresent(); } public synchronized List getKmssSortedByLoad() { List kmsLoads = getKmsLoads(); Collections.sort(kmsLoads); return kmsLoads; } public Kms getKms(String kmsId) { return this.kmss.get(kmsId); } public KmsLoad getKmsLoad(String kmsId) { Kms kms = this.kmss.get(kmsId); return new KmsLoad(kms, kms.getLoad()); } public Collection getKmss() { return this.kmss.values(); } public boolean kmsWithUriExists(String kmsUri) { return this.kmss.values().stream().anyMatch(kms -> kms.getUri().equals(kmsUri)); } private List getKmsLoads() { ArrayList kmsLoads = new ArrayList<>(); for (Kms kms : kmss.values()) { double load = kms.getLoad(); kmsLoads.add(new KmsLoad(kms, load)); log.trace("Calc load {} for kms {}", load, kms.getUri()); } return kmsLoads; } protected KurentoConnectionListener generateKurentoConnectionListener(final String kmsId) { return new KurentoConnectionListener() { @Override public void reconnected(boolean sameServer) { final Kms kms = kmss.get(kmsId); log.info("Kurento Client \"reconnected\" event for KMS {} (sameServer: {}) [{}]", kms.getUri(), sameServer, kms.getKurentoClient().toString()); kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock()); boolean lockAcquired = false; try { if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) { lockAcquired = true; if (kms.isKurentoClientConnected()) { // Timer task of disconnected event successfully executed log.warn( "Timer task already executed for reconnected Kurento Client [{}] to KMS with uri {}. Skipping event listener execution", kms.getKurentoClient().toString(), kms.getUri()); return; } kms.setKurentoClientConnected(true); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); if (!sameServer) { // Different KMS. Reset sessions status (no Publisher or SUbscriber endpoints) log.warn("Kurento Client reconnected to a different KMS instance, with uri {}", kms.getUri()); log.warn("Updating all webrtc endpoints for active sessions"); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); kms.getKurentoSessions().forEach(kSession -> { kSession.restartStatusInKurento(timeOfKurentoDisconnection); }); } else { // Same KMS. We may infer that openvidu-server/KMS connection has been lost, but // not the clients/KMS connections log.warn("Kurento Client reconnected to same KMS {} with uri {}", kmsId, kms.getUri()); } kms.setTimeOfKurentoClientDisconnection(0); } } catch (InterruptedException e) { log.error("InterruptedException when waiting for lock on reconnected event of KMS with uri {}", kms.getUri()); } finally { if (lockAcquired) { kmsReconnectionLocks.get(kms.getId()).unlock(); } } } @Override public void disconnected() { final Kms kms = kmss.get(kmsId); kms.setKurentoClientConnected(false); kms.setTimeOfKurentoClientDisconnection(System.currentTimeMillis()); if (kms.getKurentoClient().isClosed()) { log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Closed explicitly", kms.getUri(), kms.getKurentoClient().toString()); return; } else { log.info("Kurento Client \"disconnected\" event for KMS {} [{}]. Waiting reconnection", kms.getUri(), kms.getKurentoClient().toString()); } // TODO: this is a fix for the lack of reconnected event kmsReconnectionLocks.putIfAbsent(kms.getId(), new ReentrantLock()); final AtomicInteger ITERATION = new AtomicInteger(0); kurentoReconnectTimer = new UpdatableTimerTask(() -> { boolean lockAcquired = false; try { if (kmsReconnectionLocks.get(kms.getId()).tryLock(5, TimeUnit.SECONDS)) { lockAcquired = true; if (kms.isKurentoClientConnected()) { // reconnected listener already executed log.info( "Timer of KMS with uri {} and KurentoClient [{}] cancelled (reconnected event received during interval wait)", kms.getUri(), kms.getKurentoClient().toString()); kurentoReconnectTimer.cancelTimer(); return; } if (kms.getKurentoClient().isClosed()) { log.info( "Timer of KMS with uri {} and KurentoClient [{}] has been closed. Cancelling Timer", kms.getUri(), kms.getKurentoClient().toString()); kurentoReconnectTimer.cancelTimer(); return; } kms.getKurentoClient().getServerManager().getInfo(); log.info("According to Timer KMS with uri {} and KurentoClient [{}] is now reconnected", kms.getUri(), kms.getKurentoClient().toString()); kurentoReconnectTimer.cancelTimer(); kms.setKurentoClientConnected(true); kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); final long timeOfKurentoDisconnection = kms.getTimeOfKurentoClientDisconnection(); if (kms.getKurentoSessions().isEmpty()) { log.info("There were no sessions in the KMS with uri {}. Nothing must be done", kms.getUri()); } else { if (isNewKms(kms)) { log.warn("KMS with URI {} is a new KMS process. Resetting {} sessions: {}", kms.getUri(), kms.getKurentoSessions().size(), kms.getKurentoSessions().stream().map(s -> s.getSessionId()) .collect(Collectors.joining(",", "[", "]"))); kms.getKurentoSessions().forEach(kSession -> { kSession.restartStatusInKurento(timeOfKurentoDisconnection); }); } else { log.info("KMS with URI {} is the same process. Nothing must be done", kms.getUri()); } } kms.setTimeOfKurentoClientDisconnection(0); } } catch (Exception e) { log.error( "According to Timer KMS with uri {} and KurentoClient [{}] is not reconnected yet. Exception {}", kms.getUri(), kms.getKurentoClient().toString(), e.getClass().getName()); } finally { if (lockAcquired) { kmsReconnectionLocks.get(kms.getId()).unlock(); } } }, () -> new Long(dynamicReconnectLoopSeconds(ITERATION.getAndIncrement()) * 1000)); kurentoReconnectTimer.updateTimer(); } @Override public void connectionFailed() { final Kms kms = kmss.get(kmsId); log.error("Kurento Client \"connectionFailed\" event for KMS {} [{}]", kms.getUri(), kms.getKurentoClient().toString()); kms.setKurentoClientConnected(false); } @Override public void connected() { final Kms kms = kmss.get(kmsId); log.info("Kurento Client \"connected\" event for KMS {} [{}]", kms.getUri(), kms.getKurentoClient().toString()); // TODO: This should be done here, not after KurentoClient#create method returns // kms.setKurentoClientConnected(true); // kms.setTimeOfKurentoClientConnection(System.currentTimeMillis()); } }; } private boolean isNewKms(Kms kms) { try { KurentoSession kSession = kms.getKurentoSessions().iterator().next(); kSession.getPipeline().getName(); return false; } catch (NoSuchElementException e) { return false; } catch (Exception e) { return true; } } private int dynamicReconnectLoopSeconds(int iteration) { // First 10 loops every second, next 20 loops ever 3s, the rest every 10s final int[][] intervals = { new int[] { 1, 10 }, new int[] { 3, 20 }, new int[] { 10, Integer.MAX_VALUE } }; int accumulatedIntervals = 0; for (int i = 0; i < intervals.length - 1; i++) { if ((accumulatedIntervals + intervals[i][1]) > iteration) { // Interval found for current iteration return intervals[i][0]; } else { // This iteration has already been surpassed accumulatedIntervals += intervals[i][1]; } } // Return last interval return intervals[intervals.length - 1][0]; } public abstract List initializeKurentoClients(List kmsProperties, boolean disconnectUponFailure) throws Exception; public LoadManager getLoadManager() { return this.loadManager; } @PostConstruct protected abstract void postConstructInitKurentoClients(); @PreDestroy public void close() { log.info("Closing all KurentoClients"); this.kmss.values().forEach(kms -> { kms.getKurentoClient().destroy(); }); if (kurentoReconnectTimer != null) { kurentoReconnectTimer.cancelTimer(); } } public static String generateKmsId() { return IdentifierPrefixes.KMS_ID + RandomStringUtils.randomAlphabetic(1).toUpperCase() + RandomStringUtils.randomAlphanumeric(7); } } diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java index 924298ec..adcfce28 100644 --- a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/LoadManager.java @@ -1,24 +1,25 @@ /* * (C) Copyright 2017-2020 OpenVidu (https://openvidu.io) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package io.openvidu.server.kurento.kms; public interface LoadManager { public double calculateLoad(Kms kms); -} \ No newline at end of file + public boolean allowMoreElements(Kms kms); +} diff --git a/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java new file mode 100644 index 00000000..5f256a19 --- /dev/null +++ b/openvidu-server/src/main/java/io/openvidu/server/kurento/kms/MaxWebRtcLoadManager.java @@ -0,0 +1,57 @@ +/* + * (C) Copyright 2017-2019 OpenVidu (https://openvidu.io/) + * (C) Copyright 2021 Christian Mollekopf (mollekopf@kolabsystems.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.openvidu.server.kurento.kms; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MaxWebRtcLoadManager implements LoadManager { + private static final Logger log = LoggerFactory.getLogger(MaxWebRtcLoadManager.class); + + private int maxWebRtcPerKms; + + public MaxWebRtcLoadManager(int maxWebRtcPerKms) { + this.maxWebRtcPerKms = maxWebRtcPerKms; + } + + @Override + public double calculateLoad(Kms kms) { + int numWebRtcs = countWebRtcEndpoints(kms); + if (numWebRtcs > maxWebRtcPerKms) { + return 1; + } else { + return numWebRtcs / (double) maxWebRtcPerKms; + } + } + + @Override + public boolean allowMoreElements(Kms kms) { + return countWebRtcEndpoints(kms) < maxWebRtcPerKms; + } + + private synchronized int countWebRtcEndpoints(Kms kms) { + try { + return kms.getKurentoClient().getServerManager().getPipelines().size(); + } catch (Throwable e) { + log.warn("Error counting KurentoClient pipelines", e); + return 0; + } + } +} +