diff --git a/app/model/kolabobject.py b/app/model/kolabobject.py index 02a535f..08c4fe6 100644 --- a/app/model/kolabobject.py +++ b/app/model/kolabobject.py @@ -1,385 +1,385 @@ # -*- coding: utf-8 -*- # # Copyright 2014 Kolab Systems AG (http://www.kolabsys.com) # # Thomas Bruederli # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # import json import pytz import hashlib import datetime import logging from dateutil.parser import parse as parse_date from pykolab.xml.utils import compute_diff from collections import OrderedDict from email import message_from_string -from app import storage log = logging.getLogger('model') class KolabObject(object): """ Base Model class for accessing Kolab Groupware Object data """ folder_type = 'unknown' x_kolab_type = 'application/x-vnd.kolab.*' def __init__(self, env={}): from flask import current_app + from app.storage import instance as storage_instance self.env = env self.config = current_app.config - self.storage = storage.factory() + self.storage = storage_instance def created(self, uid, mailbox, msguid=None): """ Provide created date and user """ changelog = self._object_changelog(uid, mailbox, msguid, 1) if changelog and len(changelog) > 0: for change in changelog: if change['op'] == 'APPEND': change['uid'] = uid change.pop('op', None) return change return False def lastmodified(self, uid, mailbox, msguid=None): """ Provide last change information """ 
changelog = self._object_changelog(uid, mailbox, msguid, -3) if changelog and len(changelog) > 0: for change in changelog: if change['op'] == 'APPEND': change['uid'] = uid change.pop('op', None) return change return False def changelog(self, uid, mailbox, msguid=None): """ Full changelog """ changelog = self._object_changelog(uid, mailbox, msguid) if changelog: return dict(uid=uid, changes=changelog) return False def get(self, uid, rev, mailbox, msguid=None): """ Retrieve an old revision """ obj = self._get(uid, mailbox, msguid, rev) if obj is not None: return dict(uid=uid, rev=rev, xml=str(obj), mailbox=mailbox) return False def _get(self, uid, mailbox, msguid, rev): """ Get an old revision and return the pykolab.xml object """ obj = False rec = self.storage.get_revision(uid, self._resolve_mailbox_uri(mailbox), msguid, rev) if rec is not None: raw = self.storage.get_message_data(rec) try: message = message_from_string(raw.encode('utf8','replace')) obj = self._object_from_message(message) or False except Exception, e: log.warning("Failed to parse mime message for UID %s @%s: %r", uid, rev, e) if obj is False: log.warning("Failed to parse mime message for UID %s @%s", uid, rev) return obj def diff(self, uid, rev1, rev2, mailbox, msguid=None, instance=None): """ Compare two revisions of an object and return a list of property changes """ rev_old = rev1 rev_new = rev2 if rev_old >= rev_new: raise ValueError("Invalid argument 'rev'") old = self._get(uid, mailbox, msguid, rev_old) if old == False: raise ValueError("Object %s @rev:%s not found" % (uid, str(rev_old))) new = self._get(uid, mailbox, msguid, rev_new) if new == False: raise ValueError("Object %s @rev:%s not found" % (uid, str(rev_new))) # compute diff for the requested recurrence instance if instance is not None and hasattr(old, 'get_instance') and hasattr(new, 'get_instance'): log.debug("Get recurrence instance %s for object %s", instance, uid) try: recurrence_date = datetime.datetime.strptime(str(instance), 
"%Y%m%dT%H%M%S") except: try: recurrence_date = datetime.datetime.strptime(str(instance), "%Y%m%d").date() except: raise ValueError("Invalid isntance identifier %r" % (instance)) _old = old.get_instance(recurrence_date) if _old == None: raise ValueError("Object instance %s-%s @rev:%s not found" % (uid, instance, str(rev_old))) old_dict = _old.to_dict() old_dict['recurrence'] = old.get_recurrence().to_dict() _new = new.get_instance(recurrence_date) if _new == None: raise ValueError("Object instance %s-%s @rev:%s not found" % (uid, instance, str(rev_new))) new_dict = _new.to_dict() new_dict['recurrence'] = new.get_recurrence().to_dict() else: old_dict = old.to_dict() new_dict = new.to_dict() # compute diff and compose result result = dict(uid=uid, rev=rev_new, changes=convert2primitives(compute_diff(old_dict, new_dict, False))) if instance is not None: result['instance'] = instance return result def rawdata(self, uid, mailbox, rev, msguid=None): """ Get the full message payload of an old revision """ rec = self.storage.get_revision(uid, self._resolve_mailbox_uri(mailbox), msguid, rev) if rec is not None: return self.storage.get_message_data(rec) return False def _object_from_message(self, message): """ To be implemented in derived classes """ return None def _object_changelog(self, uid, mailbox, msguid, limit=None): """ Query storage for changelog events related to the given UID """ # this requires a user context if not self.env.has_key('REQUEST_USER') or not self.env['REQUEST_USER']: return None # fetch event log from storage eventlog = self.storage.get_events(uid, self._resolve_mailbox_uri(mailbox), msguid, limit) # convert logstash entries into a sane changelog event_op_map = { 'MessageNew': 'APPEND', 'MessageAppend': 'APPEND', 'MessageTrash': 'DELETE', 'MessageMove': 'MOVE', } last_append_uid = 0 result = [] if eventlog is not None: for log in eventlog: # filter MessageTrash following a MessageAppend event (which is an update operation) if log['event'] == 
'MessageTrash' and last_append_uid > int(log['uidset']): continue # remember last appended message uid if log['event'] == 'MessageAppend' and log.has_key('uidset'): last_append_uid = int(log['uidset']) # compose log entry to return logentry = { 'rev': log.get('revision', None), 'op': event_op_map.get(log['event'], 'UNKNOWN'), 'mailbox': self._convert_mailbox_uri(log.get('mailbox', None)) } try: timestamp = parse_date(log['timestamp']) logentry['date'] = datetime.datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%SZ") except: logentry['date'] = log['timestamp'] # TODO: translate mailbox identifier back to a relative folder path? logentry['user'] = self._get_user_info(log) result.append(logentry) return result def _resolve_username(self, user): """ Resovle the given username to the corresponding nsuniqueid from LDAP """ # find existing entry in our storage backend result = self.storage.get_user(username=user) if result and result.has_key('id'): # TODO: cache this lookup in memory? return result['id'] # fall-back: return md5 sum of the username to make usernames work as fields/keys in elasticsearch return hashlib.md5(user).hexdigest() def _get_user_info(self, rec): """ Return user information (name, email) related to the given log entry """ if rec.has_key('user_id'): # get real user name from rec['user_id'] user = self.storage.get_user(id=rec['user_id']) if user is not None: return "%(cn)s <%(user)s>" % user if rec.has_key('user'): return rec['user'] elif rec['event'] == 'MessageAppend' and rec['headers'].has_key('From'): # fallback to message headers return rec['headers']['From'][0] return 'unknown' def _resolve_mailbox_uri(self, mailbox): """ Convert the given mailbox string into an absolute URI regarding the context of the requesting user. 
""" # this requires a user context if not self.env.has_key('REQUEST_USER') or not self.env['REQUEST_USER']: return mailbox if mailbox is None: return None # mailbox already is an absolute path if mailbox.startswith('user/') or mailbox.startswith('shared/'): return mailbox domain = '' user = self.env['REQUEST_USER'] if '@' in user: (user,_domain) = user.split('@', 1) domain = '@' + _domain owner = user path = '/' + mailbox # TODO: make this configurable or read from IMAP shared_prefix = 'Shared Folders/' others_prefix = 'Other Users/' imap_delimiter = '/' # case: shared folder if mailbox.startswith(shared_prefix): return mailbox[len(shared_prefix):] + domain # case: other users folder if mailbox.startswith(others_prefix): (owner, subpath) = mailbox[len(others_prefix):].split(imap_delimiter, 1) path = imap_delimiter + subpath if mailbox.upper() == 'INBOX': path = '' # default: personal namespace folder return 'user/' + owner + path + domain def _convert_mailbox_uri(self, mailbox): """ Convert the given absolute mailbox URI into a relative folder name regarding the context of the requesting user. 
""" if mailbox is None: return None # this requires a user context request_user = str(self.env.get('REQUEST_USER', '')).lower() # TODO: make this configurable or read from IMAP shared_prefix = 'Shared Folders' others_prefix = 'Other Users' imap_delimiter = '/' domain = '' if '@' in mailbox: (folder,domain) = mailbox.split('@', 1) else: folder = mailbox if folder.startswith('user/'): parts = folder.split(imap_delimiter, 2) if len(parts) > 2: (prefix,user,path) = parts else: (prefix,user) = parts path = '' if len(path) == 0: path = 'INBOX' if not (user + '@' + domain).lower() == request_user: folder = imap_delimiter.join([others_prefix, user, path]) else: folder = path elif folder.startswith('shared/'): folder = imap_delimiter.join([shared_prefix, folder]) return folder ##### Utility functions def convert2primitives(struct): """ Convert complex types like datetime into primitives which can be serialized into JSON """ out = None if isinstance(struct, datetime.datetime): tz = 'Z' if struct.tzinfo == pytz.utc else '%z' out = struct.strftime('%Y-%m-%dT%H:%M:%S' + tz) elif isinstance(struct, datetime.date): out = struct.strftime('%Y-%m-%d') elif isinstance(struct, list): out = [convert2primitives(x) for x in struct] elif isinstance(struct, OrderedDict): out = OrderedDict([(key,convert2primitives(struct[key])) for key in struct.keys()]) elif isinstance(struct, dict): out = dict(zip(struct.keys(), map(convert2primitives, struct.values()))) else: out = struct return out diff --git a/app/storage/__init__.py b/app/storage/__init__.py index fb01014..61dc1f3 100644 --- a/app/storage/__init__.py +++ b/app/storage/__init__.py @@ -1,139 +1,143 @@ # -*- coding: utf-8 -*- # # Copyright 2014-2015 Kolab Systems AG (http://www.kolabsys.com) # # Thomas Bruederli # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your 
option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # class AbstractStorage(object): """ Interface class for abstracted access to storage """ def get(self, key, index, doctype=None, fields=None, **kw): """ Standard API for accessing key/value storage @param key: Primary key of the record to retrieve @param index: Index name (i.e. database name) @param doctype: Document type (i.e. table name) @param fields: List of fields to retrieve (string, comma-separated) """ return None def set(self, key, value, index, doctype=None, **kw): """ Standard API for writing to key/value storage @param key: Primary key of the record to create/update @param value: The record data as dict with field => value pairs @param index: Index name (i.e. database name) @param doctype: Document type (i.e. table name) """ return None def select(self, query, index, doctype=None, fields=None, sortby=None, limit=None, **kw): """ Standard API for querying storage @param query: List of query parameters, each represented as a triplet of ( ). combined to an AND list of search criterias. can either be - a string for direct comparison - a list for "in" comparisons - a tuple with two values for range queries @param index: Index name (i.e. database name) @param doctype: Document type (i.e. 
table name) @param fields: List of fields to retrieve (string, comma-separated) @param sortby: Fields to be used fort sorting the results (string, comma-separated) @param limit: Number of records to return """ return None def get_user(self, id=None, username=None): """ API for resolving usernames and reading user info: @param id: Unique identifier for a user record @param username: Non-unique username to resolve """ return None def get_folder(self, mailbox=None, user=None): """ API for finding an IMAP folder record @param mailbox: Mailbox name @param user: User context """ return None def get_events(self, objuid, mailbox, msguid, limit=None): """ API for querying event notifications @param objuid: Groupware object UID @param mailbox: IMAP folder that message/object currently resides in @param msguid: IMAP message UID (the last known) @param limit: Number of records to return (negative number for most recent first) """ return None def get_revision(self, objuid, mailbox, msguid, rev): """ API to get a certain revision of a stored object @param objuid: Groupware object UID @param mailbox: IMAP folder that message/object currently resides in @param msguid: IMAP message UID (the last known) @param rev: Revision identifier """ return None def get_message_data(self, rec): """ Getter for the full IMAP message payload for the given event record as previously fetched with get_events() or get_revision() """ return rec.get('message', None) def StorageException(Exception): def __init__(self, message): Exception.__init__(self, message) def factory(): """ Factory function to return the right storage backend instance """ from flask import current_app conf = current_app.config if conf['STORAGE'].has_key('backend'): backend = conf['STORAGE']['backend'] else: backend = 'riak' if backend == 'elasticsearch': from elasticsearch_storage import ElasticseachStorage return ElasticseachStorage() elif backend == 'riak': from riak_storage import RiakStorage return RiakStorage() raise 
StorageException("Invalid backend %r specified" % (backend))
+
+
+# create singleton instance
+instance = factory()
diff --git a/app/storage/caching.py b/app/storage/caching.py
new file mode 100644
index 0000000..261d3cb
--- /dev/null
+++ b/app/storage/caching.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2015 Kolab Systems AG (http://www.kolabsys.com)
+#
+# Thomas Bruederli
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+
+import time
+
+class CachedDict(object):
+    """
+    dict-like class which drops items after the given TTL
+
+    Values are stored internally as (value, expiry) tuples; expiry is an
+    absolute unix timestamp computed from the per-instance ttl.
+    """
+    def __init__(self, ttl=60):
+        # TODO: use memcache for distributed memory-based caching
+        self.ttl = ttl
+        self.data = {}
+
+    def get(self, key, default=None):
+        # TTL-aware lookup: missing *or expired* keys yield the default
+        return self[key] if key in self else default
+
+    def remove(self, key):
+        # dict has no remove() method; drop the key if present, ignore otherwise
+        self.data.pop(key, None)
+
+    def pop(self, key, default=None):
+        # pop with a default so a missing key does not raise KeyError;
+        # an already-expired entry also yields the default
+        item = self.data.pop(key, None)
+        return item[0] if item is not None and item[1] > int(time.time()) else default
+
+    def update(self, other):
+        expire = int(time.time()) + self.ttl
+        self.data.update(dict((k, (v, expire)) for k, v in other.items()))
+
+    def keys(self):
+        now = int(time.time())
+        return [k for k, v in self.data.items() if v[1] > now]
+
+    def values(self):
+        now = int(time.time())
+        return [v[0] for v in self.data.values() if v[1] > now]
+
+    def items(self):
+        now = int(time.time())
+        return dict((k, v[0]) for k, v in self.data.items() if v[1] > now).items()
+
+    def iteritems(self):
+        # items() returns a plain list in Python 2, which has no iteritems();
+        # wrap it in iter() instead
+        return iter(self.items())
+
+    def has_key(self, key):
+        return key in self.data and self.data[key][1] > int(time.time())
+
+    def expunge(self):
+        now = int(time.time())
+        self.data = dict((k, v) for k, v in self.data.items() if v[1] > now)
+
+    def clear(self):
+        self.data = {}
+
+    def __getitem__(self, key):
+        return self.data[key][0]
+
+    def __setitem__(self, key, value):
+        self.data[key] = (value, int(time.time()) + self.ttl)
+
+    def __contains__(self, key):
+        return self.has_key(key)
+
+    def __len__(self):
+        return len(self.keys())
+
+    def __iter__(self):
+        return self.items().__iter__()
diff --git a/app/storage/riak_storage.py b/app/storage/riak_storage.py
index fb884f8..6891e85 100644
--- a/app/storage/riak_storage.py
+++ b/app/storage/riak_storage.py
@@ -1,371 +1,380 @@
 # -*- coding: utf-8 -*-
 #
 # Copyright 2015 Kolab Systems AG (http://www.kolabsys.com)
 #
 # Thomas Bruederli
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero
General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # import logging, datetime, urllib, urlparse from riak import RiakClient from riak.mapreduce import RiakKeyFilter, RiakMapReduce from dateutil.parser import parse as parse_date +from caching import CachedDict from flask import current_app from . import AbstractStorage conf = current_app.config log = logging.getLogger('storage') class RiakStorage(AbstractStorage): bucket_types = { 'users': 'egara-lww', 'users-current': 'egara-unique', 'imap-events': 'egara-lww', 'imap-folders': 'egara-lww', 'imap-folders-current': 'egara-unique', 'imap-message-timeline': 'egara-lww' } def __init__(self, *args, **kw): riak_host = 'localhost' riak_port = 8098 self.client = RiakClient( protocol='http', host=conf['STORAGE'].get('riak_host', riak_host), http_port=conf['STORAGE'].get('riak_port', riak_port) ) self.client.set_decoder('application/octet-stream', self._decode_binary) + self.users_cache = CachedDict(ttl=10) def _decode_binary(self, data): return str(data).encode("utf-8") def _get_bucket(self, bucketname): _type = self.bucket_types.get(bucketname, None) if _type: return self.client.bucket_type(_type).bucket(bucketname) return None def get(self, key, index, doctype=None, fields=None, **kw): """ Standard API for accessing key/value storage """ result = None log.debug("Riak get key %r from %r", key, index) try: bucket = self._get_bucket(index) res = bucket.get(key) if res and res.data: result = res.data except Exception, e: log.warning("Riak exception: %s", str(e)) result = 
None return result def set(self, key, value, index, doctype=None, **kw): """ Standard API for writing to key/value storage """ return False def select(self, query, index, doctype=None, fields=None, sortby=None, limit=None, **kw): """ Standard API for querying storage """ result = None try: pass except Exception, e: log.warning("Riak exception: %s", str(e)) result = None return result def _get_keyfilter(self, index, starts_with=None, ends_with=None, sortby=None, limit=None): """ Helper function to execute a key filter query """ results = None fs = None fe = None if starts_with is not None: fs = RiakKeyFilter().starts_with(starts_with) if ends_with is not None: fe = RiakKeyFilter().ends_with(ends_with) if fs and fe: keyfilter = fs & fe else: keyfilter = fs or fe return self._mapreduce_keyfilter(index, keyfilter, sortby, limit) def _mapreduce_keyfilter(self, index, keyfilter, sortby=None, limit=None): """ Helper function to execute a map-reduce query using the given key filter """ results = None log.debug("Riak query %r with key filter %r", index, keyfilter) mapred = RiakMapReduce(self.client) mapred.add_bucket(self._get_bucket(index)) mapred.add_key_filters(keyfilter) # custom Riak.mapValuesJson() function that also adds the entry key to the data structure mapred.map(""" function(value, keyData, arg) { if (value.not_found) { return [value]; } var _data, data = value["values"][0]["data"]; if (Riak.getClassName(data) !== "Array") { _data = JSON.parse(data); _data["_key"] = value.key; return [_data]; } else { return data } } """) if sortby is not None: comp = '<' if limit is not None and limit < 0 else '>' mapred.reduce_sort('function(a,b){ return (a.%s || 0) %s (b.%s || 0) ? 
1 : 0; }' % (sortby, comp, sortby)) if limit is not None: mapred.reduce_limit(abs(limit)) try: results = mapred.run() except Exception, e: log.warning("Riak MapReduce exception: %s", str(e)) results = None return results def get_user(self, id=None, username=None): """ API for resolving usernames and reading user info """ + cache_key = id or username + + # check for cached result + self.users_cache.expunge() + if cache_key and self.users_cache.has_key(cache_key): + log.debug("get_user: return cached value for %r", cache_key) + return self.users_cache[cache_key] + # search by ID using a key filter if id is not None: results = self._get_keyfilter('users', starts_with=id + '::', limit=1) if results and len(results) > 0: + self.users_cache[cache_key] = results[0] return results[0] elif username is not None: user = self.get(username, 'users-current') if user is not None: + self.users_cache[cache_key] = user return user # TODO: query 'users' bucket with an ends_with key filter - # TODO: add a very short-term cache for lookups by ID - return None def get_folder(self, mailbox=None, user=None): """ API for finding IMAP folders and their unique identifiers """ folder_id = self.get(mailbox, 'imap-folders-current') if folder_id is not None: return dict(uri=mailbox, id=folder_id) return None def get_events(self, objuid, mailbox, msguid, limit=None): """ API for querying event notifications """ # 1. get timeline entries for current folder folder = self.get_folder(mailbox) if folder is None: log.info("Folder %r not found in storage", mailbox) return None; object_event_keys = self._get_timeline_keys(objuid, folder['id']) # sanity check with msguid if msguid is not None: key_prefix = 'message::%s::%s' % (folder['id'], str(msguid)) if len([k for k in object_event_keys if k.startswith(key_prefix)]) == 0: log.warning("Sanity check failed: requested msguid %r not in timeline keys %r", msguid, object_event_keys) # TODO: abort? # 3. 
read each corresponding entry from imap-events filters = None for key in object_event_keys: f = RiakKeyFilter().starts_with(key) if filters is None: filters = f else: filters |= f log.debug("Querying imap-events for keys %r", object_event_keys) if filters is not None: - # TODO: query directly using key? results = self._mapreduce_keyfilter('imap-events', filters, sortby='timestamp_utc', limit=limit) return [self._transform_result(x, 'imap-events') for x in results if x.has_key('event') and not x['event'] == 'MessageExpunge'] \ if results is not None else results return None def _get_timeline_keys(self, objuid, folder_id, length=3): """ Helper method to fetch timeline keys recursively following moves accross folders """ object_event_keys = [] results = self._get_keyfilter('imap-message-timeline', starts_with='message::' + folder_id + '::', ends_with='::' + objuid) if not results or len(results) == 0: log.info("No timeline entry found for %r in folder %r", objuid, folder_id) return object_event_keys; for rec in results: key = '::'.join(rec['_key'].split('::', 4)[0:length]) object_event_keys.append(key) # follow moves and add more :: tuples to our list if rec.has_key('history') and isinstance(rec['history'], dict) and rec['history'].has_key('imap'): old_folder_id = rec['history']['imap'].get('previous_folder', None) if old_folder_id: object_event_keys += self._get_timeline_keys(objuid, old_folder_id, length) return object_event_keys def get_revision(self, objuid, mailbox, msguid, rev): """ API to get a certain revision of a stored object """ # resolve mailbox first folder = self.get_folder(mailbox) if folder is None: log.info("Folder %r not found in storage", mailbox) return None; # expand revision into the ISO timestamp format try: ts = datetime.datetime.strptime(str(rev), "%Y%m%d%H%M%S%f") timestamp = ts.strftime("%Y-%m-%dT%H:%M:%S.%f")[0:23] except Exception, e: log.warning("Invalid revision %r for object %r: %r", rev, objuid, e) return None # query message-timeline 
entries starting at peak with current folder (aka mailbox) object_event_keys = self._get_timeline_keys(objuid, folder['id'], length=4) # get the one key matching the revision timestamp keys = [k for k in object_event_keys if '::' + timestamp in k] log.debug("Get revision entry %r from candidates %r", timestamp, object_event_keys) if len(keys) == 1: result = self.get(keys[0], 'imap-events') if result is not None: return self._transform_result(result, 'imap-events') else: log.info("Revision timestamp %r doesn't match a single key from: %r", timestamp, object_event_keys) return None def get_message_data(self, rec): """ Getter for the full IMAP message payload for the given event record as previously fetched with get_events() or get_revision() """ return rec.get('message', None) def _transform_result(self, result, index): """ Turn an imap-event record into a dict to match the storage API """ result['_index'] = index # derrive (numeric) revision from timestamp if result.has_key('timestamp_utc') and result.get('event','') in ['MessageAppend','MessageMove']: try: ts = parse_date(result['timestamp_utc']) result['revision'] = ts.strftime("%Y%m%d%H%M%S%f")[0:17] except: pass # extract folder name from uri if result.has_key('uri') and not result.has_key('mailbox'): uri = self._parse_imap_uri(result['uri']) username = uri['user'] domain = uri['domain'] folder_name = uri['path'] folder_path = uri['path'] imap_delimiter = '/' if not username == None: if folder_name == "INBOX": folder_path = imap_delimiter.join(['user', '%s@%s' % (username, domain)]) else: folder_path = imap_delimiter.join(['user', username, '%s@%s' % (folder_name, domain)]) result['mailbox'] = folder_path if not result.has_key('uidset') and uri.has_key('UID'): result['uidset'] = uri['UID'] return result def _parse_imap_uri(self, uri): """ Split the given URI string into its components """ split_uri = urlparse.urlsplit(uri) if len(split_uri.netloc.split('@')) == 3: (username, domain, server) = 
split_uri.netloc.split('@') elif len(split_uri.netloc.split('@')) == 2: (username, server) = split_uri.netloc.split('@') domain = None elif len(split_uri.netloc.split('@')) == 1: username = None domain = None server = split_uri.netloc result = dict(user=username, domain=domain, host=server) # First, .path == '/Calendar/Personal%20Calendar;UIDVALIDITY=$x[/;UID=$y] # Take everything after the first slash, and omit any INBOX/ stuff. path_str = '/'.join([x for x in split_uri.path.split('/') if not x == 'INBOX'][1:]) path_arr = path_str.split(';') result['path'] = urllib.unquote(path_arr[0]) # parse the path/query parameters into a dict param = dict() for p in path_arr[1:]: if '=' in p: (key,val) = p.split('=', 2) result[key] = urllib.unquote(val) return result diff --git a/tests/test_storage.py b/tests/test_storage.py index 38fb22f..21a98d0 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -1,91 +1,88 @@ import os, flask, json, email -from app import storage from twisted.trial import unittest -class FlaskCurrentApp(object): +class FlaskAppMockup(object): config = dict( STORAGE=dict( backend='riak', riak_host='127.0.0.1', riak_port='10018' ), CONFIG_DIR=os.path.join(os.path.abspath(os.path.dirname(__file__)), '..', 'config') ) def __init__(self): import logging.config logging.config.fileConfig(self.config['CONFIG_DIR'] + '/bonnie-flask.conf') class TestStorage(unittest.TestCase): def setUp(self): # patch current_app to return static config - self.patch(flask, 'current_app', FlaskCurrentApp()) + self.patch(flask, 'current_app', FlaskAppMockup()) + + from app.storage import instance as storage_instance + self.storage = storage_instance def test_000_instance(self): - strg = storage.factory() - self.assertIsInstance(strg, storage.AbstractStorage) - self.assertIsInstance(strg, storage.riak_storage.RiakStorage) + from app import storage + self.assertIsInstance(self.storage, storage.AbstractStorage) + self.assertIsInstance(self.storage, 
storage.riak_storage.RiakStorage) def test_001_get_user_by_name(self): - strg = storage.factory() - user = strg.get_user(username='john.doe@example.org') + user = self.storage.get_user(username='john.doe@example.org') self.assertIsInstance(user, dict) self.assertEqual(user['id'], '55475201-bdc211e4-881c96ef-f248ab46') self.assertEqual(user['user'], 'john.doe@example.org') def test_002_get_user_by_id(self): - strg = storage.factory() - user = strg.get_user(id='55475201-bdc211e4-881c96ef-f248ab46') + user = self.storage.get_user(id='55475201-bdc211e4-881c96ef-f248ab46') self.assertIsInstance(user, dict) self.assertEqual(user['user'], 'john.doe@example.org') self.assertEqual(user['id'], '55475201-bdc211e4-881c96ef-f248ab46') def test_010_get_folder_id(self): - strg = storage.factory() - folder = strg.get_folder('user/john.doe/Calendar@example.org') + folder = self.storage.get_folder('user/john.doe/Calendar@example.org') self.assertIsInstance(folder, dict) self.assertEqual(folder['id'], 'a5660caa-3165-4a84-bacd-ef4b58ef3663') def test_020_get_events(self): - strg = storage.factory() oldmailbox = 'user/john.doe/Calendar@example.org' mailbox = 'user/john.doe/Testing@example.org' - events = strg.get_events('6EE0570E8CA21DDB67FC9ADE5EE38E7F-A4BF5BBB9FEAA271', mailbox, 2) + events = self.storage.get_events('6EE0570E8CA21DDB67FC9ADE5EE38E7F-A4BF5BBB9FEAA271', mailbox, 2) self.assertEqual(len(events), 6) self.assertEqual(events[0]['event'], 'MessageAppend') self.assertEqual(events[0]['uidset'], '3') self.assertEqual(events[0]['mailbox'], oldmailbox) self.assertEqual(events[1]['event'], 'MessageAppend') self.assertEqual(events[1]['uidset'], '4') self.assertEqual(events[1]['mailbox'], oldmailbox) self.assertEqual(events[2]['event'], 'MessageTrash') self.assertEqual(events[2]['uidset'], '3') self.assertEqual(events[3]['event'], 'MessageMove') self.assertEqual(events[3]['uidset'], '1') self.assertEqual(events[4]['event'], 'MessageAppend') self.assertEqual(events[4]['uidset'], 
'2') self.assertEqual(events[4]['mailbox'], mailbox) self.assertEqual(events[5]['event'], 'MessageTrash') self.assertEqual(events[5]['uidset'], '1') self.assertEqual(events[5]['mailbox'], mailbox) def test_025_get_revision(self): - strg = storage.factory() uid = '5A637BE7895D785671E1732356E65CC8-A4BF5BBB9FEAA271' mailbox = 'user/john.doe/Calendar@example.org' - events = strg.get_events(uid, mailbox, None, limit=1) + events = self.storage.get_events(uid, mailbox, None, limit=1) self.assertEqual(len(events), 1) - rec = strg.get_revision(uid, mailbox, None, events[0]['revision']) + rec = self.storage.get_revision(uid, mailbox, None, events[0]['revision']) self.assertIsInstance(rec, dict) self.assertEqual(rec['event'], 'MessageAppend') - msgsource = strg.get_message_data(rec) + msgsource = self.storage.get_message_data(rec) self.assertIsInstance(msgsource, unicode) message = email.message_from_string(msgsource.encode('utf8','replace')) self.assertIsInstance(message, email.message.Message) self.assertTrue(message.is_multipart())