diff --git a/pykolab/imap/__init__.py b/pykolab/imap/__init__.py index fe37dfc..ce5612f 100644 --- a/pykolab/imap/__init__.py +++ b/pykolab/imap/__init__.py @@ -1,1187 +1,1185 @@ # -*- coding: utf-8 -*- # Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com) # # Jeroen van Meeuwen (Kolab Systems) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import logging import re import time import sys from urlparse import urlparse import pykolab from pykolab import utils from pykolab.translate import _ log = pykolab.getLogger('pykolab.imap') conf = pykolab.getConf() class IMAP(object): def __init__(self): # Pool of named IMAP connections, by hostname self._imap = {} # Place holder for the current IMAP connection self.imap = None def cleanup_acls(self, aci_subject): lm_suffix = "" log.info(_("Cleaning up ACL entries for %s across all folders") % (aci_subject)) if len(aci_subject.split('@')) > 1: lm_suffix = "@%s" % (aci_subject.split('@')[1]) shared_folders = self.imap.lm( "shared/*%s" % (lm_suffix) ) user_folders = self.imap.lm( "user/*%s" % (lm_suffix) ) log.debug( _("Cleaning up ACL entries referring to identifier %s") % ( aci_subject ), level=5 ) # For all folders (shared and user), ... folders = user_folders + shared_folders log.debug(_("Iterating over %d folders") % (len(folders)), level=5) # ... loop through them and ... for folder in folders: # ... list the ACL entries acls = self.imap.lam(folder) # For each ACL entry, see if we think it is a current, valid entry for acl_entry in acls.keys(): # If the key 'acl_entry' does not exist in the dictionary of valid # ACL entries, this ACL entry has got to go. if acl_entry == aci_subject: # Set the ACL to '' (effectively deleting the ACL entry) log.debug(_("Removing acl %r for subject %r from folder %r") % (acls[acl_entry],acl_entry,folder), level=8) self.set_acl(folder, acl_entry, '') def connect(self, uri=None, server=None, domain=None, login=True): """ Connect to the appropriate IMAP backend. Supply a domain (name space) configured in the configuration file as a section, with a setting 'imap_uri' to connect to a domain specific IMAP server, or specify an URI to connect to that particular IMAP server (in that order). Routines sitting behind this will take into account Cyrus IMAP Murder capabilities, brokering actions to take place against the correct server (such as a 'xfer' which needs to happen against the source backend). """ # TODO: We are currently compatible with one IMAP backend technology per # deployment. backend = conf.get('kolab', 'imap_backend') if not domain == None: self.domain = domain if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): backend = conf.get(domain, 'imap_backend') if uri == None: if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'): uri = conf.get(domain, 'imap_uri') else: self.domain = None scheme = None hostname = None port = None if uri == None: uri = conf.get(backend, 'uri') result = urlparse(uri) if hasattr(result, 'netloc'): scheme = result.scheme if len(result.netloc.split(':')) > 1: hostname = result.netloc.split(':')[0] port = result.netloc.split(':')[1] else: hostname = result.netloc elif hasattr(result, 'hostname'): hostname = result.hostname else: scheme = uri.split(':')[0] (hostname, port) = uri.split('/')[2].split(':') if not server == None: hostname = server if scheme == None or scheme == "": scheme = 'imaps' if port == None: if scheme == "imaps": port = 993 elif scheme == "imap": port = 143 else: port = 993 uri = '%s://%s:%s' % (scheme, hostname, port) # Get the credentials admin_login = conf.get(backend, 'admin_login') admin_password = conf.get(backend, 'admin_password') if admin_password == None or admin_password == '': log.error(_("No administrator password is available.")) if not self._imap.has_key(hostname): if backend == 'cyrus-imap': import cyrus self._imap[hostname] = cyrus.Cyrus(uri) # Actually connect if login: log.debug(_("Logging on to Cyrus IMAP server %s") % (hostname), level=8) self._imap[hostname].login(admin_login, admin_password) self._imap[hostname].logged_in = True elif backend == 'dovecot': import dovecot self._imap[hostname] = dovecot.Dovecot(uri) # Actually connect if login: log.debug(_("Logging on to Dovecot IMAP server %s") % (hostname), level=8) self._imap[hostname].login(admin_login, admin_password) self._imap[hostname].logged_in = True else: import imaplib self._imap[hostname] = imaplib.IMAP4(hostname, port) # Actually connect if login: log.debug(_("Logging on to generic IMAP server %s") % (hostname), level=8) self._imap[hostname].login(admin_login, admin_password) self._imap[hostname].logged_in = True else: if not login: self.disconnect(hostname) self.connect(uri=uri,login=False) elif login and not hasattr(self._imap[hostname],'logged_in'): self.disconnect(hostname) self.connect(uri=uri) else: try: if hasattr(self._imap[hostname], 'm'): self._imap[hostname].m.noop() elif hasattr(self._imap[hostname], 'noop') and callable(self._imap[hostname].noop): self._imap[hostname].noop() log.debug(_("Reusing existing IMAP server connection to %s") % (hostname), level=8) except: log.debug(_("Reconnecting to IMAP server %s") % (hostname), level=8) self.disconnect(hostname) self.connect() # Set the newly created technology specific IMAP library as the current # IMAP connection to be used. self.imap = self._imap[hostname] def disconnect(self, server=None): if server == None: # No server specified, but make sure self.imap is None anyways if hasattr(self, 'imap'): del self.imap else: if self._imap.has_key(server): del self._imap[server] else: log.warning(_("Called imap.disconnect() on a server that we had no connection to.")) def create_folder(self, folder_path, server=None, partition=None): folder_path = self.folder_utf7(folder_path) if not server == None: self.connect(server=server) try: self._imap[server].cm(folder_path, partition=partition) return True except: log.error( _("Could not create folder %r on server %r") % ( folder_path, server ) ) else: try: self.imap.cm(folder_path, partition=partition) return True except: log.error(_("Could not create folder %r") % (folder_path)) return False def __getattr__(self, name): if hasattr(self.imap, name): return getattr(self.imap, name) elif hasattr(self.imap, 'm'): if hasattr(self.imap.m, name): return getattr(self.imap.m, name) else: raise AttributeError, _("%r has no attribute %s") % (self,name) else: raise AttributeError, _("%r has no attribute %s") % (self,name) def folder_utf7(self, folder): from pykolab import imap_utf7 return imap_utf7.encode(folder) def folder_utf8(self, folder): from pykolab import imap_utf7 return imap_utf7.decode(folder) def folder_quote(self, folder): return u'"' + str(folder).strip('"') + '"' def get_metadata(self, folder): """ Obtain all metadata entries on a folder """ metadata = {} _metadata = self.imap.getannotation(self.folder_utf7(folder), '*') for (k,v) in _metadata.items(): metadata[self.folder_utf8(k)] = v return metadata def get_separator(self): if not hasattr(self, 'imap') or self.imap == None: self.connect() if hasattr(self.imap, 'separator'): return self.imap.separator elif hasattr(self.imap, 'm') and hasattr(self.imap.m, 'separator'): return self.imap.m.separator else: return '/' def imap_murder(self): if hasattr(self.imap, 'murder') and self.imap.murder: return True else: return False def namespaces(self): """ Obtain the namespaces. Returns a tuple of: (str(personal) [, str(other users) [, list(shared)]]) """ _personal = None _other_users = None _shared = None (_response, _namespaces) = self.imap.m.namespace() if len(_namespaces) == 1: _namespaces = _namespaces[0] _namespaces = re.split(r"\)\)\s\(\(", _namespaces) if len(_namespaces) >= 3: _shared = [] _shared.append(' '.join(_namespaces[2].replace('((','').replace('))','').split()[:-1]).replace('"', '')) if len(_namespaces) >= 2: _other_users = ' '.join(_namespaces[1].replace('((','').replace('))','').split()[:-1]).replace('"', '') if len(_namespaces) >= 1: _personal = _namespaces[0].replace('((','').replace('))','').split()[0].replace('"', '') return (_personal, _other_users, _shared) def set_acl(self, folder, identifier, acl): """ Set an ACL entry on a folder. """ _acl = '' short_rights = { 'all': 'lrsedntxakcpiw', 'append': 'wip', 'full': 'lrswipkxtecdn', 'read': 'lrs', 'read-only': 'lrs', 'read-write': 'lrswitedn', 'post': 'p', 'semi-full': 'lrswit', 'write': 'lrswite', } if short_rights.has_key(acl): acl = short_rights[acl] else: for char in acl: if char in "-+": continue if not char in short_rights['all']: log.error(_("Invalid access identifier %r for subject %r") % (acl, identifier)) return False # Special treatment for '-' and '+' characters if '+' in acl or '-' in acl: acl_map = { 'set': '', 'subtract': '', 'add': '' } mode = 'set' for char in acl: if char == '-': mode = 'subtract' continue if char == '+': mode = 'add' continue acl_map[mode] += char current_acls = self.imap.lam(self.folder_utf7(folder)) for current_acl in current_acls.keys(): if current_acl == identifier: _acl = current_acls[current_acl] break _acl = _acl + acl_map['set'] + acl_map['add'] _acl = [x for x in _acl.split() if x not in acl_map['subtract'].split()] acl = ''.join(list(set(_acl))) try: self.imap.sam(self.folder_utf7(folder), identifier, acl) except Exception, errmsg: log.error( _("Could not set ACL for %s on folder %s: %r") % ( identifier, folder, errmsg ) ) def set_metadata(self, folder, metadata_path, metadata_value, shared=True): """ Set a metadata entry on a folder """ if metadata_path.startswith('/shared/'): shared = True - - if metadata_path.startswith('/shared/'): metadata_path = metadata_path.replace('/shared/', '/') elif metadata_path.startswith('/private/'): shared = False metadata_path = metadata_path.replace('/private/', '/') self.imap._setannotation(self.folder_utf7(folder), metadata_path, metadata_value, shared) def shared_folder_create(self, folder_path, server=None): """ Create a shared folder. """ folder_name = "shared%s%s" % (self.get_separator(), folder_path) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] log.info(_("Creating new shared folder %s") %(folder_name)) self.create_folder(folder_name, server) def shared_folder_exists(self, folder_path): """ Check if a shared mailbox exists. """ folder_name = 'shared%s%s' % (self.get_separator(), folder_path) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] return self.has_folder(folder_name) def shared_folder_set_type(self, folder_path, folder_type): folder_name = 'shared%s%s' % (self.get_separator(), folder_path) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] self.set_metadata(folder_name, '/shared/vendor/kolab/folder-type', folder_type) def shared_mailbox_create(self, mailbox_base_name, server=None): """ Create a shared folder. """ self.connect() folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] log.info(_("Creating new shared folder %s") %(mailbox_base_name)) self.create_folder(folder_name, server) def shared_mailbox_exists(self, mailbox_base_name): """ Check if a shared mailbox exists. """ self.connect() folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) # Correct folder_path being supplied with "shared/shared/" for example if folder_name.startswith("shared%s" % (self.get_separator()) * 2): folder_name = folder_name[7:] return self.has_folder(folder_name) def user_mailbox_create(self, mailbox_base_name, server=None): """ Create a user mailbox. Returns the full path to the new mailbox folder. """ # TODO: Whether or not to lowercase the mailbox name is really up to the # IMAP server setting username_tolower (normalize_uid, lmtp_downcase_rcpt). self.connect() if not mailbox_base_name == mailbox_base_name.lower(): log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) mailbox_base_name = mailbox_base_name.lower() folder_name = "user%s%s" % (self.get_separator(), mailbox_base_name) log.info(_("Creating new mailbox for user %s") %(mailbox_base_name)) self.create_folder(folder_name, server) # In a Cyrus IMAP Murder topology, wait for the murder to have settled if self.imap_murder(): self.disconnect() self.connect() created = False last_log = time.time() while not created: created = self.has_folder(folder_name) if not created: if time.time() - last_log > 5: log.info(_("Waiting for the Cyrus IMAP Murder to settle...")) last_log = time.time() time.sleep(0.5) _additional_folders = None if not hasattr(self, 'domain'): self.domain = None if self.domain == None and len(mailbox_base_name.split('@')) > 1: self.domain = mailbox_base_name.split('@')[1] if not self.domain == None: if conf.has_option(self.domain, "autocreate_folders"): _additional_folders = conf.get_raw( self.domain, "autocreate_folders" ) else: from pykolab.auth import Auth auth = Auth() auth.connect() domains = auth.list_domains(self.domain) auth.disconnect() if len(domains.keys()) > 0: if domains.has_key(self.domain): primary = domains[self.domain] if conf.has_option(primary, "autocreate_folders"): _additional_folders = conf.get_raw( primary, "autocreate_folders" ) if _additional_folders == None: if conf.has_option('kolab', "autocreate_folders"): _additional_folders = conf.get_raw( 'kolab', "autocreate_folders" ) additional_folders = conf.plugins.exec_hook( "create_user_folders", kw={ 'folder': folder_name, 'additional_folders': _additional_folders } ) if not additional_folders == None: self.user_mailbox_create_additional_folders( mailbox_base_name, additional_folders ) if not self.domain == None: if conf.has_option(self.domain, "sieve_mgmt"): sieve_mgmt_enabled = conf.get(self.domain, 'sieve_mgmt') if utils.true_or_false(sieve_mgmt_enabled): conf.plugins.exec_hook( 'sieve_mgmt_refresh', kw={ 'user': mailbox_base_name } ) return folder_name def user_mailbox_create_additional_folders(self, user, additional_folders): log.debug( _("Creating additional folders for user %s") % (user), level=8 ) backend = conf.get('kolab', 'imap_backend') admin_login = conf.get(backend, 'admin_login') admin_password = conf.get(backend, 'admin_password') folder = 'user%s%s' % (self.get_separator(), user) if self.imap_murder(): server = self.user_mailbox_server(folder) else: server = None success = False last_log = time.time() while not success: try: self.disconnect() self.connect(login=False, server=server) self.login_plain(admin_login, admin_password, user) (personal, other, shared) = self.namespaces() success = True except Exception, errmsg: if time.time() - last_log > 5 and self.imap_murder(): log.debug(_("Waiting for the Cyrus murder to settle... %r") % (errmsg)) last_log = time.time() if conf.debuglevel > 8: import traceback traceback.print_exc() time.sleep(0.5) for additional_folder in additional_folders.keys(): _add_folder = {} folder_name = additional_folder if not folder_name.startswith(personal): log.error(_("Correcting additional folder name from %r to %r") % (folder_name, "%s%s" % (personal, folder_name))) folder_name = "%s%s" % (personal, folder_name) try: self.create_folder(folder_name) created = False last_log = time.time() while not created: created = self.has_folder(folder_name) if not created: if time.time() - last_log > 5: log.info(_("Waiting for the Cyrus IMAP Murder to settle...")) last_log = time.time() time.sleep(0.5) except: log.warning(_("Mailbox already exists: %s") % (folder_name)) if conf.debuglevel > 8: import traceback traceback.print_exc() continue if additional_folders[additional_folder].has_key("annotations"): for annotation in additional_folders[additional_folder]["annotations"].keys(): self.set_metadata( folder_name, "%s" % (annotation), "%s" % (additional_folders[additional_folder]["annotations"][annotation]) ) if additional_folders[additional_folder].has_key("acls"): for acl in additional_folders[additional_folder]["acls"].keys(): self.set_acl( folder_name, "%s" % (acl), "%s" % (additional_folders[additional_folder]["acls"][acl]) ) if len(user.split('@')) > 1: localpart = user.split('@')[0] domain = user.split('@')[1] domain_suffix = "@%s" % (domain) else: localpart = user domain = None domain_suffix = "" if not domain == None: if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): backend = conf.get(domain, 'imap_backend') if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'): uri = conf.get(domain, 'imap_uri') else: uri = None log.debug(_("Subscribing user to the additional folders"), level=8) _tests = [] # Subscribe only to personal folders (personal, other, shared) = self.namespaces() if not other == None: _tests.append(other) if not shared == None: for _shared in shared: _tests.append(_shared) log.debug(_("Using the following tests for folder subscriptions:"), level=8) for _test in _tests: log.debug(_(" %r") % (_test), level=8) for _folder in self.lm(): log.debug(_("Folder %s") % (_folder), level=8) _subscribe = True for _test in _tests: if not _subscribe: continue if _folder.startswith(_test): _subscribe = False if _subscribe: log.debug(_("Subscribing %s to folder %s") % (user, _folder), level=8) try: self.subscribe(_folder) except Exception, errmsg: log.error(_("Subscribing %s to folder %s failed: %r") % (user, _folder, errmsg)) self.logout() self.connect(domain=self.domain) for additional_folder in additional_folders.keys(): if additional_folder.startswith(personal) and not personal == '': folder_name = additional_folder.replace(personal, '') else: folder_name = additional_folder folder_name = "user%s%s%s%s%s" % ( self.get_separator(), localpart, self.get_separator(), folder_name, domain_suffix ) if additional_folders[additional_folder].has_key("quota"): try: self.imap.sq( folder_name, additional_folders[additional_folder]['quota'] ) except Exception, errmsg: log.error(_("Could not set quota on %s") % (additional_folder)) if additional_folders[additional_folder].has_key("partition"): partition = additional_folders[additional_folder]["partition"] try: self.imap._rename(folder_name, folder_name, partition) except: log.error(_("Could not rename %s to reside on partition %s") % (folder_name, partition)) def user_mailbox_delete(self, mailbox_base_name): """ Delete a user mailbox. """ self.connect() folder = "user%s%s" %(self.get_separator(),mailbox_base_name) self.delete_mailfolder(folder) self.cleanup_acls(mailbox_base_name) def user_mailbox_exists(self, mailbox_base_name): """ Check if a user mailbox exists. """ self.connect() if not mailbox_base_name == mailbox_base_name.lower(): log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) mailbox_base_name = mailbox_base_name.lower() return self.has_folder('user%s%s' %(self.get_separator(), mailbox_base_name)) def user_mailbox_quota(self, mailbox_quota): pass def user_mailbox_rename(self, old_name, new_name, partition=None): self.connect() old_name = "user%s%s" % (self.get_separator(),old_name) new_name = "user%s%s" % (self.get_separator(),new_name) if old_name == new_name and partition == None: return if not self.has_folder(old_name): log.error(_("INBOX folder to rename (%s) does not exist") % (old_name)) if not self.has_folder(new_name) or not partition == None: log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name)) try: self.imap.rename(old_name,new_name,partition) except: log.error(_("Could not rename INBOX folder %s to %s") % (old_name,new_name)) else: log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name)) def user_mailbox_server(self, mailbox): server = self.imap.find_mailfolder_server(mailbox.lower()).lower() log.debug(_("Server for mailbox %r is %r") % (mailbox, server), level=8) return server def has_folder(self, folder): """ Check if the environment has a folder named folder. """ folders = self.imap.lm(self.folder_utf7(folder)) log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,[self.folder_utf8(x) for x in folders]), level=8) # Greater then one, this folder may have subfolders. if len(folders) > 0: return True else: return False def _set_kolab_mailfolder_acls(self, acls, folder=None, update=False): # special case, folder has no ACLs assigned and update was requested, # remove all existing ACL entries if update is True and isinstance(acls, list) and len(acls) == 0: acls = self.list_acls(folder) for subject in acls: log.debug( _("Removing ACL rights %s for subject %s on folder " + \ "%s") % (acls[subject], subject, folder), level=8) self.set_acl(folder, subject, '') return if isinstance(acls, basestring): acls = [ acls ] old_acls = None for acl in acls: exec("acl = %s" % (acl)) subject = acl[0] rights = acl[1] if len(acl) == 3: epoch = acl[2] else: epoch = (int)(time.time()) + 3600 # update mode, check existing entries if update is True: if old_acls is None: old_acls = self.list_acls(folder) for old_subject in old_acls: old_acls[old_subject] = old_acls[old_subject] if subject in old_acls: old_acls[subject] = None if epoch > (int)(time.time()): log.debug( _("Setting ACL rights %s for subject %s on folder " + \ "%s") % (rights,subject,folder), level=8) self.set_acl( folder, "%s" % (subject), "%s" % (rights) ) else: log.debug( _("Removing ACL rights %s for subject %s on folder " + \ "%s") % (rights,subject,folder), level=8) self.set_acl( folder, "%s" % (subject), "" ) # update mode, unset removed ACL entries if old_acls is not None: for subject in old_acls: if old_acls[subject] is not None: log.debug( _("Removing ACL rights %s for subject %s on folder " + \ "%s") % (old_acls[subject], subject, folder), level=8) self.set_acl(folder, subject, '') pass """ Blah functions """ def move_user_folders(self, users=[], domain=None): for user in users: if type(user) == dict: if user.has_key('old_mail'): inbox = "user/%s" % (user['mail']) old_inbox = "user/%s" % (user['old_mail']) if self.has_folder(old_inbox): log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8) if not self.has_folder(inbox): log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox)) self.imap.rename(old_inbox,inbox) self.inbox_folders.append(inbox) else: log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox)) else: log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8) else: log.debug(_("Value for user is not a dictionary"), level=8) def set_quota(self, folder, quota): i = 0 while i < 10: try: self.imap._setquota(folder, quota) i = 10 except: self.disconnect() self.connect() i += 1 def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): """ Sets the quota in IMAP using the authentication and authorization database 'quota' attribute for the users listed in parameter 'users' """ if conf.has_option(primary_domain, 'quota_attribute'): _quota_attr = conf.get(primary_domain, 'quota_attribute') else: auth_mechanism = conf.get('kolab', 'auth_mechanism') _quota_attr = conf.get(auth_mechanism, 'quota_attribute') _inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') default_quota = auth.domain_default_quota(primary_domain) if default_quota == "" or default_quota == None: default_quota = 0 if len(users) == 0: users = auth.list_users(primary_domain) for user in users: quota = None if type(user) == dict: if user.has_key(_quota_attr): if type(user[_quota_attr]) == list: quota = user[_quota_attr].pop(0) elif type(user[_quota_attr]) == str: quota = user[_quota_attr] else: _quota = auth.get_user_attribute(primary_domain, user, _quota_attr) if _quota == None: quota = 0 else: quota = _quota if not user.has_key(_inbox_folder_attr): continue else: if type(user[_inbox_folder_attr]) == list: folder = "user/%s" % user[_inbox_folder_attr].pop(0) elif type(user[_inbox_folder_attr]) == str: folder = "user/%s" % user[_inbox_folder_attr] elif type(user) == str: quota = auth.get_user_attribute(user, _quota_attr) folder = "user/%s" % (user) folder = folder.lower() try: (used,current_quota) = self.imap.lq(folder) except: # TODO: Go in fact correct the quota. log.warning(_("Cannot get current IMAP quota for folder %s") % (folder)) used = 0 current_quota = 0 new_quota = conf.plugins.exec_hook("set_user_folder_quota", kw={ 'used': used, 'current_quota': current_quota, 'new_quota': (int)(quota), 'default_quota': (int)(default_quota), 'user': user } ) log.debug(_("Quota for %s currently is %s") % (folder, current_quota), level=7) if new_quota == None: continue if not int(new_quota) == int(quota): log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder,int(new_quota))) quota = int(new_quota) auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota) if not int(current_quota) == int(quota): log.info(_("Correcting quota for %s to %s (currently %s)") % (folder, quota, current_quota)) self.imap._setquota(folder, quota) def set_user_mailhost(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): if conf.has_option(primary_domain, 'mailserver_attribute'): _mailserver_attr = conf.get(primary_domain, 'mailserver_attribute') else: auth_mechanism = conf.get('kolab', 'auth_mechanism') _mailserver_attr = conf.get(auth_mechanism, 'mailserver_attribute') _inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') if len(users) == 0: users = auth.list_users(primary_domain) for user in users: mailhost = None if type(user) == dict: if user.has_key(_mailserver_attr): if type(user[_mailserver_attr]) == list: _mailserver = user[_mailserver_attr].pop(0) elif type(user[_mailserver_attr]) == str: _mailserver = user[_mailserver_attr] else: _mailserver = auth.get_user_attribute(primary_domain, user, _mailserver_attr) if not user.has_key(_inbox_folder_attr): continue else: if type(user[_inbox_folder_attr]) == list: folder = "user/%s" % user[_inbox_folder_attr].pop(0) elif type(user[_inbox_folder_attr]) == str: folder = "user/%s" % user[_inbox_folder_attr] elif type(user) == str: _mailserver = auth.get_user_attribute(user, _mailserver_attr) folder = "user/%s" % (user) folder = folder.lower() _current_mailserver = self.imap.find_mailfolder_server(folder) if not _mailserver == None: # TODO: if not _current_mailserver == _mailserver: self.imap._xfer(folder, _current_mailserver, _mailserver) else: auth.set_user_attribute(primary_domain, user, _mailserver_attr, _current_mailserver) def parse_mailfolder(self, mailfolder): return self.imap.parse_mailfolder(mailfolder) def expunge_user_folders(self, inbox_folders=None): """ Delete folders that have no equivalent user qualifier in the list of users passed to this function, ... TODO: Explain the domain parameter, and actually get it to work properly. This also relates to threading for multi-domain deployments. Parameters: users A list of users. Can be a list of user qualifiers, e.g. [ 'user1', 'user2' ] or a list of user attribute dictionaries, e.g. [ { 'user1': { 'attr': 'value' } } ] primary_domain, secondary_domains """ if inbox_folders == None: inbox_folders = [] folders = self.list_user_folders() for folder in folders: log.debug(_("Checking folder: %s") % (folder), level=1) try: if inbox_folders.index(folder) > -1: continue else: log.info(_("Folder has no corresponding user (1): %s") % (folder)) self.delete_mailfolder("user/%s" % (folder)) except: log.info(_("Folder has no corresponding user (2): %s") % (folder)) try: self.delete_mailfolder("user/%s" % (folder)) except: pass def delete_mailfolder(self, mailfolder_path): """ Deletes a mail folder described by mailfolder_path. """ mbox_parts = self.parse_mailfolder(mailfolder_path) if mbox_parts == None: # We got user identifier only log.error(_("Please don't give us just a user identifier")) return log.info(_("Deleting folder %s") % (mailfolder_path)) self.imap.dm(self.folder_utf7(mailfolder_path)) def get_quota(self, mailfolder_path): try: return self.lq(self.folder_utf7(mailfolder_path)) except: return def get_quota_root(self, mailfolder_path): return self.lqr(self.folder_utf7(mailfolder_path)) def list_acls(self, folder): """ List the ACL entries on a folder """ return self.imap.lam(self.folder_utf7(folder)) def list_folders(self, pattern): return [self.folder_utf8(x) for x in self.lm(self.folder_utf7(pattern))] def list_user_folders(self, primary_domain=None, secondary_domains=[]): """ List the INBOX folders in the IMAP backend. Returns a list of unique base folder names. """ _folders = self.imap.lm("user/%") # TODO: Replace the .* below with a regex representing acceptable DNS # domain names. domain_re = ".*\.?%s$" acceptable_domain_name_res = [] if not primary_domain == None: for domain in [ primary_domain ] + secondary_domains: acceptable_domain_name_res.append(domain_re % (domain)) folders = [] for folder in _folders: folder_name = None if len(folder.split('@')) > 1: # TODO: acceptable domain name spaces #acceptable = False #for domain_name_re in acceptable_domain_name_res: #prog = re.compile(domain_name_re) #if prog.match(folder.split('@')[1]): #print "Acceptable indeed" #acceptable = True #if not acceptable: #print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1],folder,domain_name_re) #if acceptable: #folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0],folder.split('@')[1]) folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0],folder.split('@')[1]) else: folder_name = "%s" % (folder.split(self.get_separator())[1]) if not folder_name == None: if not folder_name in folders: folders.append(folder_name) return folders def lm(self, *args, **kw): return self.imap.lm(*args, **kw) def lq(self, *args, **kw): return self.imap.lq(*args, **kw) def lqr(self, *args, **kw): try: return self.imap.lqr(*args, **kw) except: return (None, None, None) def undelete_mailfolder(self, *args, **kw): self.imap.undelete_mailfolder(*args, **kw) diff --git a/pykolab/imap/dovecot.py b/pykolab/imap/dovecot.py index 226e6f1..8d5c0a5 100644 --- a/pykolab/imap/dovecot.py +++ b/pykolab/imap/dovecot.py @@ -1,632 +1,634 @@ # -*- coding: utf-8 -*- # Copyright 2015 Instituto Tecnológico de Informática (http://www.iti.es) # # Sergio Talens-Oliag (ITI) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # ----- # Note: # # This file is based on the original cyrus.py driver from Kolab, # replacing annotation related functions with metadata functions; to use it # on a debian installation it can be copied to the path: # # /usr/share/pyshared/pykolab/imap/dovecot.py # # The file needs some review, as some functions have been modified to behave # as we want, but the real changes should be done on other places. # # As an example, with annotations you can get all existing annotations with # one call, but if we use metadatata we have to ask for specific variables, # there is no function to get all of them at once (at least on the RFC); in # our case when a pattern like '*' is received we look for fields of the form # 'vendor/kolab/folder-type', as we know they are the fields the functions we # are using need. # ----- import cyruslib import imaplib import sys import time from urlparse import urlparse import pykolab from pykolab.imap import IMAP from pykolab.translate import _ log = pykolab.getLogger('pykolab.imap') conf = pykolab.getConf() # BEG: Add GETMETADATA and SETMETADATA support to the cyruslib IMAP objects Commands = { 'GETMETADATA': ('AUTH',), 'SETMETADATA': ('AUTH',), } imaplib.Commands.update(Commands) def imap_getmetadata(self, mailbox, pattern='*', shared=None): # If pattern is '*' clean pattern and search all entries under /shared # and/or /private (depens on the shared parameter value) to emulate the # ANNOTATEMORE behaviour if pattern == '*': pattern = '' options = '(DEPTH infinity)' else: options = '(DEPTH 0)' if shared == None: entries = '( /shared%s /private%s )' % (pattern, pattern) elif shared: entries = "/shared%s" % pattern else: entries = " /private%s" % pattern typ, dat = self._simple_command('GETMETADATA', options, mailbox, entries) + return self._untagged_response(typ, dat, 'METADATA') def imap_setmetadata(self, mailbox, desc, value, shared=False): if value: - value = quote(value) + value = value.join(['"', '"']) else: value = "NIL" if shared: typ, dat = self._simple_command('SETMETADATA', mailbox, - "(/shared/%s %s)" % (desc,value)) + "(/shared%s %s)" % (desc,value)) else: typ, dat = self._simple_command('SETMETADATA', mailbox, - "(/private/%s %s)" % (desc,value)) + "(/private%s %s)" % (desc,value)) + return self._untagged_response(typ, dat, 'METADATA') # Bind the new methods to the cyruslib IMAP4 and IMAP4_SSL objects from types import MethodType cyruslib.IMAP4.getmetadata = MethodType(imap_getmetadata, None, cyruslib.IMAP4) cyruslib.IMAP4.setmetadata = MethodType(imap_setmetadata, None, cyruslib.IMAP4) cyruslib.IMAP4_SSL.getmetadata = MethodType(imap_getmetadata, None, cyruslib.IMAP4_SSL) cyruslib.IMAP4_SSL.setmetadata = MethodType(imap_setmetadata, None, cyruslib.IMAP4_SSL) # END: Add GETMETADATA and SETMETADATA support to the cyruslib IMAP objects # Auxiliary functions def _get_line_entries(lines): """Function to get metadata entries """ entries = {} name = None value = "" vlen = 0 for line in lines: line_len = len(line) i = 0 while i < line_len: if name == None: if line[i] == '/': j = i while j < line_len: if line[j] == ' ': break j += 1 name = line[i:j] i = j elif vlen != 0: j = i + vlen if j > line_len: value += line[i:line_len] vlen -= line_len - i else: value += line[i:i+vlen] if value in ('', 'NIL'): entries[name] = "" else: entries[name] = value name = None value = "" vlen = 0 elif line[i] == '{': j = i while j < line_len: if line[j] == '}': vlen = int(line[i+1:j]) break j += 1 i = j elif line[i] != ' ': j = i if line[i] == '"': while j < line_len: # Skip quoted text if line[j] == '\\': j += 2 continue elif line[j] == '"': break j += 1 else: while j < line_len: if line[j] == ' ' or line[j] == ')': break j += 1 value = line[i:j] if value in ('', 'NIL'): entries[name] = "" else: entries[name] = value name = None value = "" i = j i += 1 return entries class Dovecot(cyruslib.CYRUS): """ Abstraction class for some common actions to do exclusively in Dovecot. Initially based on the Cyrus driver, will remove dependencies on cyruslib later; right now this module has only been tested to use the dovecot metadata support (no quota or folder operations tests have been performed). """ setquota = cyruslib.CYRUS.sq def __init__(self, uri): """ Initialize this class, but do not connect yet. """ port = None result = urlparse(uri) if hasattr(result, 'hostname'): scheme = result.scheme hostname = result.hostname port = result.port else: scheme = uri.split(':')[0] (hostname, port) = uri.split('/')[2].split(':') if not port: if scheme == 'imap': port = 143 else: port = 993 self.server = hostname self.uri = "%s://%s:%s" % (scheme,hostname,port) while 1: try: cyruslib.CYRUS.__init__(self, self.uri) break except cyruslib.CYRUSError: log.warning(_("Could not connect to Dovecot IMAP server %r") % (self.uri)) time.sleep(10) if conf.debuglevel > 8: self.VERBOSE = True self.m.debug = 5 # Initialize our variables self.separator = self.SEP # Placeholder for known mailboxes on known servers self.mbox = {} # By default don't assume that we have metadata support self.metadata = False def __del__(self): pass def __verbose(self, msg): if self.VERBOSE: print >> self.LOGFD, msg def connect(self, uri): """ Dummy connect function that checks if the server that we want to connect to is actually the server we are connected to. Uses pykolab.imap.IMAP.connect() in the background. """ port = None result = urlparse(uri) if hasattr(result, 'hostname'): scheme = result.scheme hostname = result.hostname port = result.port else: scheme = uri.split(':')[0] (hostname, port) = uri.split('/')[2].split(':') if not port: if scheme == 'imap': port = 143 else: port = 993 if hostname == self.server: return imap = IMAP() imap.connect(uri=uri) if not self.SEP == self.separator: self.separator = self.SEP def login(self, *args, **kw): """ Login to the Dovecot IMAP server through cyruslib.CYRUS, but set our hierarchy separator. """ cyruslib.CYRUS.login(self, *args, **kw) self.separator = self.SEP log.debug(_("Continuing with separator: %r") % (self.separator), level=8) # Check if we have metadata support or not self.metadata = False typ, dat = self.m.capability() for capability in tuple(dat[-1].upper().split()): if capability.startswith("METADATA"): log.debug(_("Detected METADATA support"), level=8) self.metadata = True if not self.metadata: log.debug(_("This system does not support METADATA: '%s'" % ','.join(self.m.capabilities)), level=8) def find_mailfolder_server(self, mailfolder): # Nothing to do in dovecot, returns the current server return self.server def folder_utf7(self, folder): from pykolab import imap_utf7 return imap_utf7.encode(folder) def folder_utf8(self, folder): from pykolab import imap_utf7 return imap_utf7.decode(folder) def _setquota(self, mailfolder, quota): # Removed server reconnection for dovecot, we only have one server log.debug(_("Setting quota for folder %s to %s") % (mailfolder,quota), level=8) try: self.m.setquota(mailfolder, quota) except: log.error(_("Could not set quota for mailfolder %s") % (mailfolder)) def _rename(self, from_mailfolder, to_mailfolder, partition=None): # Removed server reconnection for dovecot, we only have one server if not partition == None: log.debug(_("Moving INBOX folder %s to %s on partition %s") % (from_mailfolder,to_mailfolder, partition), level=8) else: log.debug(_("Moving INBOX folder %s to %s") % (from_mailfolder,to_mailfolder), level=8) self.m.rename(self.folder_utf7(from_mailfolder), self.folder_utf7(to_mailfolder), '"%s"' % (partition)) # BEG: METADATA support functions ... quite similar to annotations, really def _getmetadata(self, mailbox, pattern='*', shared=None): """Get Metadata""" # This test needs to be reviewed #if not self.metadata: # return {} # Annotations vs. Metadata fix ... we set a pattern that we know is # good enough for our purposes for now, but the fact is that the # calling programs should be fixed instead. res, data = self.m.getmetadata(self.decode(mailbox), pattern, shared) if (len(data) == 1) and data[0] is None: self.__verbose( '[GETMETADATA %s] No results' % (mailbox) ) return {} # Get the first response line (it can be a string or a tuple) if isinstance(data[0], tuple): fline = data[0][0] else: fline = data[0] # Find the folder name fbeg = 0 fend = -1 if fline[0] == '"': # Quoted name fbeg = 1 i = 1 while i < len(fline): if fline[i] == '"': # folder name ended unless the previous char is \ (we # should test more, this test would fail if we had a \ # at the end of the folder name, but we leave it at that # right now if fline[i-1] != '\\': fend = i break i += 1 else: # For unquoted names the first word is the folder name fend = fline.find(' ') # No mailbox found if fend < 0: self.__verbose( '[GETMETADATA %s] Mailbox not found in results' % (mailbox) ) return {} # Folder name folder = fline[fbeg:fend] # Check mailbox name against the folder name if folder != mailbox: quoted_mailbox = "\"%s\"" % (mailbox) if folder != quoted_mailbox: self.__verbose( '[GETMETADATA %s] Mailbox \'%s\' is not the same as \'%s\'' \ % (mailbox, quoted_mailbox, folder) ) return {} # Process the rest of the first line, the first value will be # available after the first '(' found i=fend ebeg = -1 while i < len(fline): if fline[i] == '(': ebeg = i+1 break i += 1 if ebeg < 0: self.__verbose( '[GETMETADATA %s] Mailbox has no values, skipping' % (mailbox) ) return {} # This variable will start with an entry name and will continue with # the value lenght or the value nfline = fline[ebeg:] if isinstance(data[0], tuple): entries = _get_line_entries((nfline,) + data[0][1:]) else: entries = _get_line_entries((nfline,)) for line in data[1:]: if isinstance(line, tuple): lentries = _get_line_entries(line) else: lentries = _get_line_entries([line,]) if lentries != None and lentries != {}: entries.update(lentries) mdat = { mailbox: entries }; return mdat def _setmetadata(self, mailbox, desc, value, shared=False): """Set METADADATA""" res, msg = self.m.setmetadata(self.decode(mailbox), desc, value, shared) self.__verbose( '[SETMETADATA %s] %s: %s' % (mailbox, res, msg[0]) ) # Use metadata instead of annotations def _getannotation(self, *args, **kw): return self._getmetadata(*args, **kw) def getannotation(self, *args, **kw): return self._getmetadata(*args, **kw) # Use metadata instead of annotations def _setannotation(self, *args, **kw): return self._setmetadata(*args, **kw) def setannotation(self, *args, **kw): return self._setmetadata(*args, **kw) # END: METADATA / Annotations # The functions that follow are the same ones used with Cyrus, probably a # review is needed def _xfer(self, mailfolder, current_server, new_server): self.connect(self.uri.replace(self.server,current_server)) log.debug(_("Transferring folder %s from %s to %s") % (mailfolder, current_server, new_server), level=8) self.xfer(mailfolder, new_server) def undelete_mailfolder(self, mailfolder, to_mailfolder=None, recursive=True): """ Login to the actual backend server, then "undelete" the mailfolder. 'mailfolder' may be a string representing either of the following two options; - the fully qualified pathof the deleted folder in its current location, such as, for a deleted INBOX folder originally known as "user/userid[@domain]"; "DELETED/user/userid/hex[@domain]" - the original folder name, such as; "user/userid[@domain]" 'to_mailfolder' may be the target folder to "undelete" the deleted folder to. If not specified, the original folder name is used. """ # Placeholder for folders we have recovered already. target_folders = [] mailfolder = self.parse_mailfolder(mailfolder) undelete_folders = self._find_deleted_folder(mailfolder) if not to_mailfolder == None: target_mbox = self.parse_mailfolder(to_mailfolder) else: target_mbox = mailfolder for undelete_folder in undelete_folders: undelete_mbox = self.parse_mailfolder(undelete_folder) prefix = undelete_mbox['path_parts'].pop(0) mbox = undelete_mbox['path_parts'].pop(0) if to_mailfolder == None: target_folder = self.separator.join([prefix,mbox]) else: target_folder = self.separator.join(target_mbox['path_parts']) if not to_mailfolder == None: target_folder = "%s%s%s" % (target_folder,self.separator,mbox) if not len(undelete_mbox['path_parts']) == 0: target_folder = "%s%s%s" % (target_folder,self.separator,self.separator.join(undelete_mbox['path_parts'])) if target_folder in target_folders: target_folder = "%s%s%s" % (target_folder,self.separator,undelete_mbox['hex_timestamp']) target_folders.append(target_folder) if not target_mbox['domain'] == None: target_folder = "%s@%s" % (target_folder,target_mbox['domain']) log.info(_("Undeleting %s to %s") % (undelete_folder,target_folder)) target_server = self.find_mailfolder_server(target_folder) if hasattr(conf,'dry_run') and not conf.dry_run: if not target_server == self.server: self.xfer(undelete_folder,target_server) self.rename(undelete_folder,target_folder) else: if not target_server == self.server: print >> sys.stdout, _("Would have transfered %s from %s to %s") % (undelete_folder, self.server, target_server) print >> sys.stdout, _("Would have renamed %s to %s") % (undelete_folder, target_folder) def parse_mailfolder(self, mailfolder): """ Parse a mailfolder name to it's parts. Takes a fully qualified mailfolder or mailfolder sub-folder. """ mbox = { 'domain': None } if len(mailfolder.split('/')) > 1: self.separator = '/' # Split off the virtual domain identifier, if any if len(mailfolder.split('@')) > 1: mbox['domain'] = mailfolder.split('@')[1] mbox['path_parts'] = mailfolder.split('@')[0].split(self.separator) else: mbox['path_parts'] = mailfolder.split(self.separator) # See if the path that has been specified is the current location for # the deleted folder, or the original location, we have to find the deleted # folder for. if not mbox['path_parts'][0] in [ 'user', 'shared' ]: deleted_prefix = mbox['path_parts'].pop(0) # See if the hexadecimal timestamp is actually hexadecimal. # This prevents "DELETED/user/userid/Sent", but not # "DELETED/user/userid/FFFFFF" from being specified. try: epoch = int(mbox['path_parts'][(len(mbox['path_parts'])-1)], 16) try: timestamp = time.asctime(time.gmtime(epoch)) except: return None except: return None # Verify that the input for the deleted folder is actually a # deleted folder. verify_folder_search = "%(dp)s%(sep)s%(mailfolder)s" % { 'dp': deleted_prefix, 'sep': self.separator, 'mailfolder': self.separator.join(mbox['path_parts']) } if not mbox['domain'] == None: verify_folder_search = "%s@%s" % (verify_folder_search, mbox['domain']) if ' ' in verify_folder_search: folders = self.lm('"%s"' % self.folder_utf7(verify_folder_search)) else: folders = self.lm(self.folder_utf7(verify_folder_search)) # NOTE: Case also covered is valid hexadecimal folders; won't be the # actual check as intended, but doesn't give you anyone else's data # unless... See the following: # # TODO: Case not covered is usernames that are hexadecimal. # # We could probably attempt to convert the int(hex) into a time.gmtime(), # but it still would not cover all cases. # # If no folders were found... well... then there you go. if len(folders) < 1: return None # Pop off the hex timestamp, which turned out to be valid mbox['hex_timestamp'] = mbox['path_parts'].pop() return mbox def _find_deleted_folder(self, mbox): """ Give me the parts that are in an original mailfolder name and I'll find the deleted folder name. TODO: It finds virtdomain folders for non-virtdomain searches. """ deleted_folder_search = "%(deleted_prefix)s%(separator)s%(mailfolder)s%(separator)s*" % { # TODO: The prefix used is configurable 'deleted_prefix': "DELETED", 'mailfolder': self.separator.join(mbox['path_parts']), 'separator': self.separator, } if not mbox['domain'] == None: deleted_folder_search = "%s@%s" % (deleted_folder_search,mbox['domain']) folders = self.lm(deleted_folder_search) # The folders we have found at this stage include virtdomain folders. # # For example, having searched for user/userid, it will also find # user/userid@example.org # # Here, we explicitely remove any virtdomain folders. if mbox['domain'] == None: _folders = [] for folder in folders: if len(folder.split('@')) < 2: _folders.append(folder) folders = _folders return folders