Changeset View
Standalone View
pykolab/imap/__init__.py
Show All 26 Lines | |||||
import pykolab | import pykolab | ||||
from pykolab import utils | from pykolab import utils | ||||
from pykolab.translate import _ | from pykolab.translate import _ | ||||
log = pykolab.getLogger('pykolab.imap') | log = pykolab.getLogger('pykolab.imap') | ||||
conf = pykolab.getConf() | conf = pykolab.getConf() | ||||
class IMAP(object): | class IMAP(object): | ||||
Lint: PEP8 E302: expected 2 blank lines, found 1 | |||||
def __init__(self): | def __init__(self): | ||||
# Pool of named IMAP connections, by hostname | # Pool of named IMAP connections, by hostname | ||||
self._imap = {} | self._imap = {} | ||||
# Place holder for the current IMAP connection | # Place holder for the current IMAP connection | ||||
self.imap = None | self.imap = None | ||||
def cleanup_acls(self, aci_subject): | def cleanup_acls(self, aci_subject): | ||||
lm_suffix = "" | lm_suffix = "" | ||||
log.info(_("Cleaning up ACL entries for %s across all folders") % (aci_subject)) | log.info(_("Cleaning up ACL entries for %s across all folders") % (aci_subject)) | ||||
Lint: PEP8 E501 line too long (88 > 79 characters) Lint: PEP8 E501: line too long (88 > 79 characters) | |||||
if len(aci_subject.split('@')) > 1: | if len(aci_subject.split('@')) > 1: | ||||
lm_suffix = "@%s" % (aci_subject.split('@')[1]) | lm_suffix = "@%s" % (aci_subject.split('@')[1]) | ||||
shared_folders = self.imap.lm( | shared_folders = self.imap.lm( | ||||
Lint: PEP8 E303 too many blank lines (2) Lint: PEP8 E303: too many blank lines (2) | |||||
"shared/*%s" % (lm_suffix) | "shared/*%s" % (lm_suffix) | ||||
) | ) | ||||
user_folders = self.imap.lm( | user_folders = self.imap.lm( | ||||
"user/*%s" % (lm_suffix) | "user/*%s" % (lm_suffix) | ||||
) | ) | ||||
log.debug( | log.debug( | ||||
Show All 10 Lines | def cleanup_acls(self, aci_subject): | ||||
# ... loop through them and ... | # ... loop through them and ... | ||||
for folder in folders: | for folder in folders: | ||||
# ... list the ACL entries | # ... list the ACL entries | ||||
acls = self.imap.lam(folder) | acls = self.imap.lam(folder) | ||||
# For each ACL entry, see if we think it is a current, valid entry | # For each ACL entry, see if we think it is a current, valid entry | ||||
for acl_entry in acls.keys(): | for acl_entry in acls.keys(): | ||||
# If the key 'acl_entry' does not exist in the dictionary of valid | # If the key 'acl_entry' does not exist in the dictionary of valid | ||||
Lint: PEP8 E501 line too long (82 > 79 characters) Lint: PEP8 E501: line too long (82 > 79 characters) | |||||
# ACL entries, this ACL entry has got to go. | # ACL entries, this ACL entry has got to go. | ||||
if acl_entry == aci_subject: | if acl_entry == aci_subject: | ||||
# Set the ACL to '' (effectively deleting the ACL entry) | # Set the ACL to '' (effectively deleting the ACL entry) | ||||
log.debug(_("Removing acl %r for subject %r from folder %r") % (acls[acl_entry],acl_entry,folder), level=8) | log.debug(_("Removing acl %r for subject %r from folder %r") % (acls[acl_entry],acl_entry,folder), level=8) | ||||
Lint: PEP8 E501 line too long (127 > 79 characters) Lint: PEP8 E501: line too long (127 > 79 characters) | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
self.set_acl(folder, acl_entry, '') | self.set_acl(folder, acl_entry, '') | ||||
def connect(self, uri=None, server=None, domain=None, login=True): | def connect(self, uri=None, server=None, domain=None, login=True): | ||||
""" | """ | ||||
Connect to the appropriate IMAP backend. | Connect to the appropriate IMAP backend. | ||||
Supply a domain (name space) configured in the configuration file | Supply a domain (name space) configured in the configuration file | ||||
as a section, with a setting 'imap_uri' to connect to a domain | as a section, with a setting 'imap_uri' to connect to a domain | ||||
specific IMAP server, or specify an URI to connect to that | specific IMAP server, or specify an URI to connect to that | ||||
particular IMAP server (in that order). | particular IMAP server (in that order). | ||||
Routines sitting behind this will take into account Cyrus IMAP | Routines sitting behind this will take into account Cyrus IMAP | ||||
Murder capabilities, brokering actions to take place against the | Murder capabilities, brokering actions to take place against the | ||||
correct server (such as a 'xfer' which needs to happen against the | correct server (such as a 'xfer' which needs to happen against the | ||||
source backend). | source backend). | ||||
""" | """ | ||||
# TODO: We are currently compatible with one IMAP backend technology per | # TODO: We are currently compatible with one IMAP backend technology per | ||||
Lint: PEP8 E501 line too long (80 > 79 characters) Lint: PEP8 E501: line too long (80 > 79 characters) | |||||
# deployment. | # deployment. | ||||
backend = conf.get('kolab', 'imap_backend') | backend = conf.get('kolab', 'imap_backend') | ||||
if not domain == None: | if not domain == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
self.domain = domain | self.domain = domain | ||||
if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): | if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): | ||||
Lint: PEP8 E501 line too long (84 > 79 characters) Lint: PEP8 E501: line too long (84 > 79 characters) | |||||
backend = conf.get(domain, 'imap_backend') | backend = conf.get(domain, 'imap_backend') | ||||
if uri == None: | if uri == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'): | if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'): | ||||
Lint: PEP8 E501 line too long (84 > 79 characters) Lint: PEP8 E501: line too long (84 > 79 characters) | |||||
uri = conf.get(domain, 'imap_uri') | uri = conf.get(domain, 'imap_uri') | ||||
else: | else: | ||||
self.domain = None | self.domain = None | ||||
scheme = None | scheme = None | ||||
hostname = None | hostname = None | ||||
port = None | port = None | ||||
if uri == None: | if uri == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
uri = conf.get(backend, 'uri') | uri = conf.get(backend, 'uri') | ||||
result = urlparse(uri) | result = urlparse(uri) | ||||
if hasattr(result, 'netloc'): | if hasattr(result, 'netloc'): | ||||
scheme = result.scheme | scheme = result.scheme | ||||
if len(result.netloc.split(':')) > 1: | if len(result.netloc.split(':')) > 1: | ||||
hostname = result.netloc.split(':')[0] | hostname = result.netloc.split(':')[0] | ||||
port = result.netloc.split(':')[1] | port = result.netloc.split(':')[1] | ||||
else: | else: | ||||
hostname = result.netloc | hostname = result.netloc | ||||
elif hasattr(result, 'hostname'): | elif hasattr(result, 'hostname'): | ||||
hostname = result.hostname | hostname = result.hostname | ||||
else: | else: | ||||
scheme = uri.split(':')[0] | scheme = uri.split(':')[0] | ||||
(hostname, port) = uri.split('/')[2].split(':') | (hostname, port) = uri.split('/')[2].split(':') | ||||
if not server == None: | if not server == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
hostname = server | hostname = server | ||||
if scheme == None or scheme == "": | if scheme == None or scheme == "": | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
scheme = 'imaps' | scheme = 'imaps' | ||||
if port == None: | if port == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
if scheme == "imaps": | if scheme == "imaps": | ||||
port = 993 | port = 993 | ||||
elif scheme == "imap": | elif scheme == "imap": | ||||
port = 143 | port = 143 | ||||
else: | else: | ||||
port = 993 | port = 993 | ||||
uri = '%s://%s:%s' % (scheme, hostname, port) | uri = '%s://%s:%s' % (scheme, hostname, port) | ||||
# Get the credentials | # Get the credentials | ||||
admin_login = conf.get(backend, 'admin_login') | admin_login = conf.get(backend, 'admin_login') | ||||
admin_password = conf.get(backend, 'admin_password') | admin_password = conf.get(backend, 'admin_password') | ||||
if admin_password == None or admin_password == '': | if admin_password == None or admin_password == '': | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
log.error(_("No administrator password is available.")) | log.error(_("No administrator password is available.")) | ||||
if not self._imap.has_key(hostname): | if not self._imap.has_key(hostname): | ||||
if backend == 'cyrus-imap': | if backend == 'cyrus-imap': | ||||
import cyrus | import cyrus | ||||
self._imap[hostname] = cyrus.Cyrus(uri) | self._imap[hostname] = cyrus.Cyrus(uri) | ||||
# Actually connect | # Actually connect | ||||
if login: | if login: | ||||
log.debug(_("Logging on to Cyrus IMAP server %s") % (hostname), level=8) | log.debug(_("Logging on to Cyrus IMAP server %s") % (hostname), level=8) | ||||
Lint: PEP8 E501 line too long (92 > 79 characters) Lint: PEP8 E501: line too long (92 > 79 characters) | |||||
self._imap[hostname].login(admin_login, admin_password) | self._imap[hostname].login(admin_login, admin_password) | ||||
self._imap[hostname].logged_in = True | self._imap[hostname].logged_in = True | ||||
elif backend == 'dovecot': | elif backend == 'dovecot': | ||||
import dovecot | import dovecot | ||||
self._imap[hostname] = dovecot.Dovecot(uri) | self._imap[hostname] = dovecot.Dovecot(uri) | ||||
# Actually connect | # Actually connect | ||||
if login: | if login: | ||||
log.debug(_("Logging on to Dovecot IMAP server %s") % (hostname), level=8) | log.debug(_("Logging on to Dovecot IMAP server %s") % (hostname), level=8) | ||||
Lint: PEP8 E501 line too long (94 > 79 characters) Lint: PEP8 E501: line too long (94 > 79 characters) | |||||
self._imap[hostname].login(admin_login, admin_password) | self._imap[hostname].login(admin_login, admin_password) | ||||
self._imap[hostname].logged_in = True | self._imap[hostname].logged_in = True | ||||
else: | else: | ||||
import imaplib | import imaplib | ||||
self._imap[hostname] = imaplib.IMAP4(hostname, port) | self._imap[hostname] = imaplib.IMAP4(hostname, port) | ||||
# Actually connect | # Actually connect | ||||
if login: | if login: | ||||
log.debug(_("Logging on to generic IMAP server %s") % (hostname), level=8) | log.debug(_("Logging on to generic IMAP server %s") % (hostname), level=8) | ||||
Lint: PEP8 E501 line too long (94 > 79 characters) Lint: PEP8 E501: line too long (94 > 79 characters) | |||||
self._imap[hostname].login(admin_login, admin_password) | self._imap[hostname].login(admin_login, admin_password) | ||||
self._imap[hostname].logged_in = True | self._imap[hostname].logged_in = True | ||||
else: | else: | ||||
if not login: | if not login: | ||||
self.disconnect(hostname) | self.disconnect(hostname) | ||||
self.connect(uri=uri,login=False) | self.connect(uri=uri,login=False) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
elif login and not hasattr(self._imap[hostname],'logged_in'): | elif login and not hasattr(self._imap[hostname],'logged_in'): | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
self.disconnect(hostname) | self.disconnect(hostname) | ||||
self.connect(uri=uri) | self.connect(uri=uri) | ||||
else: | else: | ||||
try: | try: | ||||
if hasattr(self._imap[hostname], 'm'): | if hasattr(self._imap[hostname], 'm'): | ||||
self._imap[hostname].m.noop() | self._imap[hostname].m.noop() | ||||
elif hasattr(self._imap[hostname], 'noop') and callable(self._imap[hostname].noop): | elif hasattr(self._imap[hostname], 'noop') and callable(self._imap[hostname].noop): | ||||
Lint: PEP8 E501 line too long (103 > 79 characters) Lint: PEP8 E501: line too long (103 > 79 characters) | |||||
self._imap[hostname].noop() | self._imap[hostname].noop() | ||||
log.debug(_("Reusing existing IMAP server connection to %s") % (hostname), level=8) | log.debug(_("Reusing existing IMAP server connection to %s") % (hostname), level=8) | ||||
Lint: PEP8 E501 line too long (103 > 79 characters) Lint: PEP8 E501: line too long (103 > 79 characters) | |||||
except: | except: | ||||
log.debug(_("Reconnecting to IMAP server %s") % (hostname), level=8) | log.debug(_("Reconnecting to IMAP server %s") % (hostname), level=8) | ||||
Lint: PEP8 E501 line too long (88 > 79 characters) Lint: PEP8 E501: line too long (88 > 79 characters) | |||||
self.disconnect(hostname) | self.disconnect(hostname) | ||||
self.connect() | self.connect() | ||||
# Set the newly created technology specific IMAP library as the current | # Set the newly created technology specific IMAP library as the current | ||||
# IMAP connection to be used. | # IMAP connection to be used. | ||||
self.imap = self._imap[hostname] | self.imap = self._imap[hostname] | ||||
def disconnect(self, server=None): | def disconnect(self, server=None): | ||||
if server == None: | if server == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
# No server specified, but make sure self.imap is None anyways | # No server specified, but make sure self.imap is None anyways | ||||
if hasattr(self, 'imap'): | if hasattr(self, 'imap'): | ||||
del self.imap | del self.imap | ||||
else: | else: | ||||
if self._imap.has_key(server): | if self._imap.has_key(server): | ||||
del self._imap[server] | del self._imap[server] | ||||
else: | else: | ||||
log.warning(_("Called imap.disconnect() on a server that we had no connection to.")) | log.warning(_("Called imap.disconnect() on a server that we had no connection to.")) | ||||
Lint: PEP8 E501 line too long (100 > 79 characters) Lint: PEP8 E501: line too long (100 > 79 characters) | |||||
def create_folder(self, folder_path, server=None, partition=None): | def create_folder(self, folder_path, server=None, partition=None): | ||||
folder_path = self.folder_utf7(folder_path) | folder_path = self.folder_utf7(folder_path) | ||||
if not server == None: | if not server == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
self.connect(server=server) | self.connect(server=server) | ||||
try: | try: | ||||
self._imap[server].cm(folder_path, partition=partition) | self._imap[server].cm(folder_path, partition=partition) | ||||
return True | return True | ||||
except: | except: | ||||
log.error( | log.error( | ||||
_("Could not create folder %r on server %r") % ( | _("Could not create folder %r on server %r") % ( | ||||
Show All 12 Lines | class IMAP(object): | ||||
def __getattr__(self, name): | def __getattr__(self, name): | ||||
if hasattr(self.imap, name): | if hasattr(self.imap, name): | ||||
return getattr(self.imap, name) | return getattr(self.imap, name) | ||||
elif hasattr(self.imap, 'm'): | elif hasattr(self.imap, 'm'): | ||||
if hasattr(self.imap.m, name): | if hasattr(self.imap.m, name): | ||||
return getattr(self.imap.m, name) | return getattr(self.imap.m, name) | ||||
else: | else: | ||||
raise AttributeError, _("%r has no attribute %s") % (self,name) | raise AttributeError, _("%r has no attribute %s") % (self,name) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
else: | else: | ||||
raise AttributeError, _("%r has no attribute %s") % (self,name) | raise AttributeError, _("%r has no attribute %s") % (self,name) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
def folder_utf7(self, folder): | def folder_utf7(self, folder): | ||||
from pykolab import imap_utf7 | from pykolab import imap_utf7 | ||||
return imap_utf7.encode(folder) | return imap_utf7.encode(folder) | ||||
def folder_utf8(self, folder): | def folder_utf8(self, folder): | ||||
from pykolab import imap_utf7 | from pykolab import imap_utf7 | ||||
return imap_utf7.decode(folder) | return imap_utf7.decode(folder) | ||||
def folder_quote(self, folder): | def folder_quote(self, folder): | ||||
return u'"' + str(folder).strip('"') + '"' | return u'"' + str(folder).strip('"') + '"' | ||||
def get_metadata(self, folder): | def get_metadata(self, folder): | ||||
""" | """ | ||||
Obtain all metadata entries on a folder | Obtain all metadata entries on a folder | ||||
""" | """ | ||||
metadata = {} | metadata = {} | ||||
_metadata = self.imap.getannotation(self.folder_utf7(folder), '*') | _metadata = self.imap.getannotation(self.folder_utf7(folder), '*') | ||||
for (k,v) in _metadata.items(): | for (k,v) in _metadata.items(): | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
metadata[self.folder_utf8(k)] = v | metadata[self.folder_utf8(k)] = v | ||||
return metadata | return metadata | ||||
def get_separator(self): | def get_separator(self): | ||||
if not hasattr(self, 'imap') or self.imap == None: | if not hasattr(self, 'imap') or self.imap == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
self.connect() | self.connect() | ||||
if hasattr(self.imap, 'separator'): | if hasattr(self.imap, 'separator'): | ||||
return self.imap.separator | return self.imap.separator | ||||
elif hasattr(self.imap, 'm') and hasattr(self.imap.m, 'separator'): | elif hasattr(self.imap, 'm') and hasattr(self.imap.m, 'separator'): | ||||
return self.imap.m.separator | return self.imap.m.separator | ||||
else: | else: | ||||
return '/' | return '/' | ||||
Show All 21 Lines | def namespaces(self): | ||||
if len(_namespaces) == 1: | if len(_namespaces) == 1: | ||||
_namespaces = _namespaces[0] | _namespaces = _namespaces[0] | ||||
_namespaces = re.split(r"\)\)\s\(\(", _namespaces) | _namespaces = re.split(r"\)\)\s\(\(", _namespaces) | ||||
if len(_namespaces) >= 3: | if len(_namespaces) >= 3: | ||||
_shared = [] | _shared = [] | ||||
_shared.append(' '.join(_namespaces[2].replace('((','').replace('))','').split()[:-1]).replace('"', '')) | _shared.append(' '.join(_namespaces[2].replace('((','').replace('))','').split()[:-1]).replace('"', '')) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
Lint: PEP8 E501 line too long (116 > 79 characters) Lint: PEP8 E501: line too long (116 > 79 characters) | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
if len(_namespaces) >= 2: | if len(_namespaces) >= 2: | ||||
_other_users = ' '.join(_namespaces[1].replace('((','').replace('))','').split()[:-1]).replace('"', '') | _other_users = ' '.join(_namespaces[1].replace('((','').replace('))','').split()[:-1]).replace('"', '') | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
Lint: PEP8 E501 line too long (115 > 79 characters) Lint: PEP8 E501: line too long (115 > 79 characters) | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
if len(_namespaces) >= 1: | if len(_namespaces) >= 1: | ||||
_personal = _namespaces[0].replace('((','').replace('))','').split()[0].replace('"', '') | _personal = _namespaces[0].replace('((','').replace('))','').split()[0].replace('"', '') | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
Lint: PEP8 E501 line too long (100 > 79 characters) Lint: PEP8 E501: line too long (100 > 79 characters) | |||||
return (_personal, _other_users, _shared) | return (_personal, _other_users, _shared) | ||||
def set_acl(self, folder, identifier, acl): | def set_acl(self, folder, identifier, acl): | ||||
""" | """ | ||||
Set an ACL entry on a folder. | Set an ACL entry on a folder. | ||||
""" | """ | ||||
_acl = '' | _acl = '' | ||||
Show All 12 Lines | def set_acl(self, folder, identifier, acl): | ||||
if short_rights.has_key(acl): | if short_rights.has_key(acl): | ||||
acl = short_rights[acl] | acl = short_rights[acl] | ||||
else: | else: | ||||
for char in acl: | for char in acl: | ||||
if char in "-+": | if char in "-+": | ||||
continue | continue | ||||
if not char in short_rights['all']: | if not char in short_rights['all']: | ||||
Lint: PEP8 E713 test for membership should be 'not in' Lint: PEP8 E713: test for membership should be 'not in' | |||||
log.error(_("Invalid access identifier %r for subject %r") % (acl, identifier)) | log.error(_("Invalid access identifier %r for subject %r") % (acl, identifier)) | ||||
Lint: PEP8 E501 line too long (99 > 79 characters) Lint: PEP8 E501: line too long (99 > 79 characters) | |||||
return False | return False | ||||
# Special treatment for '-' and '+' characters | # Special treatment for '-' and '+' characters | ||||
if '+' in acl or '-' in acl: | if '+' in acl or '-' in acl: | ||||
acl_map = { | acl_map = { | ||||
'set': '', | 'set': '', | ||||
'subtract': '', | 'subtract': '', | ||||
'add': '' | 'add': '' | ||||
} | } | ||||
mode = 'set' | mode = 'set' | ||||
for char in acl: | for char in acl: | ||||
if char == '-': | if char == '-': | ||||
mode = 'subtract' | mode = 'subtract' | ||||
continue | continue | ||||
if char == '+': | if char == '+': | ||||
continue | |||||
mode = 'add' | mode = 'add' | ||||
continue | |||||
acl_map[mode] += char | acl_map[mode] += char | ||||
current_acls = self.imap.lam(self.folder_utf7(folder)) | current_acls = self.imap.lam(self.folder_utf7(folder)) | ||||
for current_acl in current_acls.keys(): | for current_acl in current_acls.keys(): | ||||
if current_acl == identifier: | if current_acl == identifier: | ||||
_acl = current_acls[current_acl] | _acl = current_acls[current_acl] | ||||
break | break | ||||
_acl = _acl + acl_map['set'] + acl_map['add'] | _acl = _acl + acl_map['set'] + acl_map['add'] | ||||
_acl = [x for x in _acl.split() if x not in acl_map['subtract'].split()] | _acl = [x for x in _acl.split() if x not in acl_map['subtract'].split()] | ||||
Lint: PEP8 E501 line too long (84 > 79 characters) Lint: PEP8 E501: line too long (84 > 79 characters) | |||||
acl = ''.join(list(set(_acl))) | acl = ''.join(list(set(_acl))) | ||||
try: | try: | ||||
self.imap.sam(self.folder_utf7(folder), identifier, acl) | self.imap.sam(self.folder_utf7(folder), identifier, acl) | ||||
except Exception, errmsg: | except Exception, errmsg: | ||||
log.error( | log.error( | ||||
_("Could not set ACL for %s on folder %s: %r") % ( | _("Could not set ACL for %s on folder %s: %r") % ( | ||||
identifier, | identifier, | ||||
Show All 11 Lines | def set_metadata(self, folder, metadata_path, metadata_value, shared=True): | ||||
shared = True | shared = True | ||||
if metadata_path.startswith('/shared/'): | if metadata_path.startswith('/shared/'): | ||||
metadata_path = metadata_path.replace('/shared/', '/') | metadata_path = metadata_path.replace('/shared/', '/') | ||||
elif metadata_path.startswith('/private/'): | elif metadata_path.startswith('/private/'): | ||||
shared = False | shared = False | ||||
metadata_path = metadata_path.replace('/private/', '/') | metadata_path = metadata_path.replace('/private/', '/') | ||||
self.imap._setannotation(self.folder_utf7(folder), metadata_path, metadata_value, shared) | self.imap._setannotation(self.folder_utf7(folder), metadata_path, metadata_value, shared) | ||||
Lint: PEP8 E501 line too long (97 > 79 characters) Lint: PEP8 E501: line too long (97 > 79 characters) | |||||
def shared_folder_create(self, folder_path, server=None): | def shared_folder_create(self, folder_path, server=None): | ||||
""" | """ | ||||
Create a shared folder. | Create a shared folder. | ||||
""" | """ | ||||
folder_name = "shared%s%s" % (self.get_separator(), folder_path) | folder_name = "shared%s%s" % (self.get_separator(), folder_path) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
log.info(_("Creating new shared folder %s") %(folder_name)) | log.info(_("Creating new shared folder %s") %(folder_name)) | ||||
Lint: PEP8 E225 missing whitespace around operator Lint: PEP8 E225: missing whitespace around operator | |||||
self.create_folder(folder_name, server) | self.create_folder(folder_name, server) | ||||
def shared_folder_exists(self, folder_path): | def shared_folder_exists(self, folder_path): | ||||
""" | """ | ||||
Check if a shared mailbox exists. | Check if a shared mailbox exists. | ||||
""" | """ | ||||
folder_name = 'shared%s%s' % (self.get_separator(), folder_path) | folder_name = 'shared%s%s' % (self.get_separator(), folder_path) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
return self.has_folder(folder_name) | return self.has_folder(folder_name) | ||||
def shared_folder_set_type(self, folder_path, folder_type): | def shared_folder_set_type(self, folder_path, folder_type): | ||||
folder_name = 'shared%s%s' % (self.get_separator(), folder_path) | folder_name = 'shared%s%s' % (self.get_separator(), folder_path) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
self.set_metadata(folder_name, '/shared/vendor/kolab/folder-type', folder_type) | self.set_metadata(folder_name, '/shared/vendor/kolab/folder-type', folder_type) | ||||
Lint: PEP8 E501 line too long (87 > 79 characters) Lint: PEP8 E501: line too long (87 > 79 characters) | |||||
def shared_mailbox_create(self, mailbox_base_name, server=None): | def shared_mailbox_create(self, mailbox_base_name, server=None): | ||||
""" | """ | ||||
Create a shared folder. | Create a shared folder. | ||||
""" | """ | ||||
self.connect() | self.connect() | ||||
folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) | folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
log.info(_("Creating new shared folder %s") %(mailbox_base_name)) | log.info(_("Creating new shared folder %s") %(mailbox_base_name)) | ||||
Lint: PEP8 E225 missing whitespace around operator Lint: PEP8 E225: missing whitespace around operator | |||||
self.create_folder(folder_name, server) | self.create_folder(folder_name, server) | ||||
def shared_mailbox_exists(self, mailbox_base_name): | def shared_mailbox_exists(self, mailbox_base_name): | ||||
""" | """ | ||||
Check if a shared mailbox exists. | Check if a shared mailbox exists. | ||||
""" | """ | ||||
self.connect() | self.connect() | ||||
folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) | folder_name = "shared%s%s" % (self.get_separator(), mailbox_base_name) | ||||
# Correct folder_path being supplied with "shared/shared/" for example | # Correct folder_path being supplied with "shared/shared/" for example | ||||
if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | if folder_name.startswith("shared%s" % (self.get_separator()) * 2): | ||||
folder_name = folder_name[7:] | folder_name = folder_name[7:] | ||||
return self.has_folder(folder_name) | return self.has_folder(folder_name) | ||||
def user_mailbox_create(self, mailbox_base_name, server=None): | def user_mailbox_create(self, mailbox_base_name, server=None): | ||||
""" | """ | ||||
Create a user mailbox. | Create a user mailbox. | ||||
Returns the full path to the new mailbox folder. | Returns the full path to the new mailbox folder. | ||||
""" | """ | ||||
# TODO: Whether or not to lowercase the mailbox name is really up to the | # TODO: Whether or not to lowercase the mailbox name is really up to the | ||||
Lint: PEP8 E501 line too long (80 > 79 characters) Lint: PEP8 E501: line too long (80 > 79 characters) | |||||
# IMAP server setting username_tolower (normalize_uid, lmtp_downcase_rcpt). | # IMAP server setting username_tolower (normalize_uid, lmtp_downcase_rcpt). | ||||
Lint: PEP8 E501 line too long (83 > 79 characters) Lint: PEP8 E501: line too long (83 > 79 characters) | |||||
self.connect() | self.connect() | ||||
if not mailbox_base_name == mailbox_base_name.lower(): | if not mailbox_base_name == mailbox_base_name.lower(): | ||||
log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) | log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) | ||||
mailbox_base_name = mailbox_base_name.lower() | mailbox_base_name = mailbox_base_name.lower() | ||||
folder_name = "user%s%s" % (self.get_separator(), mailbox_base_name) | folder_name = "user%s%s" % (self.get_separator(), mailbox_base_name) | ||||
log.info(_("Creating new mailbox for user %s") %(mailbox_base_name)) | log.info(_("Creating new mailbox for user %s") %(mailbox_base_name)) | ||||
Lint: PEP8 E225 missing whitespace around operator Lint: PEP8 E225: missing whitespace around operator | |||||
self.create_folder(folder_name, server) | self.create_folder(folder_name, server) | ||||
# In a Cyrus IMAP Murder topology, wait for the murder to have settled | # In a Cyrus IMAP Murder topology, wait for the murder to have settled | ||||
if self.imap_murder(): | if self.imap_murder(): | ||||
self.disconnect() | self.disconnect() | ||||
self.connect() | self.connect() | ||||
created = False | created = False | ||||
last_log = time.time() | last_log = time.time() | ||||
while not created: | while not created: | ||||
created = self.has_folder(folder_name) | created = self.has_folder(folder_name) | ||||
if not created: | if not created: | ||||
if time.time() - last_log > 5: | if time.time() - last_log > 5: | ||||
log.info(_("Waiting for the Cyrus IMAP Murder to settle...")) | log.info(_("Waiting for the Cyrus IMAP Murder to settle...")) | ||||
Lint: PEP8 E501 line too long (85 > 79 characters) Lint: PEP8 E501: line too long (85 > 79 characters) | |||||
last_log = time.time() | last_log = time.time() | ||||
time.sleep(0.5) | time.sleep(0.5) | ||||
_additional_folders = None | _additional_folders = None | ||||
if not hasattr(self, 'domain'): | if not hasattr(self, 'domain'): | ||||
self.domain == None | self.domain = None | ||||
if self.domain == None and len(mailbox_base_name.split('@')) > 1: | if self.domain == None and len(mailbox_base_name.split('@')) > 1: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
self.domain = mailbox_base_name.split('@')[1] | self.domain = mailbox_base_name.split('@')[1] | ||||
if not self.domain == None: | if not self.domain == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
if conf.has_option(self.domain, "autocreate_folders"): | if conf.has_option(self.domain, "autocreate_folders"): | ||||
_additional_folders = conf.get_raw( | _additional_folders = conf.get_raw( | ||||
self.domain, | self.domain, | ||||
"autocreate_folders" | "autocreate_folders" | ||||
) | ) | ||||
else: | else: | ||||
from pykolab.auth import Auth | from pykolab.auth import Auth | ||||
Show All 9 Lines | def user_mailbox_create(self, mailbox_base_name, server=None): | ||||
primary = domains[self.domain] | primary = domains[self.domain] | ||||
if conf.has_option(primary, "autocreate_folders"): | if conf.has_option(primary, "autocreate_folders"): | ||||
_additional_folders = conf.get_raw( | _additional_folders = conf.get_raw( | ||||
primary, | primary, | ||||
"autocreate_folders" | "autocreate_folders" | ||||
) | ) | ||||
if _additional_folders == None: | if _additional_folders == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
if conf.has_option('kolab', "autocreate_folders"): | if conf.has_option('kolab', "autocreate_folders"): | ||||
_additional_folders = conf.get_raw( | _additional_folders = conf.get_raw( | ||||
'kolab', | 'kolab', | ||||
"autocreate_folders" | "autocreate_folders" | ||||
) | ) | ||||
additional_folders = conf.plugins.exec_hook( | additional_folders = conf.plugins.exec_hook( | ||||
"create_user_folders", | "create_user_folders", | ||||
kw={ | kw={ | ||||
'folder': folder_name, | 'folder': folder_name, | ||||
'additional_folders': _additional_folders | 'additional_folders': _additional_folders | ||||
} | } | ||||
) | ) | ||||
if not additional_folders == None: | if not additional_folders == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
self.user_mailbox_create_additional_folders( | self.user_mailbox_create_additional_folders( | ||||
mailbox_base_name, | mailbox_base_name, | ||||
additional_folders | additional_folders | ||||
) | ) | ||||
if not self.domain == None: | if not self.domain == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
if conf.has_option(self.domain, "sieve_mgmt"): | if conf.has_option(self.domain, "sieve_mgmt"): | ||||
sieve_mgmt_enabled = conf.get(self.domain, 'sieve_mgmt') | sieve_mgmt_enabled = conf.get(self.domain, 'sieve_mgmt') | ||||
if utils.true_or_false(sieve_mgmt_enabled): | if utils.true_or_false(sieve_mgmt_enabled): | ||||
conf.plugins.exec_hook( | conf.plugins.exec_hook( | ||||
'sieve_mgmt_refresh', | 'sieve_mgmt_refresh', | ||||
kw={ | kw={ | ||||
'user': mailbox_base_name | 'user': mailbox_base_name | ||||
} | } | ||||
Show All 26 Lines | def user_mailbox_create_additional_folders(self, user, additional_folders): | ||||
self.disconnect() | self.disconnect() | ||||
self.connect(login=False, server=server) | self.connect(login=False, server=server) | ||||
self.login_plain(admin_login, admin_password, user) | self.login_plain(admin_login, admin_password, user) | ||||
(personal, other, shared) = self.namespaces() | (personal, other, shared) = self.namespaces() | ||||
success = True | success = True | ||||
except Exception, errmsg: | except Exception, errmsg: | ||||
if time.time() - last_log > 5 and self.imap_murder(): | if time.time() - last_log > 5 and self.imap_murder(): | ||||
log.debug(_("Waiting for the Cyrus murder to settle... %r") % (errmsg)) | log.debug(_("Waiting for the Cyrus murder to settle... %r") % (errmsg)) | ||||
Lint: PEP8 E501 line too long (91 > 79 characters) Lint: PEP8 E501: line too long (91 > 79 characters) | |||||
last_log = time.time() | last_log = time.time() | ||||
if conf.debuglevel > 8: | if conf.debuglevel > 8: | ||||
import traceback | import traceback | ||||
traceback.print_exc() | traceback.print_exc() | ||||
time.sleep(0.5) | time.sleep(0.5) | ||||
for additional_folder in additional_folders.keys(): | for additional_folder in additional_folders.keys(): | ||||
_add_folder = {} | _add_folder = {} | ||||
folder_name = additional_folder | folder_name = additional_folder | ||||
if not folder_name.startswith(personal): | if not folder_name.startswith(personal): | ||||
log.error(_("Correcting additional folder name from %r to %r") % (folder_name, "%s%s" % (personal, folder_name))) | log.error(_("Correcting additional folder name from %r to %r") % (folder_name, "%s%s" % (personal, folder_name))) | ||||
Lint: PEP8 E501 line too long (129 > 79 characters) Lint: PEP8 E501: line too long (129 > 79 characters) | |||||
folder_name = "%s%s" % (personal, folder_name) | folder_name = "%s%s" % (personal, folder_name) | ||||
try: | try: | ||||
self.create_folder(folder_name) | self.create_folder(folder_name) | ||||
created = False | created = False | ||||
last_log = time.time() | last_log = time.time() | ||||
while not created: | while not created: | ||||
created = self.has_folder(folder_name) | created = self.has_folder(folder_name) | ||||
if not created: | if not created: | ||||
if time.time() - last_log > 5: | if time.time() - last_log > 5: | ||||
log.info(_("Waiting for the Cyrus IMAP Murder to settle...")) | log.info(_("Waiting for the Cyrus IMAP Murder to settle...")) | ||||
Lint: PEP8 E501 line too long (89 > 79 characters) Lint: PEP8 E501: line too long (89 > 79 characters) | |||||
last_log = time.time() | last_log = time.time() | ||||
time.sleep(0.5) | time.sleep(0.5) | ||||
except: | except: | ||||
log.warning(_("Mailbox already exists: %s") % (folder_name)) | log.warning(_("Mailbox already exists: %s") % (folder_name)) | ||||
if conf.debuglevel > 8: | if conf.debuglevel > 8: | ||||
import traceback | import traceback | ||||
traceback.print_exc() | traceback.print_exc() | ||||
continue | continue | ||||
if additional_folders[additional_folder].has_key("annotations"): | if additional_folders[additional_folder].has_key("annotations"): | ||||
for annotation in additional_folders[additional_folder]["annotations"].keys(): | for annotation in additional_folders[additional_folder]["annotations"].keys(): | ||||
Lint: PEP8 E501 line too long (94 > 79 characters) Lint: PEP8 E501: line too long (94 > 79 characters) | |||||
self.set_metadata( | self.set_metadata( | ||||
folder_name, | folder_name, | ||||
"%s" % (annotation), | "%s" % (annotation), | ||||
"%s" % (additional_folders[additional_folder]["annotations"][annotation]) | "%s" % (additional_folders[additional_folder]["annotations"][annotation]) | ||||
Lint: PEP8 E501 line too long (101 > 79 characters) Lint: PEP8 E501: line too long (101 > 79 characters) | |||||
) | ) | ||||
if additional_folders[additional_folder].has_key("acls"): | if additional_folders[additional_folder].has_key("acls"): | ||||
for acl in additional_folders[additional_folder]["acls"].keys(): | for acl in additional_folders[additional_folder]["acls"].keys(): | ||||
Lint: PEP8 E501 line too long (80 > 79 characters) Lint: PEP8 E501: line too long (80 > 79 characters) | |||||
self.set_acl( | self.set_acl( | ||||
folder_name, | folder_name, | ||||
"%s" % (acl), | "%s" % (acl), | ||||
"%s" % (additional_folders[additional_folder]["acls"][acl]) | "%s" % (additional_folders[additional_folder]["acls"][acl]) | ||||
Lint: PEP8 E501 line too long (87 > 79 characters) Lint: PEP8 E501: line too long (87 > 79 characters) | |||||
) | ) | ||||
if len(user.split('@')) > 1: | if len(user.split('@')) > 1: | ||||
localpart = user.split('@')[0] | localpart = user.split('@')[0] | ||||
domain = user.split('@')[1] | domain = user.split('@')[1] | ||||
domain_suffix = "@%s" % (domain) | domain_suffix = "@%s" % (domain) | ||||
else: | else: | ||||
localpart = user | localpart = user | ||||
domain = None | domain = None | ||||
domain_suffix = "" | domain_suffix = "" | ||||
if not domain == None: | if not domain == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): | if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'): | ||||
Lint: PEP8 E501 line too long (84 > 79 characters) Lint: PEP8 E501: line too long (84 > 79 characters) | |||||
backend = conf.get(domain, 'imap_backend') | backend = conf.get(domain, 'imap_backend') | ||||
if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'): | if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'): | ||||
Lint: PEP8 E501 line too long (80 > 79 characters) Lint: PEP8 E501: line too long (80 > 79 characters) | |||||
uri = conf.get(domain, 'imap_uri') | uri = conf.get(domain, 'imap_uri') | ||||
else: | else: | ||||
uri = None | uri = None | ||||
log.debug(_("Subscribing user to the additional folders"), level=8) | log.debug(_("Subscribing user to the additional folders"), level=8) | ||||
_tests = [] | _tests = [] | ||||
# Subscribe only to personal folders | # Subscribe only to personal folders | ||||
(personal, other, shared) = self.namespaces() | (personal, other, shared) = self.namespaces() | ||||
if not other == None: | if not other == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
_tests.append(other) | _tests.append(other) | ||||
if not shared == None: | if not shared == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
for _shared in shared: | for _shared in shared: | ||||
_tests.append(_shared) | _tests.append(_shared) | ||||
log.debug(_("Using the following tests for folder subscriptions:"), level=8) | log.debug(_("Using the following tests for folder subscriptions:"), level=8) | ||||
Lint: PEP8 E501 line too long (84 > 79 characters) Lint: PEP8 E501: line too long (84 > 79 characters) | |||||
for _test in _tests: | for _test in _tests: | ||||
log.debug(_(" %r") % (_test), level=8) | log.debug(_(" %r") % (_test), level=8) | ||||
for _folder in self.lm(): | for _folder in self.lm(): | ||||
log.debug(_("Folder %s") % (_folder), level=8) | log.debug(_("Folder %s") % (_folder), level=8) | ||||
_subscribe = True | _subscribe = True | ||||
for _test in _tests: | for _test in _tests: | ||||
if not _subscribe: | if not _subscribe: | ||||
continue | continue | ||||
if _folder.startswith(_test): | if _folder.startswith(_test): | ||||
_subscribe = False | _subscribe = False | ||||
if _subscribe: | if _subscribe: | ||||
log.debug(_("Subscribing %s to folder %s") % (user, _folder), level=8) | log.debug(_("Subscribing %s to folder %s") % (user, _folder), level=8) | ||||
Lint: PEP8 E501 line too long (86 > 79 characters) Lint: PEP8 E501: line too long (86 > 79 characters) | |||||
try: | try: | ||||
self.subscribe(_folder) | self.subscribe(_folder) | ||||
except Exception, errmsg: | except Exception, errmsg: | ||||
log.error(_("Subscribing %s to folder %s failed: %r") % (user, _folder, errmsg)) | log.error(_("Subscribing %s to folder %s failed: %r") % (user, _folder, errmsg)) | ||||
Lint: PEP8 E501 line too long (100 > 79 characters) Lint: PEP8 E501: line too long (100 > 79 characters) | |||||
self.logout() | self.logout() | ||||
self.connect(domain=self.domain) | self.connect(domain=self.domain) | ||||
for additional_folder in additional_folders.keys(): | for additional_folder in additional_folders.keys(): | ||||
if additional_folder.startswith(personal) and not personal == '': | if additional_folder.startswith(personal) and not personal == '': | ||||
folder_name = additional_folder.replace(personal, '') | folder_name = additional_folder.replace(personal, '') | ||||
else: | else: | ||||
Show All 13 Lines | def user_mailbox_create_additional_folders(self, user, additional_folders): | ||||
additional_folders[additional_folder]['quota'] | additional_folders[additional_folder]['quota'] | ||||
) | ) | ||||
if additional_folders[additional_folder].has_key("partition"): | if additional_folders[additional_folder].has_key("partition"): | ||||
partition = additional_folders[additional_folder]["partition"] | partition = additional_folders[additional_folder]["partition"] | ||||
try: | try: | ||||
self.imap._rename(folder_name, folder_name, partition) | self.imap._rename(folder_name, folder_name, partition) | ||||
except: | except: | ||||
log.error(_("Could not rename %s to reside on partition %s") % (folder_name, partition)) | log.error(_("Could not rename %s to reside on partition %s") % (folder_name, partition)) | ||||
Lint: PEP8 E501 line too long (108 > 79 characters) Lint: PEP8 E501: line too long (108 > 79 characters) | |||||
def user_mailbox_delete(self, mailbox_base_name): | def user_mailbox_delete(self, mailbox_base_name): | ||||
""" | """ | ||||
Delete a user mailbox. | Delete a user mailbox. | ||||
""" | """ | ||||
self.connect() | self.connect() | ||||
folder = "user%s%s" %(self.get_separator(),mailbox_base_name) | folder = "user%s%s" %(self.get_separator(),mailbox_base_name) | ||||
Lint: PEP8 E225 missing whitespace around operator Lint: PEP8 E225: missing whitespace around operator | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
self.delete_mailfolder(folder) | self.delete_mailfolder(folder) | ||||
self.cleanup_acls(mailbox_base_name) | self.cleanup_acls(mailbox_base_name) | ||||
def user_mailbox_exists(self, mailbox_base_name): | def user_mailbox_exists(self, mailbox_base_name): | ||||
""" | """ | ||||
Check if a user mailbox exists. | Check if a user mailbox exists. | ||||
""" | """ | ||||
self.connect() | self.connect() | ||||
if not mailbox_base_name == mailbox_base_name.lower(): | if not mailbox_base_name == mailbox_base_name.lower(): | ||||
log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) | log.warning(_("Downcasing mailbox name %r") % (mailbox_base_name)) | ||||
mailbox_base_name = mailbox_base_name.lower() | mailbox_base_name = mailbox_base_name.lower() | ||||
return self.has_folder('user%s%s' %(self.get_separator(), mailbox_base_name)) | return self.has_folder('user%s%s' %(self.get_separator(), mailbox_base_name)) | ||||
Lint: PEP8 E225 missing whitespace around operator Lint: PEP8 E225: missing whitespace around operator | |||||
Lint: PEP8 E501 line too long (85 > 79 characters) Lint: PEP8 E501: line too long (85 > 79 characters) | |||||
def user_mailbox_quota(self, mailbox_quota): | def user_mailbox_quota(self, mailbox_quota): | ||||
pass | pass | ||||
def user_mailbox_rename(self, old_name, new_name, partition=None): | def user_mailbox_rename(self, old_name, new_name, partition=None): | ||||
self.connect() | self.connect() | ||||
old_name = "user%s%s" % (self.get_separator(),old_name) | old_name = "user%s%s" % (self.get_separator(),old_name) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
new_name = "user%s%s" % (self.get_separator(),new_name) | new_name = "user%s%s" % (self.get_separator(),new_name) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
if old_name == new_name and partition == None: | if old_name == new_name and partition == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
return | return | ||||
if not self.has_folder(old_name): | if not self.has_folder(old_name): | ||||
log.error(_("INBOX folder to rename (%s) does not exist") % (old_name)) | log.error(_("INBOX folder to rename (%s) does not exist") % (old_name)) | ||||
Lint: PEP8 E501 line too long (83 > 79 characters) Lint: PEP8 E501: line too long (83 > 79 characters) | |||||
if not self.has_folder(new_name) or not partition == None: | if not self.has_folder(new_name) or not partition == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name)) | log.info(_("Renaming INBOX from %s to %s") % (old_name,new_name)) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
try: | try: | ||||
self.imap.rename(old_name,new_name,partition) | self.imap.rename(old_name,new_name,partition) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
except: | except: | ||||
log.error(_("Could not rename INBOX folder %s to %s") % (old_name,new_name)) | log.error(_("Could not rename INBOX folder %s to %s") % (old_name,new_name)) | ||||
Lint: PEP8 E501 line too long (92 > 79 characters) Lint: PEP8 E501: line too long (92 > 79 characters) | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
else: | else: | ||||
log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name)) | log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_name,new_name)) | ||||
Lint: PEP8 E501 line too long (123 > 79 characters) Lint: PEP8 E501: line too long (123 > 79 characters) | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
def user_mailbox_server(self, mailbox): | def user_mailbox_server(self, mailbox): | ||||
server = self.imap.find_mailfolder_server(mailbox.lower()).lower() | server = self.imap.find_mailfolder_server(mailbox.lower()).lower() | ||||
log.debug(_("Server for mailbox %r is %r") % (mailbox, server), level=8) | log.debug(_("Server for mailbox %r is %r") % (mailbox, server), level=8) | ||||
Lint: PEP8 E501 line too long (80 > 79 characters) Lint: PEP8 E501: line too long (80 > 79 characters) | |||||
return server | return server | ||||
def has_folder(self, folder): | def has_folder(self, folder): | ||||
""" | """ | ||||
Check if the environment has a folder named folder. | Check if the environment has a folder named folder. | ||||
""" | """ | ||||
folders = self.imap.lm(self.folder_utf7(folder)) | folders = self.imap.lm(self.folder_utf7(folder)) | ||||
log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,[self.folder_utf8(x) for x in folders]), level=8) | log.debug(_("Looking for folder '%s', we found folders: %r") % (folder,[self.folder_utf8(x) for x in folders]), level=8) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
Lint: PEP8 E501 line too long (128 > 79 characters) Lint: PEP8 E501: line too long (128 > 79 characters) | |||||
# Greater then one, this folder may have subfolders. | # Greater then one, this folder may have subfolders. | ||||
if len(folders) > 0: | if len(folders) > 0: | ||||
return True | return True | ||||
else: | else: | ||||
return False | return False | ||||
def _set_kolab_mailfolder_acls(self, acls): | def _set_kolab_mailfolder_acls(self, acls): | ||||
if isinstance(acls, basestring): | if isinstance(acls, basestring): | ||||
acls = [ acls ] | acls = [ acls ] | ||||
Lint: PEP8 E201 whitespace after '[' Lint: PEP8 E201: whitespace after '[' | |||||
Lint: PEP8 E202 whitespace before ']' Lint: PEP8 E202: whitespace before ']' | |||||
for acl in acls: | for acl in acls: | ||||
exec("acl = %s" % (acl)) | exec("acl = %s" % (acl)) | ||||
folder = acl[0] | folder = acl[0] | ||||
subject = acl[1] | subject = acl[1] | ||||
rights = acl[2] | rights = acl[2] | ||||
if len(acl) == 4: | if len(acl) == 4: | ||||
epoch = acl[3] | epoch = acl[3] | ||||
else: | else: | ||||
epoch = (int)(time.time()) + 3600 | epoch = (int)(time.time()) + 3600 | ||||
if epoch > (int)(time.time()): | if epoch > (int)(time.time()): | ||||
log.debug( | log.debug( | ||||
_("Setting ACL rights %s for subject %s on folder " + \ | _("Setting ACL rights %s for subject %s on folder " + \ | ||||
Lint: PEP8 E502 the backslash is redundant between brackets Lint: PEP8 E502: the backslash is redundant between brackets | |||||
"%s") % (rights,subject,folder), level=8) | "%s") % (rights,subject,folder), level=8) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
self.set_acl( | self.set_acl( | ||||
folder, | folder, | ||||
"%s" % (subject), | "%s" % (subject), | ||||
"%s" % (rights) | "%s" % (rights) | ||||
) | ) | ||||
else: | else: | ||||
log.debug( | log.debug( | ||||
_("Removing ACL rights %s for subject %s on folder " + \ | _("Removing ACL rights %s for subject %s on folder " + \ | ||||
Lint: PEP8 E501 line too long (80 > 79 characters) Lint: PEP8 E501: line too long (80 > 79 characters) | |||||
Lint: PEP8 E502 the backslash is redundant between brackets Lint: PEP8 E502: the backslash is redundant between brackets | |||||
"%s") % (rights,subject,folder), level=8) | "%s") % (rights,subject,folder), level=8) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
self.set_acl( | self.set_acl( | ||||
folder, | folder, | ||||
"%s" % (subject), | "%s" % (subject), | ||||
"" | "" | ||||
) | ) | ||||
pass | pass | ||||
""" Blah functions """ | """ Blah functions """ | ||||
def move_user_folders(self, users=[], domain=None): | def move_user_folders(self, users=[], domain=None): | ||||
for user in users: | for user in users: | ||||
if type(user) == dict: | if type(user) == dict: | ||||
if user.has_key('old_mail'): | if user.has_key('old_mail'): | ||||
inbox = "user/%s" % (user['mail']) | inbox = "user/%s" % (user['mail']) | ||||
old_inbox = "user/%s" % (user['old_mail']) | old_inbox = "user/%s" % (user['old_mail']) | ||||
if self.has_folder(old_inbox): | if self.has_folder(old_inbox): | ||||
log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8) | log.debug(_("Found old INBOX folder %s") % (old_inbox), level=8) | ||||
Lint: PEP8 E501 line too long (88 > 79 characters) Lint: PEP8 E501: line too long (88 > 79 characters) | |||||
if not self.has_folder(inbox): | if not self.has_folder(inbox): | ||||
log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox)) | log.info(_("Renaming INBOX from %s to %s") % (old_inbox,inbox)) | ||||
Lint: PEP8 E501 line too long (91 > 79 characters) Lint: PEP8 E501: line too long (91 > 79 characters) | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
self.imap.rename(old_inbox,inbox) | self.imap.rename(old_inbox,inbox) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
self.inbox_folders.append(inbox) | self.inbox_folders.append(inbox) | ||||
else: | else: | ||||
log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox)) | log.warning(_("Moving INBOX folder %s won't succeed as target folder %s already exists") % (old_inbox,inbox)) | ||||
Lint: PEP8 E501 line too long (137 > 79 characters) Lint: PEP8 E501: line too long (137 > 79 characters) | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
else: | else: | ||||
log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8) | log.debug(_("Did not find old folder user/%s to rename") % (user['old_mail']), level=8) | ||||
Lint: PEP8 E501 line too long (111 > 79 characters) Lint: PEP8 E501: line too long (111 > 79 characters) | |||||
else: | else: | ||||
log.debug(_("Value for user is not a dictionary"), level=8) | log.debug(_("Value for user is not a dictionary"), level=8) | ||||
def set_quota(self, folder, quota): | def set_quota(self, folder, quota): | ||||
i = 0 | i = 0 | ||||
while i < 10: | while i < 10: | ||||
try: | try: | ||||
self.imap._setquota(folder, quota) | self.imap._setquota(folder, quota) | ||||
i = 10 | i = 10 | ||||
except: | except: | ||||
self.disconnect() | self.disconnect() | ||||
self.connect() | self.connect() | ||||
i += 1 | i += 1 | ||||
def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | def set_user_folder_quota(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | ||||
Lint: PEP8 E501 line too long (100 > 79 characters) Lint: PEP8 E501: line too long (100 > 79 characters) | |||||
""" | """ | ||||
Sets the quota in IMAP using the authentication and authorization | Sets the quota in IMAP using the authentication and authorization | ||||
database 'quota' attribute for the users listed in parameter 'users' | database 'quota' attribute for the users listed in parameter 'users' | ||||
Lint: PEP8 E501 line too long (80 > 79 characters) Lint: PEP8 E501: line too long (80 > 79 characters) | |||||
""" | """ | ||||
if conf.has_option(primary_domain, 'quota_attribute'): | if conf.has_option(primary_domain, 'quota_attribute'): | ||||
_quota_attr = conf.get(primary_domain, 'quota_attribute') | _quota_attr = conf.get(primary_domain, 'quota_attribute') | ||||
else: | else: | ||||
auth_mechanism = conf.get('kolab', 'auth_mechanism') | auth_mechanism = conf.get('kolab', 'auth_mechanism') | ||||
_quota_attr = conf.get(auth_mechanism, 'quota_attribute') | _quota_attr = conf.get(auth_mechanism, 'quota_attribute') | ||||
_inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') | _inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') | ||||
default_quota = auth.domain_default_quota(primary_domain) | default_quota = auth.domain_default_quota(primary_domain) | ||||
if default_quota == "" or default_quota == None: | if default_quota == "" or default_quota == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
default_quota = 0 | default_quota = 0 | ||||
if len(users) == 0: | if len(users) == 0: | ||||
users = auth.list_users(primary_domain) | users = auth.list_users(primary_domain) | ||||
for user in users: | for user in users: | ||||
quota = None | quota = None | ||||
if type(user) == dict: | if type(user) == dict: | ||||
if user.has_key(_quota_attr): | if user.has_key(_quota_attr): | ||||
if type(user[_quota_attr]) == list: | if type(user[_quota_attr]) == list: | ||||
quota = user[_quota_attr].pop(0) | quota = user[_quota_attr].pop(0) | ||||
elif type(user[_quota_attr]) == str: | elif type(user[_quota_attr]) == str: | ||||
quota = user[_quota_attr] | quota = user[_quota_attr] | ||||
else: | else: | ||||
_quota = auth.get_user_attribute(primary_domain, user, _quota_attr) | _quota = auth.get_user_attribute(primary_domain, user, _quota_attr) | ||||
Lint: PEP8 E501 line too long (87 > 79 characters) Lint: PEP8 E501: line too long (87 > 79 characters) | |||||
if _quota == None: | if _quota == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
quota = 0 | quota = 0 | ||||
else: | else: | ||||
quota = _quota | quota = _quota | ||||
if not user.has_key(_inbox_folder_attr): | if not user.has_key(_inbox_folder_attr): | ||||
continue | continue | ||||
else: | else: | ||||
if type(user[_inbox_folder_attr]) == list: | if type(user[_inbox_folder_attr]) == list: | ||||
folder = "user/%s" % user[_inbox_folder_attr].pop(0) | folder = "user/%s" % user[_inbox_folder_attr].pop(0) | ||||
elif type(user[_inbox_folder_attr]) == str: | elif type(user[_inbox_folder_attr]) == str: | ||||
folder = "user/%s" % user[_inbox_folder_attr] | folder = "user/%s" % user[_inbox_folder_attr] | ||||
elif type(user) == str: | elif type(user) == str: | ||||
quota = auth.get_user_attribute(user, _quota_attr) | quota = auth.get_user_attribute(user, _quota_attr) | ||||
folder = "user/%s" % (user) | folder = "user/%s" % (user) | ||||
folder = folder.lower() | folder = folder.lower() | ||||
try: | try: | ||||
(used,current_quota) = self.imap.lq(folder) | (used,current_quota) = self.imap.lq(folder) | ||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
except: | except: | ||||
# TODO: Go in fact correct the quota. | # TODO: Go in fact correct the quota. | ||||
log.warning(_("Cannot get current IMAP quota for folder %s") % (folder)) | log.warning(_("Cannot get current IMAP quota for folder %s") % (folder)) | ||||
Lint: PEP8 E501 line too long (88 > 79 characters) Lint: PEP8 E501: line too long (88 > 79 characters) | |||||
used = 0 | used = 0 | ||||
current_quota = 0 | current_quota = 0 | ||||
new_quota = conf.plugins.exec_hook("set_user_folder_quota", kw={ | new_quota = conf.plugins.exec_hook("set_user_folder_quota", kw={ | ||||
'used': used, | 'used': used, | ||||
'current_quota': current_quota, | 'current_quota': current_quota, | ||||
'new_quota': (int)(quota), | 'new_quota': (int)(quota), | ||||
'default_quota': (int)(default_quota), | 'default_quota': (int)(default_quota), | ||||
'user': user | 'user': user | ||||
} | } | ||||
) | ) | ||||
log.debug(_("Quota for %s currently is %s") % (folder, current_quota), level=7) | log.debug(_("Quota for %s currently is %s") % (folder, current_quota), level=7) | ||||
Lint: PEP8 E501 line too long (91 > 79 characters) Lint: PEP8 E501: line too long (91 > 79 characters) | |||||
if new_quota == None: | if new_quota == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
continue | continue | ||||
if not int(new_quota) == int(quota): | if not int(new_quota) == int(quota): | ||||
log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder,int(new_quota))) | log.info(_("Adjusting authentication database quota for folder %s to %d") % (folder,int(new_quota))) | ||||
Lint: PEP8 E501 line too long (116 > 79 characters) Lint: PEP8 E501: line too long (116 > 79 characters) | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
quota = int(new_quota) | quota = int(new_quota) | ||||
auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota) | auth.set_user_attribute(primary_domain, user, _quota_attr, new_quota) | ||||
Lint: PEP8 E501 line too long (85 > 79 characters) Lint: PEP8 E501: line too long (85 > 79 characters) | |||||
if not int(current_quota) == int(quota): | if not int(current_quota) == int(quota): | ||||
log.info(_("Correcting quota for %s to %s (currently %s)") % (folder, quota, current_quota)) | log.info(_("Correcting quota for %s to %s (currently %s)") % (folder, quota, current_quota)) | ||||
Lint: PEP8 E501 line too long (108 > 79 characters) Lint: PEP8 E501: line too long (108 > 79 characters) | |||||
self.imap._setquota(folder, quota) | self.imap._setquota(folder, quota) | ||||
def set_user_mailhost(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | def set_user_mailhost(self, users=[], primary_domain=None, secondary_domain=[], folders=[]): | ||||
Lint: PEP8 E501 line too long (96 > 79 characters) Lint: PEP8 E501: line too long (96 > 79 characters) | |||||
if conf.has_option(primary_domain, 'mailserver_attribute'): | if conf.has_option(primary_domain, 'mailserver_attribute'): | ||||
_mailserver_attr = conf.get(primary_domain, 'mailserver_attribute') | _mailserver_attr = conf.get(primary_domain, 'mailserver_attribute') | ||||
else: | else: | ||||
auth_mechanism = conf.get('kolab', 'auth_mechanism') | auth_mechanism = conf.get('kolab', 'auth_mechanism') | ||||
_mailserver_attr = conf.get(auth_mechanism, 'mailserver_attribute') | _mailserver_attr = conf.get(auth_mechanism, 'mailserver_attribute') | ||||
_inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') | _inbox_folder_attr = conf.get('cyrus-sasl', 'result_attribute') | ||||
if len(users) == 0: | if len(users) == 0: | ||||
users = auth.list_users(primary_domain) | users = auth.list_users(primary_domain) | ||||
for user in users: | for user in users: | ||||
mailhost = None | mailhost = None | ||||
if type(user) == dict: | if type(user) == dict: | ||||
if user.has_key(_mailserver_attr): | if user.has_key(_mailserver_attr): | ||||
if type(user[_mailserver_attr]) == list: | if type(user[_mailserver_attr]) == list: | ||||
_mailserver = user[_mailserver_attr].pop(0) | _mailserver = user[_mailserver_attr].pop(0) | ||||
elif type(user[_mailserver_attr]) == str: | elif type(user[_mailserver_attr]) == str: | ||||
_mailserver = user[_mailserver_attr] | _mailserver = user[_mailserver_attr] | ||||
else: | else: | ||||
_mailserver = auth.get_user_attribute(primary_domain, user, _mailserver_attr) | _mailserver = auth.get_user_attribute(primary_domain, user, _mailserver_attr) | ||||
Lint: PEP8 E501 line too long (97 > 79 characters) Lint: PEP8 E501: line too long (97 > 79 characters) | |||||
if not user.has_key(_inbox_folder_attr): | if not user.has_key(_inbox_folder_attr): | ||||
continue | continue | ||||
else: | else: | ||||
if type(user[_inbox_folder_attr]) == list: | if type(user[_inbox_folder_attr]) == list: | ||||
folder = "user/%s" % user[_inbox_folder_attr].pop(0) | folder = "user/%s" % user[_inbox_folder_attr].pop(0) | ||||
elif type(user[_inbox_folder_attr]) == str: | elif type(user[_inbox_folder_attr]) == str: | ||||
folder = "user/%s" % user[_inbox_folder_attr] | folder = "user/%s" % user[_inbox_folder_attr] | ||||
elif type(user) == str: | elif type(user) == str: | ||||
_mailserver = auth.get_user_attribute(user, _mailserver_attr) | _mailserver = auth.get_user_attribute(user, _mailserver_attr) | ||||
folder = "user/%s" % (user) | folder = "user/%s" % (user) | ||||
folder = folder.lower() | folder = folder.lower() | ||||
_current_mailserver = self.imap.find_mailfolder_server(folder) | _current_mailserver = self.imap.find_mailfolder_server(folder) | ||||
if not _mailserver == None: | if not _mailserver == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
# TODO: | # TODO: | ||||
if not _current_mailserver == _mailserver: | if not _current_mailserver == _mailserver: | ||||
self.imap._xfer(folder, _current_mailserver, _mailserver) | self.imap._xfer(folder, _current_mailserver, _mailserver) | ||||
else: | else: | ||||
auth.set_user_attribute(primary_domain, user, _mailserver_attr, _current_mailserver) | auth.set_user_attribute(primary_domain, user, _mailserver_attr, _current_mailserver) | ||||
Lint: PEP8 E501 line too long (100 > 79 characters) Lint: PEP8 E501: line too long (100 > 79 characters) | |||||
def parse_mailfolder(self, mailfolder): | def parse_mailfolder(self, mailfolder): | ||||
return self.imap.parse_mailfolder(mailfolder) | return self.imap.parse_mailfolder(mailfolder) | ||||
def expunge_user_folders(self, inbox_folders=None): | def expunge_user_folders(self, inbox_folders=None): | ||||
""" | """ | ||||
Delete folders that have no equivalent user qualifier in the list | Delete folders that have no equivalent user qualifier in the list | ||||
of users passed to this function, ... | of users passed to this function, ... | ||||
TODO: Explain the domain parameter, and actually get it to work | TODO: Explain the domain parameter, and actually get it to work | ||||
properly. This also relates to threading for multi-domain | properly. This also relates to threading for multi-domain | ||||
deployments. | deployments. | ||||
Parameters: | Parameters: | ||||
users | users | ||||
A list of users. Can be a list of user qualifiers, e.g. | A list of users. Can be a list of user qualifiers, e.g. | ||||
[ 'user1', 'user2' ] or a list of user attribute | [ 'user1', 'user2' ] or a list of user attribute | ||||
dictionaries, e.g. [ { 'user1': { 'attr': 'value' } } ] | dictionaries, e.g. [ { 'user1': { 'attr': 'value' } } ] | ||||
primary_domain, secondary_domains | primary_domain, secondary_domains | ||||
""" | """ | ||||
if inbox_folders == None: | if inbox_folders == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
inbox_folders = [] | inbox_folders = [] | ||||
folders = self.list_user_folders() | folders = self.list_user_folders() | ||||
for folder in folders: | for folder in folders: | ||||
log.debug(_("Checking folder: %s") % (folder), level=1) | log.debug(_("Checking folder: %s") % (folder), level=1) | ||||
try: | try: | ||||
if inbox_folders.index(folder) > -1: | if inbox_folders.index(folder) > -1: | ||||
continue | continue | ||||
else: | else: | ||||
log.info(_("Folder has no corresponding user (1): %s") % (folder)) | log.info(_("Folder has no corresponding user (1): %s") % (folder)) | ||||
Lint: PEP8 E501 line too long (86 > 79 characters) Lint: PEP8 E501: line too long (86 > 79 characters) | |||||
self.delete_mailfolder("user/%s" % (folder)) | self.delete_mailfolder("user/%s" % (folder)) | ||||
except: | except: | ||||
log.info(_("Folder has no corresponding user (2): %s") % (folder)) | log.info(_("Folder has no corresponding user (2): %s") % (folder)) | ||||
Lint: PEP8 E501 line too long (82 > 79 characters) Lint: PEP8 E501: line too long (82 > 79 characters) | |||||
try: | try: | ||||
self.delete_mailfolder("user/%s" % (folder)) | self.delete_mailfolder("user/%s" % (folder)) | ||||
except: | except: | ||||
pass | pass | ||||
def delete_mailfolder(self, mailfolder_path): | def delete_mailfolder(self, mailfolder_path): | ||||
""" | """ | ||||
Deletes a mail folder described by mailfolder_path. | Deletes a mail folder described by mailfolder_path. | ||||
""" | """ | ||||
mbox_parts = self.parse_mailfolder(mailfolder_path) | mbox_parts = self.parse_mailfolder(mailfolder_path) | ||||
if mbox_parts == None: | if mbox_parts == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
# We got user identifier only | # We got user identifier only | ||||
log.error(_("Please don't give us just a user identifier")) | log.error(_("Please don't give us just a user identifier")) | ||||
return | return | ||||
log.info(_("Deleting folder %s") % (mailfolder_path)) | log.info(_("Deleting folder %s") % (mailfolder_path)) | ||||
self.imap.dm(self.folder_utf7(mailfolder_path)) | self.imap.dm(self.folder_utf7(mailfolder_path)) | ||||
def get_quota(self, mailfolder_path): | def get_quota(self, mailfolder_path): | ||||
try: | try: | ||||
return self.lq(self.folder_utf7(mailfolder_path)) | return self.lq(self.folder_utf7(mailfolder_path)) | ||||
except: | except: | ||||
return | return | ||||
def get_quota_root(self, mailfolder_path): | def get_quota_root(self, mailfolder_path): | ||||
return self.lqr(self.folder_utf7(mailfolder_path)) | return self.lqr(self.folder_utf7(mailfolder_path)) | ||||
def list_acls(self, folder): | def list_acls(self, folder): | ||||
""" | """ | ||||
List the ACL entries on a folder | List the ACL entries on a folder | ||||
""" | """ | ||||
return self.imap.lam(self.folder_utf7(folder)) | return self.imap.lam(self.folder_utf7(folder)) | ||||
def list_folders(self, pattern): | def list_folders(self, pattern): | ||||
return [self.folder_utf8(x) for x in self.lm(self.folder_utf7(pattern))] | return [self.folder_utf8(x) for x in self.lm(self.folder_utf7(pattern))] | ||||
Lint: PEP8 E501 line too long (80 > 79 characters) Lint: PEP8 E501: line too long (80 > 79 characters) | |||||
def list_user_folders(self, primary_domain=None, secondary_domains=[]): | def list_user_folders(self, primary_domain=None, secondary_domains=[]): | ||||
""" | """ | ||||
List the INBOX folders in the IMAP backend. Returns a list of unique | List the INBOX folders in the IMAP backend. Returns a list of unique | ||||
Lint: PEP8 E501 line too long (80 > 79 characters) Lint: PEP8 E501: line too long (80 > 79 characters) | |||||
base folder names. | base folder names. | ||||
""" | """ | ||||
_folders = self.imap.lm("user/%") | _folders = self.imap.lm("user/%") | ||||
# TODO: Replace the .* below with a regex representing acceptable DNS | # TODO: Replace the .* below with a regex representing acceptable DNS | ||||
# domain names. | # domain names. | ||||
domain_re = ".*\.?%s$" | domain_re = ".*\.?%s$" | ||||
acceptable_domain_name_res = [] | acceptable_domain_name_res = [] | ||||
if not primary_domain == None: | if not primary_domain == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
for domain in [ primary_domain ] + secondary_domains: | for domain in [ primary_domain ] + secondary_domains: | ||||
Lint: PEP8 E201 whitespace after '[' Lint: PEP8 E201: whitespace after '[' | |||||
Lint: PEP8 E202 whitespace before ']' Lint: PEP8 E202: whitespace before ']' | |||||
acceptable_domain_name_res.append(domain_re % (domain)) | acceptable_domain_name_res.append(domain_re % (domain)) | ||||
folders = [] | folders = [] | ||||
for folder in _folders: | for folder in _folders: | ||||
folder_name = None | folder_name = None | ||||
if len(folder.split('@')) > 1: | if len(folder.split('@')) > 1: | ||||
# TODO: acceptable domain name spaces | # TODO: acceptable domain name spaces | ||||
#acceptable = False | #acceptable = False | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
#for domain_name_re in acceptable_domain_name_res: | #for domain_name_re in acceptable_domain_name_res: | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
#prog = re.compile(domain_name_re) | #prog = re.compile(domain_name_re) | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
#if prog.match(folder.split('@')[1]): | #if prog.match(folder.split('@')[1]): | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
#print "Acceptable indeed" | #print "Acceptable indeed" | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
#acceptable = True | #acceptable = True | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
#if not acceptable: | #if not acceptable: | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
#print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1],folder,domain_name_re) | #print "%s is not acceptable against %s yet using %s" % (folder.split('@')[1],folder,domain_name_re) | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
Lint: PEP8 E501 line too long (124 > 79 characters) Lint: PEP8 E501: line too long (124 > 79 characters) | |||||
#if acceptable: | #if acceptable: | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
#folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0],folder.split('@')[1]) | #folder_name = "%s@%s" % (folder.split(self.separator)[1].split('@')[0],folder.split('@')[1]) | ||||
Lint: PEP8 E265 block comment should start with '# ' Lint: PEP8 E265: block comment should start with '# ' | |||||
Lint: PEP8 E501 line too long (113 > 79 characters) Lint: PEP8 E501: line too long (113 > 79 characters) | |||||
folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0],folder.split('@')[1]) | folder_name = "%s@%s" % (folder.split(self.get_separator())[1].split('@')[0],folder.split('@')[1]) | ||||
Lint: PEP8 E501 line too long (114 > 79 characters) Lint: PEP8 E501: line too long (114 > 79 characters) | |||||
Lint: PEP8 E231 missing whitespace after ',' Lint: PEP8 E231: missing whitespace after ',' | |||||
else: | else: | ||||
folder_name = "%s" % (folder.split(self.get_separator())[1]) | folder_name = "%s" % (folder.split(self.get_separator())[1]) | ||||
if not folder_name == None: | if not folder_name == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
if not folder_name in folders: | if not folder_name in folders: | ||||
Lint: PEP8 E713 test for membership should be 'not in' Lint: PEP8 E713: test for membership should be 'not in' | |||||
folders.append(folder_name) | folders.append(folder_name) | ||||
return folders | return folders | ||||
def lm(self, *args, **kw): | def lm(self, *args, **kw): | ||||
return self.imap.lm(*args, **kw) | return self.imap.lm(*args, **kw) | ||||
def lq(self, *args, **kw): | def lq(self, *args, **kw): | ||||
Show All 10 Lines |
expected 2 blank lines, found 1