diff --git a/server/api/document/document.controller.js b/server/api/document/document.controller.js index 817e54f..2250232 100644 --- a/server/api/document/document.controller.js +++ b/server/api/document/document.controller.js @@ -1,171 +1,112 @@ /** * Using Rails-like standard naming convention for endpoints. * GET /documents -> index * POST /documents -> create * GET /documents/:id -> show * PUT /documents/:id -> update * DELETE /documents/:id -> destroy */ 'use strict'; var _ = require('lodash'); var mongoose = require('mongoose'); var Grid = require('gridfs-stream'); var multer = require('multer'); +var storage = require('./storage'); + var DocumentChunk = require('./document.model').DocumentChunk; var Document = require('./document.model').Document; var Template = require('../template/template.model'); var gfs = Grid(mongoose.connection.db, mongoose.mongo); // Get list of documents -exports.index = function(req, res) { - Document.find().populate('creator').exec(function (err, documents) { - if(err) { return handleError(res, err); } - return res.json(200, documents); - }); -}; +exports.index = storage.index; // Get a single document -exports.show = function(req, res) { - Document.findById(req.params.id, function (err, document) { - if(err) { return handleError(res, err); } - if(!document) { return res.send(404); } - return res.json(document); - }); -}; +exports.show = storage.show; -exports.upload = function (req, res, next) { - multer({ - upload: null, - limits: { - fileSize: 1024 * 1024 * 20, // 20 Megabytes - files: 5 - }, - onFileUploadStart: function (file) { - var chunkId = new mongoose.Types.ObjectId(), - fileId = new mongoose.Types.ObjectId(); - - var firstChunk = new DocumentChunk({ - _id: chunkId, - fileId: fileId - }); - var newDocument = new Document({ - title: file.originalname, - creator: req.user._id, - chunks: [chunkId] - }); - this.upload = gfs.createWriteStream({ - _id: fileId, - filename: file.originalname, - mode: 'w', - chunkSize: 1024 * 4, - content_type: file.mimetype, - root: 'fs' - }); - this.upload.on('finish', function () { - firstChunk.save(function (err) { - if (!err) { - newDocument.save(); - } - }); - }); - }, - onFileUploadData: function (file, data) { - this.upload.write(data); - }, - onFileUploadComplete: function (file) { - this.upload.end(); - } - })(req, res, next); -}; +// Middleware for handling file uploads +exports.upload = storage.upload; exports.acknowledgeUpload = function (req, res) { return res.send(200); }; exports.showSnapshot = function(req, res) { var snapshotId = req.params.id; gfs.findOne({_id: snapshotId}, function (err, file) { if (err) { return handleError(res, err); } if (!file) { return res.send(404); } var download = gfs.createReadStream({ _id: snapshotId }); download.on('error', function (err) { return handleError(res, err); }); res.set('Content-Type', file.contentType); res.attachment(file.filename) download.pipe(res); }); }; -// Creates a new document in the DB. -exports.create = function(req, res) { - Document.create(req.body, function(err, document) { - if(err) { return handleError(res, err); } - return res.json(201, document); - }); -}; - exports.createFromTemplate = function (req, res) { Template.findById(req.params.id, function (err, template) { if (err) { return handleError(res, err); } if (!template) { return res.send(404); } var chunkId = new mongoose.Types.ObjectId(); var firstChunk = new DocumentChunk({ _id: chunkId, fileId: template.fileId }); var newDocument = new Document({ title: template.title, creator: req.user._id, chunks: [chunkId] }); firstChunk.save(function (err) { if (!err) { newDocument.save(function (err) { return res.json(201, newDocument); }); } }) }); }; // Updates an existing document in the DB. exports.update = function(req, res) { if(req.body._id) { delete req.body._id; } Document.findById(req.params.id, function (err, document) { if (err) { return handleError(res, err); } if(!document) { return res.send(404); } var updated = _.merge(document, req.body); updated.save(function (err) { if (err) { return handleError(res, err); } return res.json(200, document); }); }); }; // Deletes a document from the DB. exports.destroy = function(req, res) { Document.findById(req.params.id, function (err, document) { if(err) { return handleError(res, err); } if(!document) { return res.send(404); } document.remove(function(err) { if(err) { return handleError(res, err); } return res.send(204); }); }); }; function handleError(res, err) { return res.send(500, err); } diff --git a/server/api/document/document.model.js b/server/api/document/document.model.js index 7c21ee8..73014cd 100644 --- a/server/api/document/document.model.js +++ b/server/api/document/document.model.js @@ -1,27 +1,32 @@ 'use strict'; var mongoose = require('mongoose'), Schema = mongoose.Schema; +var storageTypes = ['webdav']; + /* * Each Document Chunk has an associated ODF snapshot file within * GridFS of the same ID. */ var DocumentChunk = new Schema({ operations: { type: Array, default: [] }, fileId: { type: Schema.Types.ObjectId } }); var DocumentSchema = new Schema({ title: String, created: { type: Date, default: Date.now, required: true }, date: { type: Date, default: Date.now }, creator: { type: Schema.Types.ObjectId, ref: 'User' }, editors: { type: [{type: Schema.Types.ObjectId, ref: 'User'}], default: [] }, - chunks: { type: [{type: Schema.Types.ObjectId, ref: 'DocumentChunk'}], required: true } + chunks: { type: [{type: Schema.Types.ObjectId, ref: 'DocumentChunk'}], default: [] }, + live: Boolean, + provider: String, + webdav: {} }); module.exports = { DocumentChunk: mongoose.model('DocumentChunk', DocumentChunk), Document: mongoose.model('Document', DocumentSchema) }; diff --git a/server/api/document/storage/index.js b/server/api/document/storage/index.js new file mode 100644 index 0000000..81b3033 --- /dev/null +++ b/server/api/document/storage/index.js @@ -0,0 +1,4 @@ +'use strict'; + +var config = require('../../../config/environment'); +module.exports = require('./' + config.storage.type); diff --git a/server/api/document/storage/local/index.js b/server/api/document/storage/local/index.js new file mode 100644 index 0000000..4bd2e8b --- /dev/null +++ b/server/api/document/storage/local/index.js @@ -0,0 +1,81 @@ +'use strict'; + +var multer = require('multer'); +var mongoose = require('mongoose'); +var Grid = require('gridfs-stream'); + +var Document = require('../../document.model').Document; +var DocumentChunk = require('../../document.model').DocumentChunk; + +var gfs = Grid(mongoose.connection.db, mongoose.mongo); + +exports.index = function (req, res) { + var userId = req.user._id; + + Document.find({ + '$or': [ + { 'creator': userId }, + { 'editors': { '$in': [userId] } } + ] + }).populate('creator').exec(function (err, documents) { + if(err) { return handleError(res, err); } + return res.json(200, documents); + }); +}; + +exports.show = function(req, res) { + Document.findById(req.params.id, function (err, document) { + if(err) { return handleError(res, err); } + if(!document) { return res.send(404); } + return res.json(document); + }); +}; + +exports.upload = function (req, res, next) { + multer({ + upload: null, + limits: { + fileSize: 1024 * 1024 * 20, // 20 Megabytes + files: 5 + }, + onFileUploadStart: function (file) { + var chunkId = new mongoose.Types.ObjectId(), + fileId = new mongoose.Types.ObjectId(); + + var firstChunk = new DocumentChunk({ + _id: chunkId, + fileId: fileId + }); + var newDocument = new Document({ + title: file.originalname, + creator: req.user._id, + chunks: [chunkId] + }); + this.upload = gfs.createWriteStream({ + _id: fileId, + filename: file.originalname, + mode: 'w', + chunkSize: 1024 * 4, + content_type: file.mimetype, + root: 'fs' + }); + this.upload.on('finish', function () { + firstChunk.save(function (err) { + if (!err) { + newDocument.save(); + } + }); + }); + }, + onFileUploadData: function (file, data) { + this.upload.write(data); + }, + onFileUploadComplete: function (file) { + this.upload.end(); + } + })(req, res, next); +}; + +function handleError(res, err) { + return res.send(500, err); +} diff --git a/server/api/document/storage/webdav/index.js b/server/api/document/storage/webdav/index.js new file mode 100644 index 0000000..04e12cf --- /dev/null +++ b/server/api/document/storage/webdav/index.js @@ -0,0 +1,290 @@ +'use strict'; + +var _ = require('lodash'); +var dav = require('dav'); +var async = require('async'); +var mongoose = require('mongoose'); +var https = require('https'); +var url = require('url'); +var Grid = require('gridfs-stream'); +var multer = require('multer'); +var request = require('request'); + +var config = require('../../../../config/environment'); +var Document = require('../../document.model').Document; +var DocumentChunk = require('../../document.model').DocumentChunk; + +var gfs = Grid(mongoose.connection.db, mongoose.mongo); + +function makeDavClient (user) { + return new dav.Client( + new dav.transport.Basic(new dav.Credentials({ + username: user.webdav.username, + password: user.webdav.password + })), + { + baseUrl: config.storage.server + } + ); +} + +function saveToGridFS(user, href, fileId, cb) { + var request = https.request({ + method: 'GET', + hostname: url.parse(config.storage.server).hostname, + path: href, + auth: user.webdav.username + ':' + user.webdav.password + }, function (response) { + var file = gfs.createWriteStream({ + _id: fileId, + filename: href.split('/').pop(), + mode: 'w', + chunkSize: 1024 * 4, + content_type: 'application/vnd.oasis.opendocument.text', + root: 'fs' + }); + file.on('finish', cb); + response.pipe(file); + }).on('error', function (err) { + cb(err); + }); + + request.end(); +} + +// The WebDAV server is always authoritative. +function synchronizeUserFilesToDB(user, webdavDocuments, objectCache, cb) { + Document.find({'creator': user._id }) + .populate('creator', 'name email').exec(function (err, dbDocuments) { + // Since we're doing things that could possibly invalidate DB documents, + // we have to work with the live cache + var trackedDocuments = dbDocuments.map(objectCache.getTrackedObject); + var persistenceQueue = []; + var finalDocuments = []; + + trackedDocuments.forEach(function (trackedDoc) { + finalDocuments.push(trackedDoc); + + var sameHrefDoc = _.find(webdavDocuments, function (doc) { + return doc.href === trackedDoc.webdav.href; + }); + var sameEtagDoc = _.find(webdavDocuments, function (doc) { + return doc.props.getetag === trackedDoc.webdav.etag; + }); + + if (!sameEtagDoc) { + if (!sameHrefDoc) { + // Document has been effectively deleted, take it offline and remove + trackedDoc.live = false; + persistenceQueue.push(function (cb) { trackedDoc.remove(cb); }); + finalDocuments.pop(); + } else { + // Document has been modified, take it offline, throw away local changes, and update etag + trackedDoc.live = false; + trackedDoc.chunks.length = 0; + trackedDoc.markModified('chunks'); + trackedDoc.webdav.etag = sameHrefDoc.props.getetag; + trackedDoc.date = Date.parse(sameHrefDoc.props.getlastmodified); + trackedDoc.markModified('webdav'); + persistenceQueue.push(function (cb) { trackedDoc.save(cb); }); + } + } else { + if (!sameHrefDoc) { + // Document has been moved, just update href and save without interrupting anything + trackedDoc.webdav.href = sameEtagDoc.href; + trackedDoc.markModified('webdav'); + persistenceQueue.push(function (cb) { trackedDoc.save(cb); }); + } else { + if (sameHrefDoc.props.getetag === sameEtagDoc.props.getetag) { + // nothing changed + } else { + // Document moved to sameEtagDoc.props.href and there is a new doc at sameHrefDoc.href + trackedDoc.webdav.href = sameEtagDoc.href; + trackedDoc.markModified('webdav'); + persistenceQueue.push(function (cb) { trackedDoc.save(cb); }); + } + } + } + }); + + // Once modified documents are persisted, purge non-live documents from the cache + async.parallel(persistenceQueue, function (err) { + if (err) return cb(err); + + trackedDocuments.forEach(function (trackedDoc) { + if (!trackedDoc.live) { + objectCache.forgetTrackedObject(trackedDoc); + } + }); + + return cb(null, finalDocuments); + }); + }); +} + +exports.index = function (req, res) { + var davClient = makeDavClient(req.user); + + // Retrieve a list of files and filter by mimetype + davClient.send(dav.request.propfind({ + depth: 'infinity', + props: [ + { name: 'getcontenttype', namespace: dav.ns.DAV }, + { name: 'getetag', namespace: dav.ns.DAV }, + { name: 'getlastmodified', namespace: dav.ns.DAV } + ] + }), config.storage.path) + .then( + function success(response) { + var webdavDocuments = _.filter(response, function (item) { + return item.props.getcontenttype === 'application/vnd.oasis.opendocument.text'; + }); + synchronizeUserFilesToDB(req.user, webdavDocuments, req.app.get('objectCache'), function (err, updatedDocuments) { + if (err) return handleError(res, err); + // Transform the webdavDocuments array into something more usable + webdavDocuments = webdavDocuments.map(function (doc) { + return { + _id: new Buffer(doc.href).toString('base64'), + title: doc.href.split('/').pop(), + creator: req.user.profile, + date: Date.parse(doc.props.getlastmodified), + webdav: { + href: doc.href, + etag: doc.props.getetag + } + }; + }); + // Merge the two document types, such that hrefs are unique in the resulting list. DB docs have priority here + var mergedDocuments = _.uniq(updatedDocuments.concat(webdavDocuments), function (doc) { + return (doc.webdav && doc.webdav.href) || doc.href; + }); + return res.json(200, mergedDocuments); + }); + }, + function failure(error) { + handleError(res, error); + }); +}; + +function createFirstChunk(user, href, cb) { + var chunkId = new mongoose.Types.ObjectId(), + fileId = new mongoose.Types.ObjectId(); + + var firstChunk = new DocumentChunk({ + _id: chunkId, + fileId: fileId + }); + + saveToGridFS(user, href, fileId, function(err) { + if (err) { return cb(err); } + firstChunk.save(function (err) { + cb(err, firstChunk); + }); + }); +} +exports.show = function(req, res) { + if (mongoose.Types.ObjectId.isValid(req.params.id)) { + Document.findById(req.params.id, function (err, document) { + if(err) { return handleError(res, err); } + if(!document) { return res.send(404); } + if (document.chunks.length) { return res.json(200, document); } + // If the document had been invalidated, it has no chunks, so generate one + createFirstChunk(req.user, document.webdav.href, function (err, firstChunk) { + if (err) { return handleError(res, err); } + document.chunks.push(firstChunk); + document.save(function () { + res.json(200, document); + }) + }); + }); + } else { + var href = new Buffer(req.params.id, 'base64').toString('ascii'); + Document.findOne({ + 'webdav.href': href, + creator: req.user._id, + }, function (err, document) { + if (err) { return handleError(res, err); } + if (document) { + return res.json(200, document); + } else { + var davClient = makeDavClient(req.user); + davClient.send(dav.request.propfind({ props: [] }), href) + .then( + function success(response) { + var webdavDoc = response[0]; + + createFirstChunk(req.user, href, function(err, firstChunk) { + if (err) { return handleError(res, err); } + Document.create({ + title: href.split('/').pop(), + creator: req.user._id, + created: Date.parse(webdavDoc.props.getlastmodified), + date: Date.parse(webdavDoc.props.getlastmodified), + chunks: [firstChunk._id], + webdav: { + href: href, + etag: webdavDoc.props.getetag + } + }, function (err, document) { + if (err) { return handleError(res, err); } + res.json(201, document); + }); + }) + }, + function failure(error) { + res.send(404, error); + }); + } + }); + } +}; + +function saveToWebDAV(user, href, readStream, cb) { + +} +/* +exports.upload = function (req, res, next) { + multer({ + upload: null, + limits: { + fileSize: 1024 * 1024 * 20, // 20 Megabytes + files: 5 + }, + onFileUploadStart: function (file) { + var request = request.post({ + hostname: url.parse(config.storage.server).hostname, + url: config.storage.server + '/' + config.storage.path, + auth: req.user.webdav.username + ':' + req.user.webdav.password, + headers: { + 'Content-Type': file.mimetype + } + }); + + this.upload = gfs.createWriteStream({ + _id: fileId, + filename: file.originalname, + mode: 'w', + chunkSize: 1024 * 4, + content_type: file.mimetype, + root: 'fs' + }); + this.upload.on('finish', function () { + firstChunk.save(function (err) { + if (!err) { + newDocument.save(); + } + }); + }); + }, + onFileUploadData: function (file, data) { + this.upload.write(data); + }, + onFileUploadComplete: function (file) { + this.upload.end(); + } + })(req, res, next); +}; +*/ +function handleError(res, err) { + return res.send(500, err); +} diff --git a/server/api/user/user.model.js b/server/api/user/user.model.js index 2a96013..48a06a0 100644 --- a/server/api/user/user.model.js +++ b/server/api/user/user.model.js @@ -1,145 +1,151 @@ 'use strict'; var mongoose = require('mongoose'); var Schema = mongoose.Schema; var crypto = require('crypto'); var authTypes = ['webdav']; +var WebDAVSchema = new Schema({ + username: String, + password: String +}); + var UserSchema = new Schema({ name: String, email: { type: String, lowercase: true }, role: { type: String, default: 'user' }, hashedPassword: String, provider: String, + webdav: {}, salt: String }); /** * Virtuals */ UserSchema .virtual('password') .set(function(password) { this._password = password; this.salt = this.makeSalt(); this.hashedPassword = this.encryptPassword(password); }) .get(function() { return this._password; }); // Public profile information UserSchema .virtual('profile') .get(function() { return { 'name': this.name, 'role': this.role }; }); // Non-sensitive info we'll be putting in the token UserSchema .virtual('token') .get(function() { return { '_id': this._id, 'role': this.role }; }); /** * Validations */ // Validate empty email UserSchema .path('email') .validate(function(email) { return email.length; }, 'Email cannot be blank'); // Validate empty password UserSchema .path('hashedPassword') .validate(function(hashedPassword) { if (authTypes.indexOf(this.provider) !== -1) return true; return hashedPassword.length; }, 'Password cannot be blank'); // Validate email is not taken UserSchema .path('email') .validate(function(value, respond) { var self = this; this.constructor.findOne({email: value}, function(err, user) { if(err) throw err; if(user) { if(self.id === user.id) return respond(true); return respond(false); } respond(true); }); }, 'The specified email address is already in use.'); var validatePresenceOf = function(value) { return value && value.length; }; /** * Pre-save hook */ UserSchema .pre('save', function(next) { if (!this.isNew) return next(); if (!validatePresenceOf(this.hashedPassword) && authTypes.indexOf(this.provider) === -1) next(new Error('Invalid password')); else next(); }); /** * Methods */ UserSchema.methods = { /** * Authenticate - check if the passwords are the same * * @param {String} plainText * @return {Boolean} * @api public */ authenticate: function(plainText) { return this.encryptPassword(plainText) === this.hashedPassword; }, /** * Make salt * * @return {String} * @api public */ makeSalt: function() { return crypto.randomBytes(16).toString('base64'); }, /** * Encrypt password * * @param {String} password * @return {String} * @api public */ encryptPassword: function(password) { if (!password || !this.salt) return ''; var salt = new Buffer(this.salt, 'base64'); return crypto.pbkdf2Sync(password, salt, 10000, 64).toString('base64'); } }; module.exports = mongoose.model('User', UserSchema); diff --git a/server/app.js b/server/app.js index e47b49e..98fc9e8 100644 --- a/server/app.js +++ b/server/app.js @@ -1,74 +1,74 @@ /** * Main application file */ 'use strict'; // Set default node environment to development process.env.NODE_ENV = process.env.NODE_ENV || 'development'; var express = require('express'); var mongoose = require('mongoose'); var config = require('./config/environment'); var Adaptor = require('./components/adaptor'); var ObjectCache = require('./components/objectcache'); // Connect to database mongoose.connect(config.mongo.uri, config.mongo.options); // Populate DB with sample data if(config.seedDB) { require('./config/seed'); } // Setup server var app = express(); +app.set('objectCache', new ObjectCache()); + var server = require('http').createServer(app); var socketio = require('socket.io')(server, { path: '/socket.io' }); var adaptor; -var objectCache; var sockets = []; require('./config/socketio')(socketio); require('./config/express')(app); require('./routes')(app); server.on('connection', function (socket) { sockets.push(socket); socket.on('close', function () { sockets.splice(sockets.indexOf(socket), 1); }); }); // Start server server.listen(config.port, config.ip, function () { console.log('Express server listening on %d, in %s mode', config.port, app.get('env')); - objectCache = new ObjectCache(); - adaptor = new Adaptor(socketio, objectCache); + adaptor = new Adaptor(socketio, app.get('objectCache')); }); function destroy() { adaptor.destroy(function () { console.log('All realtime clients disconnected.'); - objectCache.destroy(function () { + app.get('objectCache').destroy(function () { console.log('All objects persisted to DB.'); sockets.forEach(function (socket) { socket.destroy(); }); server.close(function () { console.log('HTTP server shut down.'); mongoose.disconnect(function () { console.log('DB connection closed.'); console.log('Everything successfully shut down. Bye!'); process.exit(); }); }); }); }); } process.on("SIGTERM", destroy); process.on("SIGINT", destroy); // Expose app exports = module.exports = app; diff --git a/server/auth/webdav/passport.js b/server/auth/webdav/passport.js index 8a0ce50..77e9482 100644 --- a/server/auth/webdav/passport.js +++ b/server/auth/webdav/passport.js @@ -1,54 +1,58 @@ var passport = require('passport'); var LocalStrategy = require('passport-local').Strategy; var dav = require('dav'); -dav.debug.enabled = true; + exports.setup = function (User, config) { passport.use(new LocalStrategy({ usernameField: 'email', passwordField: 'password' // this is the virtual field on the model }, function(email, password, done) { var client = new dav.Client( new dav.transport.Basic(new dav.Credentials({ username: email, password: password })), { baseUrl: config.storage.server } ); client.send( dav.request.basic({ method: 'OPTIONS', data: ''}), config.storage.path ).then( function success() { User.findOne({ email: email.toLowerCase() }, function (err, user) { if (err) { return done(err); } if (!user) { var newUser = new User({ name: email, email: email, provider: 'webdav', - role: 'user' + role: 'user', + webdav: { + username: email, + password: password + } }); newUser.save(function (err, user) { if (err) { return done(err); } if (!err) { return done(null, user); } }); } else { return done(null, user); } }); }, function failure(err) { return done(err); } ); } )); }; diff --git a/server/components/adaptor/index.js b/server/components/adaptor/index.js index 88a0866..042be99 100644 --- a/server/components/adaptor/index.js +++ b/server/components/adaptor/index.js @@ -1,84 +1,97 @@ /* * Copyright (C) 2015 KO GmbH * * @licstart * This file is part of Kotype. * * Kotype is free software: you can redistribute it and/or modify it * under the terms of the GNU Affero General Public License (GNU AGPL) * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * Kotype is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with Kotype. If not, see . * @licend * * @source: https://github.com/kogmbh/Kotype/ */ "use strict"; /*jslint nomen: true, unparam: true */ /*global require, console, setInterval, module */ var async = require("async"), Room = require("./room"), Document = require("../../api/document/document.model").Document, User = require("../../api/user/user.model"); // Maintains an in-memory cache of users, documents, and sessions. // And writes/reads them from the DB on demand. var ServerAdaptor = function (socketServer, objectCache) { var rooms = {}; function addToRoom(documentId, socket) { var room = rooms[documentId]; if (!room) { - Document.findById(documentId, function (err, doc) { + Document.findById(documentId).populate('creator', 'name email') + .exec(function (err, doc) { if (err) { return console.log(err); } if (!doc) { return console.log("documentId unknown:"+documentId); } - room = new Room(doc, objectCache, function () { + var document = objectCache.getTrackedObject(doc); + document.live = true; + room = new Room(document, objectCache, function () { rooms[documentId] = room; room.attachSocket(socket); }); + + var interval = setInterval(function () { + if (!document.live) { + clearInterval(interval); + objectCache.forgetTrackedObject(document); + room.destroy(function () { + delete rooms[documentId]; + }); + } + }, 500); }); } else { room.attachSocket(socket); } } this.destroy = function (callback) { async.each(Object.keys(rooms), function (documentId, cb) { rooms[documentId].destroy(cb); }, function () { rooms = {}; callback() }); }; function init() { socketServer.on("connection", function (socket) { User.findById(socket.decoded_token._id, function (err, user) { if (err) { return console.log(err); } socket.user = user; socket.on("join", function (data) { var documentId = data.documentId; if (documentId) { console.log("Authorized user " + user.name + " for document " + documentId); addToRoom(documentId, socket); } else { console.log("Error: Client did not specify a document ID"); } }); }); }); } init(); }; module.exports = ServerAdaptor; diff --git a/server/components/adaptor/room.js b/server/components/adaptor/room.js index 47fdfe5..7c276c1 100644 --- a/server/components/adaptor/room.js +++ b/server/components/adaptor/room.js @@ -1,370 +1,370 @@ /* * Copyright (C) 2015 KO GmbH * * @licstart * This file is part of Kotype. * * Kotype is free software: you can redistribute it and/or modify it * under the terms of the GNU Affero General Public License (GNU AGPL) * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * Kotype is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with Kotype. If not, see . * @licend * * @source: https://github.com/kogmbh/Kotype/ */ var async = require("async"); var _ = require("lodash"); var RColor = require('../colors'); var DocumentChunk = require("../../api/document/document.model").DocumentChunk; -var Room = function (doc, objectCache, cb) { - var document, - chunk, +var Room = function (document, objectCache, cb) { + var chunk, hasCursor = {}, sockets = [], userColorMap = {}, randomColor = new RColor(), serverSeq = 0; function trackTitle(ops) { var newTitle, i; for (i = 0; i < ops.length; i += 1) { if (ops[i].optype === "UpdateMetadata" && ops[i].setProperties["dc:title"] !== undefined) { newTitle = ops[i].setProperties["dc:title"]; } } if (newTitle !== undefined) { if (newTitle.length === 0) { newTitle = "Untitled Document"; } } if (newTitle) { document.title = newTitle; } } function trackEditors() { // TODO: rather track by ops, to decouple from socket implementation sockets.forEach(function (socket) { var _id = socket.user._id; if (document.editors.indexOf(_id) === -1) { document.editors.push(_id); } }); } function trackCursors(ops) { var i; for (i = 0; i < ops.length; i += 1) { if (ops[i].optype === "AddCursor") { hasCursor[ops[i].memberid] = true; } if (ops[i].optype === "RemoveCursor") { hasCursor[ops[i].memberid] = false; } } } function sanitizeDocument() { var ops = chunk.operations, unbalancedCursors = {}, unbalancedMembers = {}, lastAccessDate = document.date, newOps = [], i; for (i = 0; i < ops.length; i += 1) { if (ops[i].optype === "AddCursor") { unbalancedCursors[ops[i].memberid] = true; } else if (ops[i].optype === "RemoveCursor") { unbalancedCursors[ops[i].memberid] = false; } else if (ops[i].optype === "AddMember") { unbalancedMembers[ops[i].memberid] = true; } else if (ops[i].optype === "RemoveMember") { unbalancedMembers[ops[i].memberid] = false; } } Object.keys(unbalancedCursors).forEach(function (memberId) { if (unbalancedCursors[memberId]) { newOps.push({ optype: "RemoveCursor", memberid: memberId, timestamp: lastAccessDate }); } }); Object.keys(unbalancedMembers).forEach(function (memberId) { if (unbalancedMembers[memberId]) { newOps.push({ optype: "RemoveMember", memberid: memberId, timestamp: lastAccessDate }); } }); if (newOps.length) { // Update op stack chunk.operations = chunk.operations.concat(newOps); serverSeq = chunk.operations.length; } } function broadcastMessage(message, data) { sockets.forEach(function (peerSocket) { peerSocket.emit(message, data) }); } function sendOpsToMember(socket, ops) { socket.emit("new_ops", { head: serverSeq, ops: ops }); } function replayOpsToMember(socket) { socket.emit("replay", { head: serverSeq, ops: chunk.operations }); } function broadcastOpsByMember(socket, ops) { if (!ops.length) { return; } sockets.forEach(function (peerSocket) { if (peerSocket !== socket) { sendOpsToMember(peerSocket, ops); } }); } function writeOpsToDocument(ops, cb) { - if (!ops.length) { + if (!ops.length || !document.live) { cb(); } trackTitle(ops); trackEditors(); // Update op stack chunk.operations = chunk.operations.concat(ops); serverSeq = chunk.operations.length; // Update modified date document.date = new Date(); cb(); } function addMember(user, cb) { var memberId, op, timestamp = Date.now(), color = userColorMap[user._id]; memberId = user.name + "_" + timestamp.toString(); // Let user colors persist in a Room even after they've // left and joined. if (!color) { userColorMap[user._id] = color = randomColor.get(true, 0.7); } op = { optype: "AddMember", memberid: memberId, timestamp: timestamp, setProperties: { fullName: user.name, color: color } }; writeOpsToDocument([op], function () { cb(memberId, [op]); }); } function removeMember(memberId, cb) { var ops = [], timestamp = Date.now(); if (hasCursor[memberId]) { ops.push({ optype: "RemoveCursor", memberid: memberId, timestamp: timestamp }); } ops.push({ optype: "RemoveMember", memberid: memberId, timestamp: timestamp }); writeOpsToDocument(ops, function () { cb(ops); }); } function getOpsAfter(basedOn) { return chunk.operations.slice(basedOn, serverSeq); } this.socketCount = function () { return sockets.length; }; this.attachSocket = function (socket) { // Add the socket to the room and give the // client it's unique memberId addMember(socket.user, function (memberId, ops) { socket.memberId = memberId; sockets.push(socket); broadcastOpsByMember(socket, ops); socket.emit("join_success", { memberId: memberId, snapshotId: chunk.fileId }); // Service replay requests socket.on("replay", function () { replayOpsToMember(socket); }); // Store, analyze, and broadcast incoming commits socket.on("commit_ops", function (data, cb) { var clientSeq = data.head, ops = data.ops; if (clientSeq === serverSeq) { writeOpsToDocument(ops, function () { cb({ conflict: false, head: serverSeq }); trackCursors(ops); broadcastOpsByMember(socket, data.ops); }); } else { cb({ conflict: true }); } }); // Service various requests socket.on("access_get", function (data, cb) { cb({ access: document.isPublic ? "public" : "normal" }); }); if (socket.user.identity !== "guest") { socket.on("access_change", function (data) { document.isPublic = data.access === "public"; broadcastMessage("access_changed", { access: data.access === "public" ? "public" : "normal" }); if (data.access !== "public") { sockets.forEach(function (peerSocket) { if (peerSocket.user.identity === "guest") { console.log(peerSocket.user.name); removeSocket(peerSocket); } }); } }); } // Handle dropout events socket.on("leave", function () { removeSocket(socket); }); socket.on("disconnect", function () { removeSocket(socket); }); }); }; function detachSocket(socket, callback) { removeMember(socket.memberId, function (ops) { broadcastOpsByMember(socket, ops); socket.removeAllListeners(); function lastCB() { socket.removeAllListeners(); if (callback) { callback(); } } // If a socket that is already connected is being // removed, this means that this is a deliberate // kicking-out, and not a natural event that could // result in a reconnection later. Therefore, clean // up. if (socket.connected) { console.log(socket.user.name + " is connected, removing"); socket.on('disconnect', lastCB); socket.emit("kick"); socket.emit("disconnect"); } else { console.log(socket.user.name + " is not connected, removing"); lastCB(); } }); } function removeSocket(socket) { var index = sockets.indexOf(socket); detachSocket(socket); if (index !== -1) { sockets.splice(index, 1); } } this.getDocument = function () { return document; }; this.destroy = function (callback) { async.each(sockets, function (socket, cb) { detachSocket(socket, cb); }, function () { + objectCache.forgetTrackedObject(chunk); + document.live = false; sockets.length = 0; callback(); }); }; function init() { // Setup caching - document = objectCache.getTrackedObject(doc); DocumentChunk.findById(_.last(document.chunks), function (err, lastChunk) { chunk = objectCache.getTrackedObject(lastChunk); // Sanitize leftovers from previous session, if any sanitizeDocument(); cb(); }); } init(); }; module.exports = Room; diff --git a/server/components/objectcache/index.js b/server/components/objectcache/index.js index a9a484a..bd30a85 100644 --- a/server/components/objectcache/index.js +++ b/server/components/objectcache/index.js @@ -1,89 +1,89 @@ /* * Copyright (C) 2015 KO GmbH * * @licstart * This file is part of Kotype. * * Kotype is free software: you can redistribute it and/or modify it * under the terms of the GNU Affero General Public License (GNU AGPL) * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * Kotype is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with Kotype. If not, see . * @licend * * @source: https://github.com/kogmbh/Kotype/ */ /*jslint nomen: true, unparam: true */ /*global require, console, setInterval, module */ var mongoose = require("mongoose"), async = require("async"), ObjectCache; -// Maintains an in-memory cache of objects from mongoose collections, +// Maintains an in-memory cache of documents from mongoose collections, // and writes them to the DB periodically. var ObjectCache = function () { "use strict"; var objects = {}, timer, writeInterval = 1000 * 5; function isTracked(object) { return objects.hasOwnProperty(object._id); } this.isTracked = isTracked; function getTrackedObject(object) { var id = object._id; if (!objects[id]) { objects[id] = object; } return objects[id]; } this.getTrackedObject = getTrackedObject; function forgetTrackedObject(object) { var id = object._id; if (objects.hasOwnProperty(id)) { delete objects[id]; } } this.forgetTrackedObject = forgetTrackedObject; function saveObjects(callback) { async.each(Object.keys(objects), function (id, cb) { - if (objects[id].isModified()) { + if (objects[id].isModified() && objects[id].live) { objects[id].save(cb); } else { cb(); } }, callback); } this.destroy = function (callback) { clearInterval(timer); saveObjects(callback); }; function init() { timer = setInterval(function () { saveObjects(); }, writeInterval); } init(); }; module.exports = ObjectCache; diff --git a/server/config/local.env.sample.js b/server/config/local.env.sample.js index 8604415..eeda872 100644 --- a/server/config/local.env.sample.js +++ b/server/config/local.env.sample.js @@ -1,25 +1,25 @@ 'use strict'; // Use local.env.js for environment variables that grunt will set when the server starts locally. // Use for your api keys, secrets, etc. This file should not be tracked by git. // // You will need to set these on the server you deploy to. module.exports = { DOMAIN: 'http://localhost:9000', SESSION_SECRET: 'manticore-secret', // Control debug level for modules using visionmedia/debug DEBUG: '', /* * Supported authentication strategies. * 1. 'local' for storing everything in Mongo/GridFS, auth using username/password * 2. 'webdav' for linking with a WebDAV server, auth using WebDAV credentials */ STORAGE: 'webdav', // More configuration for the chosen auth type. None required for 'local' - WEBDAV_SERVER: 'https://apps.kolabnow.com', - WEBDAV_PATH: '/' + WEBDAV_SERVER: 'https://demo.owncloud.org', + WEBDAV_PATH: '/remote.php/webdav' };