diff --git a/bower.json b/bower.json index 0bdb160..ed708b0 100644 --- a/bower.json +++ b/bower.json @@ -1,31 +1,31 @@ { "name": "manticore", "version": "0.0.0", "dependencies": { "angular": "1.4.1", "json3": "3.3.1", "es5-shim": "4.1.7", "jquery": "1.11.0", "bootstrap": "~3.1.1", "angular-resource": "1.4.1", "angular-cookies": "1.4.1", "angular-sanitize": "1.4.1", "angular-bootstrap": "0.13.0", "font-awesome": ">=4.1.0", "lodash": "3.9.3", "angular-ui-router": "0.2.15", "angular-file-upload": "1.1.5", - "wodo": "http://webodf.org/download/wodocollabtexteditor-0.5.8.preview1.zip", + "wodo": "http://webodf.org/download/wodocollabtexteditor-0.5.8.zip", "angular-file-saver": "~0.0.4", "angular-smart-table": "~2.1.0", "angular-moment": "~0.10.1", "angular-load": "~0.2.0" }, "devDependencies": { "angular-mocks": "1.4.1", "angular-scenario": "1.4.1" }, "resolutions": { "angular": "1.4.1" } } diff --git a/client/components/wodo/adaptor.service.js b/client/components/wodo/adaptor.service.js index af73880..1ac97d6 100644 --- a/client/components/wodo/adaptor.service.js +++ b/client/components/wodo/adaptor.service.js @@ -1,285 +1,285 @@ /*jslint unparam: true*/ /*global runtime, core, ops, io*/ 'use strict'; angular.module('manticoreApp') .factory('Adaptor', function () { var OperationRouter = function (socket, odfContainer, errorCb) { var EVENT_BEFORESAVETOFILE = 'beforeSaveToFile', EVENT_SAVEDTOFILE = 'savedToFile', EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED = 'hasLocalUnsyncedOperationsChanged', EVENT_HASSESSIONHOSTCONNECTIONCHANGED = 'hasSessionHostConnectionChanged', EVENT_MEMBERADDED = 'memberAdded', EVENT_MEMBERCHANGED = 'memberChanged', EVENT_MEMBERREMOVED = 'memberRemoved', eventNotifier = new core.EventNotifier([ EVENT_BEFORESAVETOFILE, EVENT_SAVEDTOFILE, EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED, EVENT_HASSESSIONHOSTCONNECTIONCHANGED, EVENT_MEMBERADDED, EVENT_MEMBERCHANGED, EVENT_MEMBERREMOVED, ops.OperationRouter.signalProcessingBatchStart, ops.OperationRouter.signalProcessingBatchEnd ]), operationFactory, playbackFunction, lastServerSyncHeadId = 0, sendClientOpspecsLock = false, sendClientOpspecsTask, hasSessionHostConnection = true, unplayedServerOpSpecQueue = [], unsyncedClientOpSpecQueue = [], operationTransformer = new ops.OperationTransformer(), /**@const*/sendClientOpspecsDelay = 300; function playbackOpspecs(opspecs) { var op, i; if (!opspecs.length) { return; } eventNotifier.emit(ops.OperationRouter.signalProcessingBatchStart, {}); for (i = 0; i < opspecs.length; i += 1) { op = operationFactory.create(opspecs[i]); if (op !== null) { if (!playbackFunction(op)) { eventNotifier.emit(ops.OperationRouter.signalProcessingBatchEnd, {}); errorCb('opExecutionFailure'); return; } } else { eventNotifier.emit(ops.OperationRouter.signalProcessingBatchEnd, {}); errorCb('Unknown opspec: ' + runtime.toJson(opspecs[i])); return; } } eventNotifier.emit(ops.OperationRouter.signalProcessingBatchEnd, {}); } function handleNewServerOpsWithUnsyncedClientOps(serverOps) { var transformResult = operationTransformer.transform(unsyncedClientOpSpecQueue, serverOps); if (!transformResult) { errorCb('Has unresolvable conflict.'); return false; } unsyncedClientOpSpecQueue = transformResult.opSpecsA; unplayedServerOpSpecQueue = unplayedServerOpSpecQueue.concat(transformResult.opSpecsB); return true; } function handleNewClientOpsWithUnplayedServerOps(clientOps) { var transformResult = operationTransformer.transform(clientOps, unplayedServerOpSpecQueue); if (!transformResult) { errorCb('Has unresolvable conflict.'); return false; } unsyncedClientOpSpecQueue = unsyncedClientOpSpecQueue.concat(transformResult.opSpecsA); unplayedServerOpSpecQueue = transformResult.opSpecsB; return true; } function receiveServerOpspecs(headId, serverOpspecs) { if (unsyncedClientOpSpecQueue.length > 0) { handleNewServerOpsWithUnsyncedClientOps(serverOpspecs); // could happen that ops from server make client ops obsolete if (unsyncedClientOpSpecQueue.length === 0) { eventNotifier.emit(EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED, false); } } else { // apply directly playbackOpspecs(serverOpspecs); } lastServerSyncHeadId = headId; } function sendClientOpspecs() { var originalUnsyncedLength = unsyncedClientOpSpecQueue.length; if (originalUnsyncedLength) { sendClientOpspecsLock = true; socket.emit('commit_ops', { head: lastServerSyncHeadId, ops: unsyncedClientOpSpecQueue }, function (response) { if (response.conflict === true) { sendClientOpspecs(); } else { lastServerSyncHeadId = response.head; // on success no other server ops should have sneaked in meanwhile, so no need to check // got no other client ops meanwhile? if (unsyncedClientOpSpecQueue.length === originalUnsyncedLength) { unsyncedClientOpSpecQueue.length = 0; // finally apply all server ops collected while waiting for sync playbackOpspecs(unplayedServerOpSpecQueue); unplayedServerOpSpecQueue.length = 0; eventNotifier.emit(EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED, false); sendClientOpspecsLock = false; } else { // send off the new client ops directly unsyncedClientOpSpecQueue.splice(0, originalUnsyncedLength); sendClientOpspecs(); } } }); } } this.setOperationFactory = function (f) { operationFactory = f; }; this.setPlaybackFunction = function (f) { playbackFunction = f; }; this.push = function (operations) { var clientOpspecs = [], now = Date.now(), hasLocalUnsyncedOpsBefore = (unsyncedClientOpSpecQueue.length !== 0), hasLocalUnsyncedOpsNow; operations.forEach(function(op) { var opspec = op.spec(); opspec.timestamp = now; clientOpspecs.push(opspec); }); playbackOpspecs(clientOpspecs); if (unplayedServerOpSpecQueue.length > 0) { handleNewClientOpsWithUnplayedServerOps(clientOpspecs); } else { unsyncedClientOpSpecQueue = unsyncedClientOpSpecQueue.concat(clientOpspecs); } hasLocalUnsyncedOpsNow = (unsyncedClientOpSpecQueue.length !== 0); if (hasLocalUnsyncedOpsNow !== hasLocalUnsyncedOpsBefore) { eventNotifier.emit(EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED, hasLocalUnsyncedOpsNow); } sendClientOpspecsTask.trigger(); }; this.requestReplay = function (cb) { var cbOnce = function () { eventNotifier.unsubscribe(ops.OperationRouter.signalProcessingBatchEnd, cbOnce); cb(); }; // hack: relies on at least addmember op being added for ourselves and being executed eventNotifier.subscribe(ops.OperationRouter.signalProcessingBatchEnd, cbOnce); socket.emit('replay', {}); }; this.close = function (cb) { cb(); }; this.subscribe = function (eventId, cb) { eventNotifier.subscribe(eventId, cb); }; this.unsubscribe = function (eventId, cb) { eventNotifier.unsubscribe(eventId, cb); }; this.hasLocalUnsyncedOps = function () { return unsyncedClientOpSpecQueue.length !== 0; }; this.hasSessionHostConnection = function () { return hasSessionHostConnection; }; function init() { sendClientOpspecsTask = core.Task.createTimeoutTask(function () { if (!sendClientOpspecsLock) { sendClientOpspecs(); } }, sendClientOpspecsDelay); socket.on('replay', function (data) { receiveServerOpspecs(data.head, data.ops); socket.on('new_ops', function (data) { receiveServerOpspecs(data.head, data.ops); }); }); } init(); }; var ClientAdaptor = function (documentId, authToken, connectedCb, kickedCb, disconnectedCb) { var self = this, memberId, - documentUrl, + genesisUrl, socket; this.getMemberId = function () { return memberId; }; this.getGenesisUrl = function () { - return documentUrl; + return genesisUrl; }; this.createOperationRouter = function (odfContainer, errorCb) { runtime.assert(Boolean(memberId), 'You must be connected to a session before creating an operation router'); return new OperationRouter(socket, odfContainer, errorCb); }; this.joinSession = function (cb) { socket.on('join_success', function handleJoinSuccess(data) { socket.removeListener('join_success', handleJoinSuccess); memberId = data.memberId; - documentUrl = '/api/documents/snapshot/' + data.snapshotId; + genesisUrl = data.genesisUrl; cb(memberId); }); socket.emit('join', { documentId: documentId }); }; this.leaveSession = function (cb) { socket.emit('leave', {}, cb); socket.removeAllListeners(); }; this.getSocket = function () { return socket; }; this.destroy = function () { socket.disconnect(); }; function init() { socket = io({ query: 'token=' + authToken, forceNew: true }); socket.on('connect', connectedCb); socket.on('kick', kickedCb); socket.on('disconnect', disconnectedCb); } init(); }; return ClientAdaptor; }); diff --git a/package.json b/package.json index 548ca78..f0bb764 100644 --- a/package.json +++ b/package.json @@ -1,95 +1,96 @@ { "name": "manticore", "version": "0.0.0", "main": "server/app.js", "dependencies": { "express": "~4.0.0", "morgan": "~1.0.0", "body-parser": "~1.5.0", "method-override": "~1.0.0", "serve-favicon": "~2.0.1", "cookie-parser": "~1.0.1", "express-session": "~1.0.2", "errorhandler": "~1.0.0", "compression": "~1.0.1", "lodash": "~2.4.1", "jade": "~1.2.0", "mongoose": "~3.8.8", "gridfs-stream": "1.1.1", "jsonwebtoken": "^0.3.0", "express-jwt": "^0.1.3", "passport": "~0.2.0", "passport-local": "~0.1.6", "composable-middleware": "^0.3.0", "connect-mongo": "^0.4.1", "multer": "0.1.8", "socket.io": "1.3.5", "socketio-jwt": "4.2.0", - "async": "~1.3.0" + "async": "~1.3.0", + "phantom": "~0.7.2" }, "devDependencies": { "grunt": "~0.4.4", "grunt-autoprefixer": "~0.7.2", "grunt-wiredep": "~1.8.0", "grunt-concurrent": "~0.5.0", "grunt-contrib-clean": "~0.5.0", "grunt-contrib-concat": "~0.4.0", "grunt-contrib-copy": "~0.5.0", "grunt-contrib-cssmin": "~0.9.0", "grunt-contrib-htmlmin": "~0.2.0", "grunt-contrib-imagemin": "~0.7.1", "grunt-contrib-jshint": "~0.10.0", "grunt-contrib-uglify": "~0.4.0", "grunt-contrib-watch": "~0.6.1", "grunt-contrib-jade": "^0.11.0", "grunt-google-cdn": "~0.4.0", "grunt-newer": "~0.7.0", "grunt-ng-annotate": "^0.2.3", "grunt-filerev": "~2.2.0", "grunt-svgmin": "~0.4.0", "grunt-usemin": "~3.0.0", "grunt-env": "~0.4.1", "grunt-node-inspector": "~0.1.5", "grunt-nodemon": "~0.2.0", "grunt-angular-templates": "^0.5.4", "grunt-dom-munger": "^3.4.0", "grunt-protractor-runner": "^1.1.0", "grunt-asset-injector": "^0.1.0", "grunt-karma": "~0.8.2", "grunt-build-control": "DaftMonk/grunt-build-control", "grunt-mocha-test": "~0.10.2", "grunt-contrib-stylus": "latest", "jit-grunt": "^0.5.0", "time-grunt": "~0.3.1", "grunt-express-server": "~0.4.17", "grunt-open": "~0.2.3", "open": "~0.0.4", "jshint-stylish": "~0.1.5", "connect-livereload": "~0.4.0", "karma-ng-scenario": "~0.1.0", "karma-firefox-launcher": "~0.1.3", "karma-script-launcher": "~0.1.0", "karma-html2js-preprocessor": "~0.1.0", "karma-ng-jade2js-preprocessor": "^0.1.2", "karma-jasmine": "~0.1.5", "karma-chrome-launcher": "~0.1.3", "requirejs": "~2.1.11", "karma-requirejs": "~0.2.1", "karma-coffee-preprocessor": "~0.2.1", "karma-jade-preprocessor": "0.0.11", "karma-phantomjs-launcher": "~0.1.4", "karma": "~0.12.9", "karma-ng-html2js-preprocessor": "~0.1.0", "supertest": "~0.11.0", "should": "~3.3.1" }, "engines": { "node": ">=0.10.0" }, "scripts": { "start": "node server/app.js", "test": "grunt test", "update-webdriver": "node node_modules/grunt-protractor-runner/node_modules/protractor/bin/webdriver-manager update" }, "private": true } diff --git a/server/app.js b/server/app.js index e47b49e..3900244 100644 --- a/server/app.js +++ b/server/app.js @@ -1,74 +1,74 @@ /** * Main application file */ 'use strict'; // Set default node environment to development process.env.NODE_ENV = process.env.NODE_ENV || 'development'; var express = require('express'); var mongoose = require('mongoose'); var config = require('./config/environment'); var Adaptor = require('./components/adaptor'); var ObjectCache = require('./components/objectcache'); // Connect to database mongoose.connect(config.mongo.uri, config.mongo.options); // Populate DB with sample data if(config.seedDB) { require('./config/seed'); } // Setup server var app = express(); var server = require('http').createServer(app); var socketio = require('socket.io')(server, { path: '/socket.io' }); var adaptor; var objectCache; var sockets = []; require('./config/socketio')(socketio); require('./config/express')(app); require('./routes')(app); server.on('connection', function (socket) { sockets.push(socket); socket.on('close', function () { sockets.splice(sockets.indexOf(socket), 1); }); }); // Start server server.listen(config.port, config.ip, function () { console.log('Express server listening on %d, in %s mode', config.port, app.get('env')); objectCache = new ObjectCache(); - adaptor = new Adaptor(socketio, objectCache); + adaptor = new Adaptor(app, socketio, objectCache); }); function destroy() { adaptor.destroy(function () { console.log('All realtime clients disconnected.'); objectCache.destroy(function () { console.log('All objects persisted to DB.'); sockets.forEach(function (socket) { socket.destroy(); }); server.close(function () { console.log('HTTP server shut down.'); mongoose.disconnect(function () { console.log('DB connection closed.'); console.log('Everything successfully shut down. Bye!'); process.exit(); }); }); }); }); } process.on("SIGTERM", destroy); process.on("SIGINT", destroy); // Expose app exports = module.exports = app; diff --git a/server/components/adaptor/index.js b/server/components/adaptor/index.js index 88a0866..ed38ffd 100644 --- a/server/components/adaptor/index.js +++ b/server/components/adaptor/index.js @@ -1,84 +1,84 @@ /* * Copyright (C) 2015 KO GmbH * * @licstart * This file is part of Kotype. * * Kotype is free software: you can redistribute it and/or modify it * under the terms of the GNU Affero General Public License (GNU AGPL) * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * Kotype is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with Kotype. If not, see . * @licend * * @source: https://github.com/kogmbh/Kotype/ */ "use strict"; /*jslint nomen: true, unparam: true */ /*global require, console, setInterval, module */ var async = require("async"), Room = require("./room"), Document = require("../../api/document/document.model").Document, User = require("../../api/user/user.model"); // Maintains an in-memory cache of users, documents, and sessions. // And writes/reads them from the DB on demand. -var ServerAdaptor = function (socketServer, objectCache) { +var ServerAdaptor = function (app, socketServer, objectCache) { var rooms = {}; function addToRoom(documentId, socket) { var room = rooms[documentId]; if (!room) { Document.findById(documentId, function (err, doc) { if (err) { return console.log(err); } if (!doc) { return console.log("documentId unknown:"+documentId); } - room = new Room(doc, objectCache, function () { + room = new Room(app, doc, objectCache, function () { rooms[documentId] = room; room.attachSocket(socket); }); }); } else { room.attachSocket(socket); } } this.destroy = function (callback) { async.each(Object.keys(rooms), function (documentId, cb) { rooms[documentId].destroy(cb); }, function () { rooms = {}; callback() }); }; function init() { socketServer.on("connection", function (socket) { User.findById(socket.decoded_token._id, function (err, user) { if (err) { return console.log(err); } socket.user = user; socket.on("join", function (data) { var documentId = data.documentId; if (documentId) { console.log("Authorized user " + user.name + " for document " + documentId); addToRoom(documentId, socket); } else { console.log("Error: Client did not specify a document ID"); } }); }); }); } init(); }; module.exports = ServerAdaptor; diff --git a/server/components/adaptor/odf.html b/server/components/adaptor/odf.html new file mode 100644 index 0000000..a90e102 --- /dev/null +++ b/server/components/adaptor/odf.html @@ -0,0 +1,27 @@ + + + + + + + + + + + +
+ + diff --git a/server/components/adaptor/recorder.js b/server/components/adaptor/recorder.js new file mode 100644 index 0000000..f077214 --- /dev/null +++ b/server/components/adaptor/recorder.js @@ -0,0 +1,135 @@ +var phantom = require('phantom'); +var fs = require('fs'); +var EventEmitter = require('events').EventEmitter; + +var Recorder = function (lastChunk, cb) { + var self = this, + baseSnapshotUrl = 'http://localhost:9000/api/documents/snapshot/' + lastChunk.fileId, + emitter = new EventEmitter(), + snapshotReadyCb = function () {}, + ph, + page; + + function phantomCallback(data) { + function documentLoaded(data) { + cb(); + } + switch(data.event) { + case 'log': console.log('PhantomJS Log :', data.message); break; + case 'documentLoaded': documentLoaded(data); break; + + default: emitter.emit(data.event, data); + } + } + + /* jshint ignore:start */ + function loadDocument(url, operations) { + console.log = function (message) { + window.callPhantom({event: 'log', message: message}); + }; + + var odfElement = document.getElementById('odf'); + document.odfCanvas = new odf.OdfCanvas(odfElement); + document.odfCanvas.addListener('statereadychange', function () { + document.session = new ops.Session(document.odfCanvas); + document.odtDocument = document.session.getOdtDocument(); + + var operationRouter = new ops.OperationRouter(); + operationRouter.push = function (opspecs) { + var op, i; + if (!opspecs.length) { return; } + for (i = 0; i < opspecs.length; i++) { + op = document.session.getOperationFactory().create(opspecs[i]); + if (op && op.execute(document.session.getOdtDocument())) { + // console.log('Just executed op ' + opspecs[i].optype + 'by ' + opspecs[i].memberid); + } else { + break; + } + } + }; + document.session.setOperationRouter(operationRouter); + document.session.enqueue(operations); + window.callPhantom({event: 'documentLoaded'}); + }); + document.odfCanvas.load(url); + } + /* jshint ignore:end */ + + this.push = function (operations, cb) { + page.evaluate(function (opspecs) { + document.session.enqueue(opspecs); + }, function () { cb(); }, operations); + }; + + + this.getSnapshot = function (cb) { + var eventId = Date.now(); + emitter.once('snapshotReady' + eventId, function (data) { + cb(data); + }); + + page.evaluate(function (eventId) { + var doc = document.odtDocument, + ops = []; + + doc.getMemberIds().forEach(function (memberId) { + ops.push({ + optype: 'AddMember', + timestamp: Date.now(), + memberid: memberId, + setProperties: doc.getMember(memberId).getProperties() + }); + var cursor = doc.getCursor(memberId); + if (cursor) { + var selection = doc.getCursorSelection(memberId); + ops.push({ + optype: 'AddCursor', + timestamp: Date.now(), + memberid: memberId + }); + ops.push({ + optype: 'MoveCursor', + timestamp: Date.now(), + memberid: memberId, + position: selection.position, + length: selection.length, + selectionType: cursor.getSelectionType() + }); + } + }); + + document.odfCanvas.odfContainer().createByteArray(function (data) { + window.callPhantom({ + event: 'snapshotReady' + eventId, + operations: ops, + data: data + }); + }); + }, function () {}, eventId); + }; + + this.destroy = function (cb) { + ph.exit(); + cb(); + }; + + + function init() { + phantom.create('--web-security=no', function (ph) { + ph.createPage(function (p) { + p.open('file://' + __dirname + '/odf.html', function (status) { + if (status === 'success') { + page = p; + page.set('onCallback', phantomCallback); + page.evaluate(loadDocument, function (){}, baseSnapshotUrl, lastChunk.operations); + } else { + return cb(new Error('Could not initialize recorder module.')); + } + }); + }); + }); + } + init(); +}; + +module.exports = Recorder; diff --git a/server/components/adaptor/room.js b/server/components/adaptor/room.js index 47fdfe5..719339a 100644 --- a/server/components/adaptor/room.js +++ b/server/components/adaptor/room.js @@ -1,370 +1,401 @@ /* * Copyright (C) 2015 KO GmbH * * @licstart * This file is part of Kotype. * * Kotype is free software: you can redistribute it and/or modify it * under the terms of the GNU Affero General Public License (GNU AGPL) * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * Kotype is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with Kotype. If not, see . * @licend * * @source: https://github.com/kogmbh/Kotype/ */ var async = require("async"); var _ = require("lodash"); var RColor = require('../colors'); var DocumentChunk = require("../../api/document/document.model").DocumentChunk; +var Recorder = require('./recorder'); -var Room = function (doc, objectCache, cb) { +var Room = function (app, doc, objectCache, cb) { var document, chunk, + recorder, hasCursor = {}, sockets = [], userColorMap = {}, randomColor = new RColor(), serverSeq = 0; function trackTitle(ops) { var newTitle, i; for (i = 0; i < ops.length; i += 1) { if (ops[i].optype === "UpdateMetadata" && ops[i].setProperties["dc:title"] !== undefined) { newTitle = ops[i].setProperties["dc:title"]; } } if (newTitle !== undefined) { if (newTitle.length === 0) { newTitle = "Untitled Document"; } } if (newTitle) { document.title = newTitle; } } function trackEditors() { // TODO: rather track by ops, to decouple from socket implementation sockets.forEach(function (socket) { var _id = socket.user._id; if (document.editors.indexOf(_id) === -1) { document.editors.push(_id); } }); } function trackCursors(ops) { var i; for (i = 0; i < ops.length; i += 1) { if (ops[i].optype === "AddCursor") { hasCursor[ops[i].memberid] = true; } if (ops[i].optype === "RemoveCursor") { hasCursor[ops[i].memberid] = false; } } } function sanitizeDocument() { var ops = chunk.operations, unbalancedCursors = {}, unbalancedMembers = {}, lastAccessDate = document.date, newOps = [], i; for (i = 0; i < ops.length; i += 1) { if (ops[i].optype === "AddCursor") { unbalancedCursors[ops[i].memberid] = true; } else if (ops[i].optype === "RemoveCursor") { unbalancedCursors[ops[i].memberid] = false; } else if (ops[i].optype === "AddMember") { unbalancedMembers[ops[i].memberid] = true; } else if (ops[i].optype === "RemoveMember") { unbalancedMembers[ops[i].memberid] = false; } } Object.keys(unbalancedCursors).forEach(function (memberId) { if (unbalancedCursors[memberId]) { newOps.push({ optype: "RemoveCursor", memberid: memberId, timestamp: lastAccessDate }); } }); Object.keys(unbalancedMembers).forEach(function (memberId) { if (unbalancedMembers[memberId]) { newOps.push({ optype: "RemoveMember", memberid: memberId, timestamp: lastAccessDate }); } }); if (newOps.length) { // Update op stack chunk.operations = chunk.operations.concat(newOps); serverSeq = chunk.operations.length; } } function broadcastMessage(message, data) { sockets.forEach(function (peerSocket) { peerSocket.emit(message, data) }); } function sendOpsToMember(socket, ops) { socket.emit("new_ops", { head: serverSeq, ops: ops }); } - function replayOpsToMember(socket) { + function setupMemberSnapshot(socket, operations, genesisSeq) { socket.emit("replay", { head: serverSeq, - ops: chunk.operations + ops: operations.concat(getOpsAfter(genesisSeq)) }); } function broadcastOpsByMember(socket, ops) { if (!ops.length) { return; } sockets.forEach(function (peerSocket) { - if (peerSocket !== socket) { + if (peerSocket.memberId !== socket.memberId) { sendOpsToMember(peerSocket, ops); } }); } function writeOpsToDocument(ops, cb) { if (!ops.length) { cb(); } - trackTitle(ops); - trackEditors(); + recorder.push(ops, function () { + trackTitle(ops); + trackEditors(); - // Update op stack - chunk.operations = chunk.operations.concat(ops); - serverSeq = chunk.operations.length; + // Update op stack + chunk.operations = chunk.operations.concat(ops); + serverSeq = chunk.operations.length; - // Update modified date - document.date = new Date(); + // Update modified date + document.date = new Date(); - cb(); + cb(); + }); } function addMember(user, cb) { var memberId, op, timestamp = Date.now(), color = userColorMap[user._id]; memberId = user.name + "_" + timestamp.toString(); // Let user colors persist in a Room even after they've // left and joined. if (!color) { userColorMap[user._id] = color = randomColor.get(true, 0.7); } op = { optype: "AddMember", memberid: memberId, timestamp: timestamp, setProperties: { fullName: user.name, color: color } }; writeOpsToDocument([op], function () { cb(memberId, [op]); }); } function removeMember(memberId, cb) { var ops = [], timestamp = Date.now(); if (hasCursor[memberId]) { ops.push({ optype: "RemoveCursor", memberid: memberId, timestamp: timestamp }); } ops.push({ optype: "RemoveMember", memberid: memberId, timestamp: timestamp }); writeOpsToDocument(ops, function () { cb(ops); }); } function getOpsAfter(basedOn) { return chunk.operations.slice(basedOn, serverSeq); } this.socketCount = function () { return sockets.length; }; this.attachSocket = function (socket) { // Add the socket to the room and give the // client it's unique memberId addMember(socket.user, function (memberId, ops) { socket.memberId = memberId; sockets.push(socket); - broadcastOpsByMember(socket, ops); - socket.emit("join_success", { - memberId: memberId, - snapshotId: chunk.fileId - }); - // Service replay requests - socket.on("replay", function () { - replayOpsToMember(socket); - }); - // Store, analyze, and broadcast incoming commits - socket.on("commit_ops", function (data, cb) { - var clientSeq = data.head, - ops = data.ops; - if (clientSeq === serverSeq) { - writeOpsToDocument(ops, function () { + // Generate genesis URL with the latest document version's snapshot + // We use a two-time URL because WebODF makes two identical GET requests (!) + var genesisUrl = '/genesis/' + Date.now().toString(), + genesisSeq = serverSeq, + usages = 0; + + recorder.getSnapshot(function (snapshot) { + app.get(genesisUrl, function (req, res) { + usages++; + res.set('Content-Type', 'application/vnd.oasis.opendocument.text'); + res.attachment(document.title); + res.send(new Buffer(snapshot.data)); + var routes = app._router.stack; + if (usages === 2) { + for (var i = 0; i < routes.length; i++) { + if (routes[i].path === genesisUrl) { + routes.splice(i, 1); + break; + } + } + } + }); + + socket.emit("join_success", { + memberId: memberId, + genesisUrl: genesisUrl + }); + + // Service replay requests + socket.on("replay", function () { + setupMemberSnapshot(socket, snapshot.operations, genesisSeq); + }); + + // Store, analyze, and broadcast incoming commits + socket.on("commit_ops", function (data, cb) { + var clientSeq = data.head, + ops = data.ops; + if (clientSeq === serverSeq) { + writeOpsToDocument(ops, function () { + cb({ + conflict: false, + head: serverSeq + }); + trackCursors(ops); + broadcastOpsByMember(socket, data.ops); + }); + } else { cb({ - conflict: false, - head: serverSeq + conflict: true }); - trackCursors(ops); - broadcastOpsByMember(socket, data.ops); - }); - } else { + } + }); + + // Service various requests + socket.on("access_get", function (data, cb) { cb({ - conflict: true + access: document.isPublic ? "public" : "normal" }); - } - }); - - // Service various requests - socket.on("access_get", function (data, cb) { - cb({ - access: document.isPublic ? "public" : "normal" }); - }); - if (socket.user.identity !== "guest") { - socket.on("access_change", function (data) { - document.isPublic = data.access === "public"; - broadcastMessage("access_changed", { - access: data.access === "public" ? "public" : "normal" - }); - if (data.access !== "public") { - sockets.forEach(function (peerSocket) { - if (peerSocket.user.identity === "guest") { - console.log(peerSocket.user.name); - removeSocket(peerSocket); - } + if (socket.user.identity !== "guest") { + socket.on("access_change", function (data) { + document.isPublic = data.access === "public"; + broadcastMessage("access_changed", { + access: data.access === "public" ? "public" : "normal" }); - } - }); - } + if (data.access !== "public") { + sockets.forEach(function (peerSocket) { + if (peerSocket.user.identity === "guest") { + console.log(peerSocket.user.name); + removeSocket(peerSocket); + } + }); + } + }); + } + }); // Handle dropout events socket.on("leave", function () { removeSocket(socket); }); socket.on("disconnect", function () { removeSocket(socket); }); }); }; function detachSocket(socket, callback) { removeMember(socket.memberId, function (ops) { broadcastOpsByMember(socket, ops); socket.removeAllListeners(); function lastCB() { socket.removeAllListeners(); if (callback) { callback(); } } // If a socket that is already connected is being // removed, this means that this is a deliberate // kicking-out, and not a natural event that could // result in a reconnection later. Therefore, clean // up. if (socket.connected) { console.log(socket.user.name + " is connected, removing"); socket.on('disconnect', lastCB); socket.emit("kick"); socket.emit("disconnect"); } else { console.log(socket.user.name + " is not connected, removing"); lastCB(); } }); } function removeSocket(socket) { var index = sockets.indexOf(socket); detachSocket(socket); if (index !== -1) { sockets.splice(index, 1); } } this.getDocument = function () { return document; }; this.destroy = function (callback) { async.each(sockets, function (socket, cb) { detachSocket(socket, cb); }, function () { sockets.length = 0; - callback(); + recorder.destroy(callback); }); }; function init() { // Setup caching document = objectCache.getTrackedObject(doc); DocumentChunk.findById(_.last(document.chunks), function (err, lastChunk) { chunk = objectCache.getTrackedObject(lastChunk); // Sanitize leftovers from previous session, if any sanitizeDocument(); - cb(); + recorder = new Recorder(chunk, function () { + cb(); + }); }); } init(); }; module.exports = Room; diff --git a/server/routes.js b/server/routes.js index f5cb747..4290697 100644 --- a/server/routes.js +++ b/server/routes.js @@ -1,27 +1,28 @@ /** * Main application routes */ 'use strict'; var errors = require('./components/errors'); module.exports = function(app) { // Insert routes below app.use('/api/templates', require('./api/template')); app.use('/api/documents', require('./api/document')); app.use('/api/users', require('./api/user')); app.use('/auth', require('./auth')); // All undefined asset or api routes should return a 404 app.route('/:url(api|auth|components|app|bower_components|assets)/*') .get(errors[404]); - // All other routes should redirect to the index.html - app.route('/*') + // All other routes (except dynamically-added genesis routes) + // should redirect to the index.html + app.route(/^\/(?!genesis).*/) .get(function(req, res) { res.sendfile(app.get('appPath') + '/index.html'); }); };